IODA Bundle
Partition.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "odc/Partition.h"
12 
13 #include <fstream>
14 
15 #include "eckit/filesystem/PathName.h"
16 #include "eckit/io/FileHandle.h"
17 #include "eckit/io/Length.h"
18 #include "eckit/io/Offset.h"
19 #include "eckit/io/PartFileHandle.h"
20 #include "eckit/utils/StringTools.h"
21 
22 #include "odc/core/MetaData.h"
23 #include "odc/Reader.h"
24 #include "odc/Select.h"
26 #include "odc/Writer.h"
27 
28 using namespace eckit;
29 using namespace std;
30 
31 namespace odc {
32 
33 std::ostream& operator<< (std::ostream& o, const Partition& p)
34 {
35  o << " number of blocks: " << p.blocks_.size() << " ";
36 
37  for (size_t i (0); i < p.blocks_.size(); ++i)
38  o << p.blocks_[i] << ", ";
39  return o;
40 }
41 
42 std::ostream& Partition::save(std::ostream& o, size_t poolNumber) const
43 {
44  for (size_t i (0); i < blocks_.size(); ++i)
45  o << poolNumber << "\t" << blocks_[i] << endl;
46  return o;
47 }
48 
49 Partition::Partition(const PathName& fileName, size_t partitionNumber)
50 {
51  Log::info() << "Partition::Partition: read info on partition " << partitionNumber << " from " << fileName << endl;
52  if (! fileName.exists())
53  throw UserError(string(fileName) + " does not exist");
54 
55  std::ifstream f (string(fileName).c_str());
56 
57  string line;
58  while (std::getline(f, line))
59  {
60  Log::info() << "Partition::Partition: line: " << line << endl;
61 
62  vector<string> fs (StringTools::split("\t", line));
63  ASSERT(fs.size() == 2);
64 
65  size_t part (atol(fs[0].c_str()));
66 
67  // TODO: Check that this has been updated correctly on the removal of ecml ExecutionContexts
68 
69  if (part == partitionNumber)
70  add( Block (eckit::PathName(fs[1]),
71  eckit::Offset(atol(fs[2].c_str())),
72  eckit::Offset(atol(fs[3].c_str())),
73  atol(fs[4].c_str()),
74  atol(fs[5].c_str())));
75  }
76 }
77 
78 Partition::Partition()
79 : blocks_ (),
80  startOfLastBlock_(0),
81  rowsOnLastBlock_(0)
82 {}
83 
85 : blocks_ (other.blocks_),
86  startOfLastBlock_ (other.startOfLastBlock_),
87  rowsOnLastBlock_ (other.rowsOnLastBlock_)
88 {}
89 
91 {
92  blocks_ = other.blocks_;
95  return *this;
96 }
97 
99 {
100  ullong r (0);
101  for (size_t i(0); i < blocks_.size(); ++i)
102  r += blocks_[i].lastRow - blocks_[i].firstRow;
103  return r;
104 }
105 
107 {
108  ASSERT(blocks_.size());
109  return blocks_.back().lastRow;
110 }
111 
112 void Partition::add(const Block& block)
113 {
114  blocks_.push_back(block);
115 }
116 
117 /*
118 void Partition::add(const PathName& fileName, ullong start, ullong length, ullong seqno, ullong firstRow, ullong nRows)
119 {
120  if (! blocks_.size())
121  {
122  startOfLastBlock_ = Offset(start);
123  rowsOnLastBlock_ = firstRow + nRows;
124  blocks_.push_back(Block(fileName, start, length, firstRow, firstRow + nRows));
125  }
126  else
127  {
128  ASSERT(firstRow == 0); // new file, should start from beginning
129 
130  Block& last (blocks_.back());
131  if (last.fileName != fileName)
132  blocks_.push_back(Block(fileName, start, length, firstRow, firstRow + nRows));
133  else
134  {
135  last.lastRow += nRows;
136 
137  if (startOfLastBlock_ == Offset(start))
138  rowsOnLastBlock_ += nRows;
139  else
140  {
141  startOfLastBlock_ = Offset(start);
142  rowsOnLastBlock_ = firstRow + nRows;
143  last.length += Length(length);
144  }
145  }
146  }
147 }
148 */
149 
150 
151 ullong writeBlock(DataHandle& in, const Block& block, Writer<>::iterator& out)
152 {
153  odc::Reader reader(in);
154  odc::Reader::iterator it (reader.begin()), end (reader.end());
155 
156  ullong rowNumber (0);
157  for (size_t r(0); r < block.lastRow; ++r, ++it)
158  {
159  ASSERT(it != end);
160  if (r >= block.firstRow)
161  {
162  if (out->columns() != it->columns())
163  {
164  out->columns(it->columns());
165  out->writeHeader();
166  }
167 
168  for (size_t fi (0); fi < it->columns().size(); ++fi)
169  (*out)[fi] = (*it)[fi];
170 
171  ++out;
172  ++rowNumber;
173  }
174  }
175  return rowNumber;
176 }
177 
178 ullong Partition::write(DataHandle& dh) const
179 {
180  long long rowNumber (0);
181  const vector<Block>& blocks (blocks_);
182  for (size_t i (0); i < blocks.size(); ++i)
183  {
184  const Block& block (blocks[i]);
185  Log::info() << "Partition::write: writing block " << i << ":" << block << endl;
186  Writer<> writer(&dh, false, false);
187  Writer<>::iterator out (writer.begin(/*openDataHandle*/ false));
188 
189  PartFileHandle fh (block.fileName, block.start, block.end - block.start);
190 
191  Log::info() << "Partition::write: writing PartFileHandle: " << fh << endl;
192 
193  fh.openForRead();
194  ullong nr (writeBlock(fh, block, out));
195  rowNumber += nr;
196  }
197  return rowNumber;
198 
199 }
200 
201 ullong Partition::write(const PathName& fileName) const
202 //{
203 // FileHandle oh (fileName);
204 // oh.openForWrite(Length(1024 * 1024 * 10));
205 // return write(oh);
206 //}
207 
208 //ullong Partition::write(DataHandle& dh) const
209 {
210  long long rowNumber (0);
211  const vector<Block>& blocks (blocks_);
212 
213  { PathName(fileName).unlink(); }
214 
215  for (size_t i (0); i < blocks.size(); ++i)
216  {
217  const Block& block (blocks[i]);
218 
219  Log::info() << "Partition::write: writing block " << i << ":" << block << endl;
220 
221  FileHandle dh (fileName);
222  dh.openForAppend(Length(1024 * 1024 * 10));
223 
224  Writer<> writer(&dh, false, false);
225  Writer<>::iterator out (writer.begin(/*openDataHandle*/ false));
226 
227  PartFileHandle fh (block.fileName, block.start, block.end - block.start);
228  fh.openForRead();
229  ullong nr (writeBlock(fh, block, out));
230  rowNumber += nr;
231  }
232  return rowNumber;
233 }
234 
235 } // namespace odc
236 
eckit::Offset start
Definition: Block.h:73
eckit::PathName fileName
Definition: Block.h:72
ullong firstRow
Definition: Block.h:75
eckit::Offset end
Definition: Block.h:74
ullong lastRow
Definition: Block.h:76
ullong numberOfRowsOnLastBlock() const
Definition: Partition.cc:106
ullong rowsOnLastBlock_
Definition: Partition.h:55
void add(const Block &)
Definition: Partition.cc:112
ullong startOfLastBlock_
Definition: Partition.h:54
std::vector< Block > & blocks()
Definition: Partition.h:38
ullong write(const eckit::PathName &fileName) const
Partition & operator=(const Partition &)
Definition: Partition.cc:90
std::vector< Block > blocks_
Definition: Partition.h:53
ullong numberOfRows() const
Definition: Partition.cc:98
const iterator end() const
Definition: Reader.cc:81
iterator begin()
Definition: Reader.cc:74
void writeHeader()
const core::MetaData & columns() const
Definition: IteratorProxy.h:94
IteratorProxy< ITERATOR, Writer > iterator
Definition: Writer.h:35
Definition: ColumnInfo.h:23
ullong writeBlock(DataHandle &in, const Block &block, Writer<>::iterator &out)
Definition: Partition.cc:151
unsigned long long ullong
Definition: Block.h:27
std::ostream & operator<<(std::ostream &o, const Partition &p)
Definition: Partition.cc:33
Definition: encode.cc:30