15 #include "eckit/filesystem/PathName.h"
16 #include "eckit/io/FileHandle.h"
17 #include "eckit/io/Length.h"
18 #include "eckit/io/Offset.h"
19 #include "eckit/io/PartFileHandle.h"
20 #include "eckit/utils/StringTools.h"
28 using namespace eckit;
// Fragment of a stream-output routine whose signature is not visible here
// (presumably operator<<(std::ostream&, const Partition&) -- confirm).
// Appends the label " number of blocks: " followed by the element count of
// p.blocks_ to the stream `o`.
35 o <<
" number of blocks: " << p.
blocks_.size() <<
" ";
// Writes this partition's blocks to `o`, one record per block.
// Each record is: poolNumber, a tab character, the block (through the block's
// own stream operator), then endl. The same poolNumber is written for every
// block.
// NOTE(review): Partition(fileName, partitionNumber) splits its input lines on
// "\t" -- presumably it reads this format back; confirm that the field layout
// produced by the block's stream operator matches what that constructor indexes.
42 std::ostream& Partition::save(std::ostream& o,
size_t poolNumber)
const
// Index-based walk over every stored block.
44 for (
size_t i (0);
i < blocks_.size(); ++
i)
45 o << poolNumber <<
"\t" << blocks_[
i] << endl;
49 Partition::Partition(
const PathName& fileName,
size_t partitionNumber)
51 Log::info() <<
"Partition::Partition: read info on partition " << partitionNumber <<
" from " << fileName << endl;
52 if (! fileName.exists())
53 throw UserError(
string(fileName) +
" does not exist");
55 std::ifstream
f (
string(fileName).c_str());
58 while (std::getline(
f, line))
60 Log::info() <<
"Partition::Partition: line: " << line << endl;
62 vector<string> fs (StringTools::split(
"\t", line));
63 ASSERT(fs.size() == 2);
65 size_t part (atol(fs[0].c_str()));
69 if (part == partitionNumber)
70 add(
Block (eckit::PathName(fs[1]),
71 eckit::Offset(atol(fs[2].c_str())),
72 eckit::Offset(atol(fs[3].c_str())),
74 atol(fs[5].c_str())));
// Default constructor signature; its body is not visible in this view.
78 Partition::Partition()
// NOTE(review): the initializers below copy from `other`, which is not a
// parameter of the signature above -- they presumably belong to a copy
// constructor whose header is not visible here; confirm.
// Member-wise copy of the block list and of the two last-block fields.
85 : blocks_ (other.blocks_),
86 startOfLastBlock_ (other.startOfLastBlock_),
87 rowsOnLastBlock_ (other.rowsOnLastBlock_)
// Fragment of a row-copy routine whose header is not visible here
// (presumably writeBlock -- confirm).
// Runs block.lastRow iterations, advancing the input iterator `it` once per
// iteration.
157 for (
size_t r(0); r < block.
lastRow; ++r, ++it)
// Compares the output's column metadata with the input's; the action taken
// when they differ is not visible in this view.
162 if (out->
columns() != it->columns())
// Copies each column value of the current input row into the output row,
// column by column.
168 for (
size_t fi (0); fi < it->columns().size(); ++fi)
169 (*out)[fi] = (*it)[fi];
// Fragment of a write routine whose header is not visible here.
// rowNumber starts at zero; where it is updated is not visible in this view.
180 long long rowNumber (0);
// Index-based walk over `blocks`.
182 for (
size_t i (0);
i <
blocks.size(); ++
i)
// Logs the index and contents of the block being written (`block` is declared
// outside this view).
185 Log::info() <<
"Partition::write: writing block " <<
i <<
":" << block << endl;
// Logs the handle `fh` used for the block (declared outside this view).
191 Log::info() <<
"Partition::write: writing PartFileHandle: " << fh << endl;
// Fragment of a write routine whose header is not visible here
// (presumably write(const eckit::PathName& fileName) const -- confirm).
// rowNumber starts at zero; where it is updated is not visible in this view.
210 long long rowNumber (0);
// Local read-only alias for the member block list.
211 const vector<Block>& blocks (blocks_);
// Calls unlink() on the target path; the condition guarding this statement,
// if any, is not visible in this view.
213 { PathName(fileName).unlink(); }
// Index-based walk over every block.
215 for (
size_t i (0);
i < blocks.size(); ++
i)
217 const Block& block (blocks[
i]);
219 Log::info() <<
"Partition::write: writing block " <<
i <<
":" << block << endl;
// A handle on the target file is created and opened for append for each block.
// NOTE(review): the Length argument is 10 * 1024 * 1024 -- presumably a size
// estimate; confirm against FileHandle::openForAppend.
221 FileHandle dh (fileName);
222 dh.openForAppend(Length(1024 * 1024 * 10));
// Writer bound to the append handle; the meaning of the two `false` flags is
// not visible here.
224 Writer<>
writer(&dh,
false,
false);
// Handle over part of the block's source file, built from block.start and
// (block.end - block.start) -- presumably offset and length; confirm.
227 PartFileHandle fh (block.fileName, block.start, block.end - block.start);
ullong numberOfRowsOnLastBlock() const
std::vector< Block > & blocks()
ullong write(const eckit::PathName &fileName) const
Partition & operator=(const Partition &)
std::vector< Block > blocks_
ullong numberOfRows() const
const iterator end() const
const core::MetaData & columns() const
IteratorProxy< ITERATOR, Writer > iterator
ullong writeBlock(DataHandle &in, const Block &block, Writer<>::iterator &out)
unsigned long long ullong
std::ostream & operator<<(std::ostream &o, const Partition &p)