13 #include "eckit/log/Timer.h"
14 #include "eckit/utils/Translator.h"
28 for(; len <
sizeof(double) && isprint(p[len]); ++len)
30 for(; len > 0 && isspace(p[len - 1]); --len)
33 for (
char *pp = p; isspace(*p) && p < pp + plen;)
46 template <
typename WRITE_ITERATOR,
typename OWNER>
55 outputFileTemplate_(owner_.outputFileTemplate()),
58 values2iteratorIndex_(),
59 lastDispatch_(maxOpenFiles, -1),
60 iteratorIndex2fileName_(maxOpenFiles),
61 lastDispatchedValues_(),
68 templateParameters_(),
69 maxOpenFiles_(maxOpenFiles),
73 template <
typename WRITE_ITERATOR,
typename OWNER>
76 ASSERT(index <
columns().size());
77 Column* col = columns_[index];
85 template <
typename WRITE_ITERATOR,
typename OWNER>
88 ASSERT(index <
columns().size());
89 Column* col = columns_[index];
99 template <
typename WRITE_ITERATOR,
typename OWNER>
102 std::string fileName (outputFileTemplate_);
104 for (TemplateParameters::iterator it (templateParameters_.begin()); it != templateParameters_.end(); ++it)
115 char* sp (
reinterpret_cast<char *
>(&
d));
118 s = std::string(sp, len);
119 while (s.find(
"/") != std::string::npos)
122 size_t pos (s.find(
"/"));
123 s.replace(pos, pos+1, std::string(
"__SLASH__"));
128 s = eckit::Translator<int, std::string>()(
int(
d));
132 diff = outputFileTemplate_.size() - fileName.size();
139 template <
typename WRITE_ITERATOR,
typename OWNER>
142 return *iterators_[this->dispatchIndex(values,
count)];
145 template <
typename WRITE_ITERATOR,
typename OWNER>
149 for (
size_t i (0);
i < dispatchedIndexes_.size(); ++
i)
150 dispatchedValues.push_back(values[dispatchedIndexes_[
i]]);
152 if (dispatchedValues == lastDispatchedValues_)
155 Values2IteratorIndex::iterator p (values2iteratorIndex_.find(dispatchedValues));
156 size_t iteratorIndex ((p != values2iteratorIndex_.end())
158 : createIterator(dispatchedValues, generateFileName(values,
count)));
160 lastDispatchedValues_ = dispatchedValues;
161 lastIndex_ = iteratorIndex;
162 lastDispatch_[iteratorIndex] = nrows_;
163 return iteratorIndex;
166 template <
typename WRITE_ITERATOR,
typename OWNER>
169 std::ostream& L(eckit::Log::debug());
171 int iteratorIndex (iterators_.size());
172 if (iterators_.size() >= maxOpenFiles_)
174 ASSERT(iterators_.size());
177 unsigned long long oldestRow (lastDispatch_[oldest]);
178 for (
size_t i = oldest;
i < lastDispatch_.size(); ++
i)
180 if (lastDispatch_[
i] < oldestRow)
182 oldestRow = lastDispatch_[
i];
186 iteratorIndex = oldest;
188 L <<
"split writer: evicted iterator " << iteratorIndex
189 <<
"' " << iteratorIndex2fileName_[iteratorIndex] <<
"' "
190 <<
" (oldest row: " << oldestRow <<
"), nrows_=" << nrows_ << std::endl;
192 delete iterators_[iteratorIndex];
193 iterators_[iteratorIndex] = 0;
195 Values2IteratorIndex::iterator vit (values2iteratorIndex_.begin());
196 for (; vit != values2iteratorIndex_.end(); ++vit)
197 if (vit->second == iteratorIndex)
199 values2iteratorIndex_.erase(vit);
202 std::string operation;
204 if (append_ || !eckit::PathName(fileName).exists())
206 filesCreated_[fileName] = 1;
207 operation =
"creating";
211 if (filesCreated_.find(fileName) == filesCreated_.end())
213 filesCreated_[fileName] = 1; operation =
"overwriting";
218 filesCreated_[fileName]++; operation =
"appending";
222 L << iteratorIndex <<
": " << operation <<
" '" << fileName <<
"'" << std::endl;
224 if (iteratorIndex == iterators_.size())
226 iterators_.push_back(iteratorsOwner_.createWriteIterator(fileName, append_));
227 files_.push_back(fileName);
231 iterators_[iteratorIndex] = iteratorsOwner_.createWriteIterator(fileName, append_);
232 files_[iteratorIndex] = fileName;
235 values2iteratorIndex_[dispatchedValues] = iteratorIndex;
236 iteratorIndex2fileName_[iteratorIndex] = fileName;
239 iterators_[iteratorIndex]->columns(
columns());
240 iterators_[iteratorIndex]->writeHeader();
242 return iteratorIndex;
245 template <
typename WRITE_ITERATOR,
typename OWNER>
248 std::vector<eckit::PathName> paths;
249 for (std::map<std::string,int>::iterator it (filesCreated_.begin()); it != filesCreated_.end(); ++it)
250 paths.push_back(it->first);
254 template <
typename WRITE_ITERATOR,
typename OWNER>
258 delete [] lastValues_;
260 delete [] columnOffsets_;
261 for (
size_t i = 0;
i < iterators_.size(); ++
i)
262 delete iterators_[
i];
265 template <
typename WRITE_ITERATOR,
typename OWNER>
268 return dispatch(values,
count).gatherStats(values,
count);
271 template <
typename WRITE_ITERATOR,
typename OWNER>
276 delete [] lastValues_;
278 delete [] columnOffsets_;
280 int32_t numDoubles = rowDataSizeDoubles();
283 lastValues_ =
new double [numDoubles];
284 nextRow_ =
new double [numDoubles];
285 columnOffsets_ =
new size_t[
count];
290 nextRow_[
i] = lastValues_[
i] = columns_[
i]->missingValue();
291 columnOffsets_[
i] = offset;
292 offset += columns_[
i]->dataSizeDoubles();
298 template <
typename WRITE_ITERATOR,
typename OWNER>
301 return writeRow(nextRow_,
columns().size()) == 0;
304 template <
typename WRITE_ITERATOR,
typename OWNER>
307 templateParameters_.reset();
309 if (templateParameters_.size() == 0)
311 std::stringstream ss;
312 ss <<
"No parameters in output file template '" << outputFileTemplate_ <<
"'" << std::endl;
313 throw eckit::UserError(ss.str());
315 dispatchedIndexes_.clear();
316 for (
size_t i (0);
i < templateParameters_.size(); ++
i)
317 dispatchedIndexes_.push_back(templateParameters_[
i]->columnIndex);
321 template <
typename WRITE_ITERATOR,
typename OWNER>
324 template <
typename WRITE_ITERATOR,
typename OWNER>
326 return nextRow_[columnOffsets_[
i]];
329 template <
typename WRITE_ITERATOR,
typename OWNER>
333 for (
const auto& column :
columns()) {
334 total += column->dataSizeDoubles();
340 template <
typename WRITE_ITERATOR,
typename OWNER>
344 parseTemplateParameters();
346 WRITE_ITERATOR& wi = dispatch(values,
count);
347 int rc = wi.writeRow(values,
count);
355 template <
typename WRITE_ITERATOR,
typename OWNER>
406 template <
typename WRITE_ITERATOR,
typename OWNER>
413 template <
typename WRITE_ITERATOR,
typename OWNER>
424 template <
typename T>
429 eckit::Log::warning() <<
"Split: No input data." << std::endl;
436 if (!initialized_) parseTemplateParameters();
438 size_t maxcols =
columns().size();
441 eckit::Log::debug() <<
"WriterDispatchingIterator::pass1<WriterBufferingIterator>: columns().size() => " << maxcols << std::endl;
444 for (; it != end; ++it)
446 if (it->isNewDataset() &&
columns() != it->columns() )
449 parseTemplateParameters();
451 for (
size_t i = 0;
i < iterators_.size(); ++
i)
453 iterators_[
i]->flush();
455 iterators_[
i]->writeHeader();
459 const double* data (it->data());
460 size_t size (it->columns().size());
461 int rc (writeRow(data, size));
465 eckit::Log::debug() <<
"Split: processed " << nrows_ <<
" row(s)." << std::endl;
470 template <
typename T>
472 using namespace eckit;
474 Log::info() <<
"Verifying split..." << endl;
475 Timer timer(
"Split verification");
480 vector<Reader*> readers;
481 vector<pair<Reader::iterator, Reader::iterator> > iterators;
482 for (
size_t i (0);
i < files_.size(); ++
i) {
484 readers.push_back(reader);
485 iterators.push_back(make_pair(reader->
begin(), reader->
end()));
488 vector<size_t> rowsRead(files_.size());
490 unsigned long numberOfDifferences (0);
492 for (; it != end; ++
i)
494 if (it->isNewDataset() &&
columns() != it->columns() )
497 parseTemplateParameters();
500 size_t fileIndex(dispatchIndex(it->data(), it->columns().size()));
501 const std::string& outFileName (files_[fileIndex]);
505 std::pair<I, I>& its(iterators[fileIndex]);
506 I& sIt(its.first), sEnd(its.second);
508 const MetaData& sMetaData (sIt->columns());
515 bool compareDataSizes =
false;
518 ++rowsRead[fileIndex];
519 const double*
const originalData(it->data());
520 const double*
const outputData(sIt->data());
521 comparator.
compare(n, originalData, outputData,
columns(), sMetaData);
523 ++numberOfDifferences;
524 Log::info() <<
"Row " <<
i <<
" of input (" << rowsRead[fileIndex] <<
" of " << outFileName <<
") not correct." << endl << endl;
529 Log::info() <<
"Number of rows: " <<
i <<
". Total number of differences: " << numberOfDifferences << std::endl;
530 ASSERT(! (it != end));
532 for (
size_t j = 0; j < readers.size(); ++j)
536 template <
typename WRITE_ITERATOR,
typename OWNER>
540 properties_[
key] = value;
543 template <
typename WRITE_ITERATOR,
typename OWNER>
546 template <
typename WRITE_ITERATOR,
typename OWNER>
551 for (
typename Iterators::iterator it = iterators_.begin(); it != iterators_.end(); ++it)
553 rc |= (*it)->close();
static void count(void *counter, const double *data, size_t n)
bool compare(T1 &it1, const T1 &end1, T2 &it2, const T2 &end2, const std::string &desc1, const std::string &desc2)
const iterator end() const
static TemplateParameters & parse(const std::string &fileNameTemplate, TemplateParameters &, const core::MetaData &=nullMD)
size_t rowDataSizeDoubles() const
WRITE_ITERATOR & dispatch(const double *values, unsigned long count)
Find iterator data should be dispatched to.
unsigned long gatherStats(const double *values, unsigned long count)
void missingValue(size_t i, double)
const core::MetaData & columns() const
std::string generateFileName(const double *values, unsigned long count)
std::vector< eckit::PathName > outputFiles()
int writeRow(const double *values, unsigned long count)
WriterDispatchingIterator(OWNER &owner, int maxOpenFiles, bool append=false)
int createIterator(const Values &dispatchedValues, const std::string &fileName)
std::vector< double > Values
int setColumn(size_t index, std::string name, api::ColumnType type)
int setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
void verify(T &, const T &)
unsigned long pass1(T &, const T &)
int dispatchIndex(const double *values, unsigned long count)
void property(std::string key, std::string value)
~WriterDispatchingIterator()
void parseTemplateParameters()
void bitfieldDef(const eckit::sql::BitfieldDef &b)
void missingValue(double v)
void name(const std::string name)
static api::ColumnType type(const std::string &)
void trimStringInDouble(char *&p, size_t &len)