IODA Bundle
Partitions.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "odc/Partitions.h"
12 
13 #include <fstream>
14 
15 #include "eckit/filesystem/PathName.h"
16 #include "eckit/io/Length.h"
17 #include "eckit/io/Offset.h"
18 #include "eckit/io/PartFileHandle.h"
19 
20 #include "odc/core/MetaData.h"
21 #include "odc/Reader.h"
22 #include "odc/Select.h"
23 
24 using namespace eckit;
25 using namespace std;
26 
27 namespace odc {
28 
29 std::ostream& operator<< (std::ostream& o, const Partitions& p)
30 {
31  for (size_t i (0); i < p.size(); ++i)
32  p[i].save(o, i);
33  //o << "[" << i << ":" << i << p[i] << "], ";
34  return o;
35 }
36 
37 vector<PathName> Partitions::write(const PathName& fileNamePrefix) const
38 {
39  vector<PathName> r;
40 
41  for (size_t i(0); i < size(); ++i)
42  {
43  const Partition& p (at(i));
44 
45  stringstream ss;
46  ss << fileNamePrefix << ".pool_" << i;
47  PathName partitionFileName (ss.str());
48 
49  Log::info() << "" << " ##### Writing partition " << i << " to file " << partitionFileName << endl;
50 
51  p.write(partitionFileName);
52 
53  r.push_back(partitionFileName);
54  }
55  return r;
56 }
57 
58 void Partitions::save(const PathName& partitionFile)
59 {
60  Log::info() << "Saving partitions info to " << partitionFile << endl;
61 
62  ofstream f;
63  f.exceptions(ofstream::badbit | ofstream::failbit);
64  f.open(string(partitionFile).c_str());
65  f << *this;
66  f.close();
67 }
68 
69 std::string Partitions::str() const
70 {
71  stringstream ss;
72  ss << *this;
73  return ss.str();
74 }
75 
76 void Partitions::addReport(const PathName& fileName, ullong blockStart, ullong blockLength, ullong seqno, ullong nRows, ullong rowsPerPartition)
77 {
78  Partitions& parts (*this);
79  if (parts.empty())
80  parts.push_back(Partition());
81 
82  Partition& currentPartition (parts.back());
83  // Do we have to open new partition?
84  if (! (currentPartition.numberOfRows() + nRows <= rowsPerPartition)) {
85  // Open new partition
86  ullong firstRow (0);
87  if (! currentPartition.blocks().empty()) {
88  const Block& lastBlock (currentPartition.blocks().back());
89  if (lastBlock.fileName == fileName && Offset(currentPartition.startOfLastBlock()) == Offset(blockStart))
90  firstRow = currentPartition.rowsOnLastBlock();
91  }
92 
93  parts.push_back(Partition());
94  Partition& newPartition (parts.back());
95 
96  newPartition.add(Block(fileName, blockStart, blockStart + blockLength, firstRow, firstRow + nRows));
97  newPartition.startOfLastBlock(Offset(blockStart));
98  newPartition.rowsOnLastBlock(nRows);
99  } else {
100  // Add to existing partition.
101  // First block seen?
102  if (currentPartition.blocks().empty()) {
103  currentPartition.add(Block(fileName, blockStart, blockStart + blockLength, /*firstRow*/ 0, /*lastRow*/ nRows));
104  currentPartition.startOfLastBlock(Offset(blockStart));
105  currentPartition.rowsOnLastBlock(/*firstRow*/ 0 + nRows);
106  } else {
107  // If this is a new file then we need to create a new block
108  Block& currentBlock (currentPartition.blocks().back());
109 
110  if (currentBlock.fileName != fileName) {
111  currentPartition.add(Block(fileName, blockStart, blockStart + blockLength, /*firstRow*/ 0, /*lastRow*/ nRows));
112  currentPartition.startOfLastBlock(Offset(blockStart));
113  currentPartition.rowsOnLastBlock(/*firstRow*/ 0 + nRows);
114  } else {
115  // It's a block on the same file as the previously processed report.
116  // If this is a report in a new physical block then we need to adjust current block's boundaries
117  if (currentBlock.end < Offset(blockStart + blockLength))
118  currentBlock.end = blockStart + blockLength;
119 
120  currentBlock.lastRow += nRows;
121 
122  if (Offset(currentPartition.startOfLastBlock()) == Offset(blockStart))
123  currentPartition.rowsOnLastBlock(currentPartition.rowsOnLastBlock() + nRows);
124  else {
125  currentPartition.startOfLastBlock(Offset(blockStart));
126  currentPartition.rowsOnLastBlock(/*firstRow*/ 0 + nRows);
127  }
128  }
129  }
130  }
131 }
132 
133 } // namespace odc
134 
eckit::PathName fileName
Definition: Block.h:72
eckit::Offset end
Definition: Block.h:74
ullong lastRow
Definition: Block.h:76
void add(const Block &)
Definition: Partition.cc:112
std::vector< Block > & blocks()
Definition: Partition.h:38
ullong write(const eckit::PathName &fileName) const
ullong rowsOnLastBlock() const
Definition: Partition.h:40
ullong numberOfRows() const
Definition: Partition.cc:98
ullong startOfLastBlock()
Definition: Partition.h:43
Definition: ColumnInfo.h:23
unsigned long long ullong
Definition: Block.h:27
std::ostream & operator<<(std::ostream &o, const Partitions &p)
Definition: Partitions.cc:29
Definition: encode.cc:30