IODA Bundle
WriterDispatchingIterator.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
12 
13 #include "eckit/log/Timer.h"
14 #include "eckit/utils/Translator.h"
15 
16 #include "odc/Comparator.h"
17 #include "odc/Reader.h"
18 
19 using namespace odc::core;
20 
21 
namespace {
// n.b. Duplicated from eckit::sql::expression::function::FunctionEQ::trimStringInDouble.
// TODO: Put somewhere better.

/// Locate the printable, whitespace-trimmed text packed into the bytes of a double.
///
/// Only the first sizeof(double) bytes starting at `p` are examined. Scanning stops at
/// the first non-printable byte (e.g. a NUL pad byte). Trailing and leading whitespace
/// are then dropped from that printable prefix.
///
/// @param p    in: start of the sizeof(double)-byte buffer; out: first character of the
///             trimmed text (unchanged if nothing remains)
/// @param len  out: number of characters of trimmed text starting at the new `p`
void trimStringInDouble(char* &p, size_t& len)
{
    // The <cctype> classifiers require an argument representable as unsigned char (or EOF).
    // Passing a plain char with a negative value (high-bit byte) is undefined behaviour.
    len = 0;
    while (len < sizeof(double) && isprint(static_cast<unsigned char>(p[len])))
        ++len;

    // Drop trailing whitespace.
    while (len > 0 && isspace(static_cast<unsigned char>(p[len - 1])))
        --len;

    // Drop leading whitespace. The bound is tested first so no byte outside the
    // trimmed range is ever classified.
    while (len > 0 && isspace(static_cast<unsigned char>(*p)))
    {
        ++p;
        --len;
    }
}
}
40 
41 namespace odc {
42 
43 //----------------------------------------------------------------------------------------------------------------------
44 
45 
// Constructor. The signature line is omitted from this listing. Per the declaration index it is
// WriterDispatchingIterator(OWNER& owner, int maxOpenFiles, bool append = false).
// - Keeps a reference to `owner` and copies its outputFileTemplate() as the file-name template.
// - Sizes the per-slot bookkeeping (lastDispatch_, iteratorIndex2fileName_) to maxOpenFiles.
//   lastDispatch_ entries are initialised with -1; the element type is not visible here, so
//   this is presumably a "never dispatched" marker (TODO confirm).
// - Row buffers (lastValues_, nextRow_, columnOffsets_) start as null pointers.
// - Template parameters are parsed lazily, since initialized_ starts false.
46 template <typename WRITE_ITERATOR, typename OWNER>
48 : owner_(owner),
49  iteratorsOwner_(),
50  columns_(0),
51  lastValues_(0),
52  nextRow_(0),
53  columnOffsets_(0),
54  nrows_(0),
55  outputFileTemplate_(owner_.outputFileTemplate()),
56  properties_(),
57  dispatchedIndexes_(),
58  values2iteratorIndex_(),
59  lastDispatch_(maxOpenFiles, -1),
60  iteratorIndex2fileName_(maxOpenFiles),
61  lastDispatchedValues_(),
62  lastIndex_(),
63  initialized_(false),
64  append_(append),
65  refCount_(0),
66  iterators_(),
67  files_(),
68  templateParameters_(),
69  maxOpenFiles_(maxOpenFiles),
70  filesCreated_() {}
71 
72 
// setColumn(size_t index, std::string name, api::ColumnType type). The signature line is
// omitted from this listing.
// Renames and retypes column `index` in this object's own metadata (columns_). The visible
// body does not forward the change to already-created per-file iterators.
// Asserts the index is in range and the column exists. Always returns 0.
73 template <typename WRITE_ITERATOR, typename OWNER>
75 {
76  ASSERT(index < columns().size());
77  Column* col = columns_[index];
78  ASSERT(col);
79 
80  col->name(name);
81  col->type<SameByteOrder>(type);
82  return 0;
83 }
84 
85 template <typename WRITE_ITERATOR, typename OWNER>
86 int WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
87 {
88  ASSERT(index < columns().size());
89  Column* col = columns_[index];
90  ASSERT(col);
91 
92  col->name(name);
93  col->type<SameByteOrder>(type);
94  col->bitfieldDef(b);
95  col->missingValue(0);
96  return 0;
97 }
98 
99 template <typename WRITE_ITERATOR, typename OWNER>
100 std::string WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::generateFileName(const double* values, unsigned long count)
101 {
102  std::string fileName (outputFileTemplate_);
103  int diff (0);
104  for (TemplateParameters::iterator it (templateParameters_.begin()); it != templateParameters_.end(); ++it)
105  {
106  TemplateParameter& p (*(*it));
107 
108  // TODO: if values collected can be of different type then integer,
109  // then below code must be updated:
110  // code updated for std::string [19/07/2011] AF
111  double d (values[p.columnIndex]);
112  std::string s;
113  if ( columns_[p.columnIndex]->type() == api::STRING)
114  {
115  char* sp (reinterpret_cast<char *>(&d));
116  size_t len (0);
117  trimStringInDouble(sp, len);
118  s = std::string(sp, len);
119  while (s.find("/") != std::string::npos)
120  {
121  std::string old (s);
122  size_t pos (s.find("/"));
123  s.replace(pos, pos+1, std::string("__SLASH__"));
124  //eckit::Log::info() << "WriterDispatchingIterator::generateFileName: '" << old << "' => '" << s << "'" << std::endl;
125  }
126  } else
127  {
128  s = eckit::Translator<int, std::string>()(int(d));
129  }
130 
131  fileName.replace(p.startPos - diff, p.endPos - p.startPos + 1, s);
132  diff = outputFileTemplate_.size() - fileName.size();
133  }
134 
135  //eckit::Log::debug() << "WriterDispatchingIterator::generateFileName: fileName = " << fileName << std::endl;
136  return fileName;
137 }
138 
139 template <typename WRITE_ITERATOR, typename OWNER>
140 WRITE_ITERATOR& WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::dispatch(const double* values, unsigned long count)
141 {
142  return *iterators_[this->dispatchIndex(values, count)];
143 }
144 
// dispatchIndex(const double* values, unsigned long count) -> int. The signature line is
// omitted from this listing.
// Returns the slot in iterators_ of the writer for this row. The key is the vector of
// values found at the column indexes listed in dispatchedIndexes_.
// - A key equal to the previous one is answered from lastIndex_ without a lookup.
// - Otherwise the key is looked up in values2iteratorIndex_. If it is unknown, a new writer
//   is created by createIterator(), using a file name from generateFileName().
// On every non-repeat dispatch, lastDispatch_[slot] is set to nrows_. createIterator()
// reads these values to choose which writer to evict.
145 template <typename WRITE_ITERATOR, typename OWNER>
147 {
148  Values dispatchedValues;
149  for (size_t i (0); i < dispatchedIndexes_.size(); ++i)
150  dispatchedValues.push_back(values[dispatchedIndexes_[i]]);
151 
// Fast path: same key as the previous row.
152  if (dispatchedValues == lastDispatchedValues_)
153  return lastIndex_;
154 
155  Values2IteratorIndex::iterator p (values2iteratorIndex_.find(dispatchedValues));
156  size_t iteratorIndex ((p != values2iteratorIndex_.end())
157  ? p->second
158  : createIterator(dispatchedValues, generateFileName(values, count)));
159 
160  lastDispatchedValues_ = dispatchedValues;
161  lastIndex_ = iteratorIndex;
162  lastDispatch_[iteratorIndex] = nrows_;
163  return iteratorIndex;
164 }
165 
166 template <typename WRITE_ITERATOR, typename OWNER>
167 int WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::createIterator(const Values& dispatchedValues, const std::string& fileName)
168 {
169  std::ostream& L(eckit::Log::debug());
170 
171  int iteratorIndex (iterators_.size());
172  if (iterators_.size() >= maxOpenFiles_)
173  {
174  ASSERT(iterators_.size());
175 
176  size_t oldest (0);
177  unsigned long long oldestRow (lastDispatch_[oldest]);
178  for (size_t i = oldest; i < lastDispatch_.size(); ++i)
179  {
180  if (lastDispatch_[i] < oldestRow)
181  {
182  oldestRow = lastDispatch_[i];
183  oldest = i;
184  }
185  }
186  iteratorIndex = oldest;
187 
188  L << "split writer: evicted iterator " << iteratorIndex
189  << "' " << iteratorIndex2fileName_[iteratorIndex] << "' "
190  << " (oldest row: " << oldestRow << "), nrows_=" << nrows_ << std::endl;
191 
192  delete iterators_[iteratorIndex];
193  iterators_[iteratorIndex] = 0;
194 
195  Values2IteratorIndex::iterator vit (values2iteratorIndex_.begin());
196  for (; vit != values2iteratorIndex_.end(); ++vit)
197  if (vit->second == iteratorIndex)
198  break;
199  values2iteratorIndex_.erase(vit);
200  }
201 
202  std::string operation;
203  //bool append = false;
204  if (append_ || !eckit::PathName(fileName).exists())
205  {
206  filesCreated_[fileName] = 1;
207  operation = "creating";
208  }
209  else
210  {
211  if (filesCreated_.find(fileName) == filesCreated_.end())
212  {
213  filesCreated_[fileName] = 1; operation = "overwriting";
214  }
215  else
216  {
217  append_ = true;
218  filesCreated_[fileName]++; operation = "appending";
219  }
220  }
221 
222  L << iteratorIndex << ": " << operation << " '" << fileName << "'" << std::endl;
223 
224  if (iteratorIndex == iterators_.size())
225  {
226  iterators_.push_back(iteratorsOwner_.createWriteIterator(fileName, append_));
227  files_.push_back(fileName);
228  }
229  else
230  {
231  iterators_[iteratorIndex] = iteratorsOwner_.createWriteIterator(fileName, append_);
232  files_[iteratorIndex] = fileName;
233  //ASSERT(files_[iteratorIndex] == fileName);
234  }
235  values2iteratorIndex_[dispatchedValues] = iteratorIndex;
236  iteratorIndex2fileName_[iteratorIndex] = fileName;
237 
238  // Prop. metadata
239  iterators_[iteratorIndex]->columns(columns());
240  iterators_[iteratorIndex]->writeHeader();
241 
242  return iteratorIndex;
243 }
244 
// outputFiles() -> std::vector<eckit::PathName>. The signature line is omitted from this
// listing.
// Returns the path of every file recorded in filesCreated_, i.e. every file this object has
// opened for writing. Paths come out in the std::map's (sorted) key order.
245 template <typename WRITE_ITERATOR, typename OWNER>
247 {
248  std::vector<eckit::PathName> paths;
249  for (std::map<std::string,int>::iterator it (filesCreated_.begin()); it != filesCreated_.end(); ++it)
250  paths.push_back(it->first);
251  return paths;
252 }
253 
// Destructor. The signature line is omitted from this listing.
// Frees the row buffers allocated for the current header. Deletes every write iterator still
// held in iterators_; close() empties that vector, so closed iterators are not deleted twice.
254 template <typename WRITE_ITERATOR, typename OWNER>
256 {
257  //eckit::Log::debug() << "WriterDispatchingIterator<WRITE_ITERATOR>::~WriterDispatchingIterator()" << std::endl;
258  delete [] lastValues_;
259  delete [] nextRow_;
260  delete [] columnOffsets_;
261  for (size_t i = 0; i < iterators_.size(); ++i)
262  delete iterators_[i];
263 }
264 
265 template <typename WRITE_ITERATOR, typename OWNER>
266 unsigned long WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::gatherStats(const double* values, unsigned long count)
267 {
268  return dispatch(values, count).gatherStats(values, count);
269 }
270 
// Header-writing method. The signature line is omitted from this listing; its commented-out
// log message names it writeHeader.
// (Re)allocates the row buffers for the current metadata:
// - lastValues_ and nextRow_ each hold rowDataSizeDoubles() doubles.
// - columnOffsets_ holds one entry per column: the running sum of dataSizeDoubles() over
//   the preceding columns.
// Resets nrows_ to 0.
// NOTE(review): the missing-value initialisation indexes nextRow_/lastValues_ by column
// number i rather than by columnOffsets_[i]. The two differ once any column spans more than
// one double. Confirm whether the slot at the column's offset was intended.
// NOTE(review): ASSERT(lastValues_) after new[] cannot fail, because new[] throws on
// allocation failure.
271 template <typename WRITE_ITERATOR, typename OWNER>
273 {
274  //eckit::Log::debug() << "WriterDispatchingIterator<WRITE_ITERATOR>::writeHeader" << std::endl;
275 
276  delete [] lastValues_;
277  delete [] nextRow_;
278  delete [] columnOffsets_;
279 
280  int32_t numDoubles = rowDataSizeDoubles();
281  int32_t count = columns().size();
282 
283  lastValues_ = new double [numDoubles];
284  nextRow_ = new double [numDoubles];
285  columnOffsets_ = new size_t[count];
286  ASSERT(lastValues_);
287 
288  size_t offset = 0;
289  for (int i (0); i < count; i++) {
290  nextRow_[i] = lastValues_[i] = columns_[i]->missingValue();
291  columnOffsets_[i] = offset;
292  offset += columns_[i]->dataSizeDoubles();
293  }
294 
295  nrows_ = 0;
296 }
297 
// Row-writing method. The signature line is omitted from this listing; presumably next()
// (TODO confirm).
// Writes the row buffered in nextRow_ via writeRow(). Returns true when writeRow()
// reports 0.
// NOTE(review): the count passed is columns().size(), while nextRow_ holds
// rowDataSizeDoubles() doubles. Confirm which count writeRow() expects.
298 template <typename WRITE_ITERATOR, typename OWNER>
300 {
301  return writeRow(nextRow_, columns().size()) == 0;
302 }
303 
// parseTemplateParameters(). The signature line is omitted from this listing.
// Re-parses outputFileTemplate_ against the current columns() into templateParameters_.
// Throws eckit::UserError if the template contains no parameters.
// Rebuilds dispatchedIndexes_ with each parameter's column index, in the parsed order; these
// are the columns whose values select the output file. Finally sets initialized_.
304 template <typename WRITE_ITERATOR, typename OWNER>
306 {
307  templateParameters_.reset();
308  TemplateParameters::parse(outputFileTemplate_, templateParameters_, columns());
309  if (templateParameters_.size() == 0)
310  {
311  std::stringstream ss;
312  ss << "No parameters in output file template '" << outputFileTemplate_ << "'" << std::endl;
313  throw eckit::UserError(ss.str());
314  }
315  dispatchedIndexes_.clear();
316  for (size_t i (0); i < templateParameters_.size(); ++i)
317  dispatchedIndexes_.push_back(templateParameters_[i]->columnIndex);
318  initialized_ = true;
319 }
320 
// The definition on listing line 322 is not shown in this listing.
321 template <typename WRITE_ITERATOR, typename OWNER>
323 
// Accessor whose signature is omitted from this listing. It returns the element of nextRow_
// at column i's starting offset (columnOffsets_[i]). The return type is not visible; it is
// presumably a reference so callers can fill in the next row (TODO confirm).
324 template <typename WRITE_ITERATOR, typename OWNER>
326  return nextRow_[columnOffsets_[i]];
327 }
328 
// Signature omitted from this listing; presumably rowDataSizeDoubles(), which the
// header-writing method calls.
// Returns the number of doubles one row occupies: the sum of dataSizeDoubles() over all
// columns.
329 template <typename WRITE_ITERATOR, typename OWNER>
331 
332  size_t total = 0;
333  for (const auto& column : columns()) {
334  total += column->dataSizeDoubles();
335  }
336  return total;
337 }
338 
339 
// writeRow(const double* values, unsigned long count) -> int. The signature line is omitted
// from this listing.
// Parses the template parameters on first use. Forwards the row to the writer selected by
// dispatch(), and counts it in nrows_ only when that writer returns 0. Returns the writer's
// status code.
340 template <typename WRITE_ITERATOR, typename OWNER>
342 {
343  if (!initialized_)
344  parseTemplateParameters();
345 
346  WRITE_ITERATOR& wi = dispatch(values, count);
347  int rc = wi.writeRow(values, count);
348 
349  if (rc == 0)
350  nrows_++;
351 
352  return rc;
353 }
354 
// The definition on listing line 356 is not shown in this listing.
355 template <typename WRITE_ITERATOR, typename OWNER>
357 
358 /* * /
359 template <typename WRITE_ITERATOR, typename OWNER>
360 int WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::setColumn(size_t index, std::string name, api::ColumnType type)
361 {
362  ASSERT(index < columns().size());
363  Column* col = columns_[index];
364  ASSERT(col);
365 
366  typedef DataStream<SameByteOrder, FastInMemoryDataHandle> DS;
367 
368  col->name(name);
369  col->type<DS>(type, false);
370  //col->hasMissing(hasMissing);
371  //col->missingValue(missingValue);
372 
373  for (typename Iterators::iterator it = iterators_.begin(); it != iterators_.end(); ++it)
374  //(*it)->setColumn(index, name, type, hasMissing, missingValue);
375  (*it)->setColumn(index, name, type);
376 
377  return 0;
378 }
379 
380 template <typename WRITE_ITERATOR, typename OWNER>
381 int WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
382 {
383  //eckit::Log::info() << "WriterDispatchingIterator::setBitfieldColumn: " << std::endl;
384 
385  ASSERT(index < columns().size());
386  Column* col = columns_[index];
387  ASSERT(col);
388 
389  typedef DataStream<SameByteOrder, FastInMemoryDataHandle> DS;
390 
391  col->name(name);
392  col->type<DS>(type, false);
393  //col->hasMissing(hasMissing);
394  //col->missingValue(missingValue);
395  col->bitfieldDef(b);
396 
397  for (typename Iterators::iterator it = iterators_.begin(); it != iterators_.end(); ++it)
398  //(*it)->setBitfieldColumn(index, name, type, b, hasMissing, missingValue);
399  (*it)->setBitfieldColumn(index, name, type, b);
400 
401  return 0;
402 }
403 / *
404 */
405 
// Metadata setter; the signature line is omitted from this listing. Replaces this object's
// column metadata with `md` and returns `md`. The visible body does not propagate `md` to
// already-open per-file iterators.
406 template <typename WRITE_ITERATOR, typename OWNER>
408 {
409  columns_ = md;
410  return md;
411 }
412 
// Missing-value setter; the signature line is omitted from this listing. Sets the missing
// value of column `i` in this object's metadata. Asserts that the index is in range and the
// column exists.
413 template <typename WRITE_ITERATOR, typename OWNER>
415 {
416  ASSERT(i < columns().size());
417  Column* col (columns_[i]);
418  ASSERT(col);
419 
420  col->missingValue(missingValue);
421 }
422 
// pass1(T& it, const T& end) -> unsigned long. This is an explicit specialization; the class
// template arguments and the signature are on a listing line not shown here. The debug
// message names it pass1<WriterBufferingIterator>.
// - Copies the input's column metadata, parses the template parameters, then writes every
//   row in [it, end) through writeRow(), which dispatches each row to a per-file iterator.
// - When a new dataset with different columns starts, it adopts the new metadata,
//   re-parses the template, and flushes and re-writes the header of every open iterator.
// - Returns nrows_, the number of rows written. Returns 0 (with a warning) when the input
//   is empty.
// - A non-zero writeRow() status fails the ASSERT.
423 template <>
424 template <typename T>
426 {
427  if (! (it != end))
428  {
429  eckit::Log::warning() << "Split: No input data." << std::endl;
430  return 0;
431  }
432 
433  // Copy columns from the input iterator.
434  columns(it->columns());
435 
436  if (!initialized_) parseTemplateParameters();
437 
438  size_t maxcols = columns().size();
439  ASSERT(maxcols > 0);
440 
441  eckit::Log::debug() << "WriterDispatchingIterator::pass1<WriterBufferingIterator>: columns().size() => " << maxcols << std::endl;
442 
443  nrows_ = 0;
444  for (; it != end; ++it)
445  {
446  if (it->isNewDataset() && columns() != it->columns() )
447  {
448  columns(it->columns());
449  parseTemplateParameters();
450 
// Existing per-file iterators switch to the new metadata and start a new header.
451  for (size_t i = 0; i < iterators_.size(); ++i)
452  {
453  iterators_[i]->flush();
454  iterators_[i]->columns(columns());
455  iterators_[i]->writeHeader();
456  }
457  }
458 
459  const double* data (it->data());
460  size_t size (it->columns().size());
461  int rc (writeRow(data, size));
462  ASSERT(rc == 0);
463  }
464 
465  eckit::Log::debug() << "Split: processed " << nrows_ << " row(s)." << std::endl;
466  return nrows_;
467 }
468 
// verify(T& it, const T& end). This is an explicit specialization; the signature and the
// opening brace are on listing lines not shown here.
// Re-reads every file in files_ and checks each input row against the next row of the file
// it dispatches to:
// - The metadata must match, ignoring data sizes.
// - The values are compared with Comparator.
// Mismatches are counted and logged rather than thrown. At the end, the input must be fully
// consumed.
// NOTE(review): createIterator() overwrites files_[slot] when it reuses an evicted slot. After
// any eviction, files_ no longer lists every output file, and dispatchIndex() may re-create
// iterators during verification. This check appears reliable only when no eviction occurred
// (TODO confirm).
// NOTE(review): the Reader objects are deleted only on the normal path. An exception escaping
// this function (e.g. from the final ASSERT) leaks them.
469 template <>
470 template <typename T>
472  using namespace eckit;
473  using namespace std;
474  Log::info() << "Verifying split..." << endl;
475  Timer timer("Split verification");
476 
477  // Copy columns from the input iterator.
478  columns(it->columns());
479 
480  vector<Reader*> readers;
481  vector<pair<Reader::iterator, Reader::iterator> > iterators;
482  for (size_t i (0); i < files_.size(); ++i) {
483  Reader* reader(new Reader(files_[i]));
484  readers.push_back(reader);
485  iterators.push_back(make_pair(reader->begin(), reader->end()));
486  }
487 
488  vector<size_t> rowsRead(files_.size());
489  Comparator comparator;
490  unsigned long numberOfDifferences (0);
491  long long i (0);
492  for (; it != end; ++i)
493  {
494  if (it->isNewDataset() && columns() != it->columns() )
495  {
496  columns(it->columns());
497  parseTemplateParameters();
498  }
499 
500  size_t fileIndex(dispatchIndex(it->data(), it->columns().size()));
501  const std::string& outFileName (files_[fileIndex]);
502 
503  size_t n(columns().size());
504  typedef Reader::iterator I;
505  std::pair<I, I>& its(iterators[fileIndex]);
// NOTE(review): only sIt is a reference here. sEnd is declared as a plain I (a copy),
// because the & binds to the first declarator only.
506  I& sIt(its.first), sEnd(its.second);
507 
// NOTE(review): sIt is dereferenced here, before (and outside the try of) the
// ASSERT(sIt != sEnd) check below. If the output file ran short, this reads past its end.
508  const MetaData& sMetaData (sIt->columns());
509  try {
510  ASSERT(sIt != sEnd);
511 
512  // In the parent codec, we will always have the largest data size (e.g. for
513  // the longest string). This will not be true in the dispatched iterators
514  // which will only be large enough for the strings dispatched to them.
515  bool compareDataSizes = false;
516  ASSERT(sMetaData.equals(columns(), compareDataSizes));
517 
518  ++rowsRead[fileIndex];
519  const double* const originalData(it->data());
520  const double* const outputData(sIt->data());
521  comparator.compare(n, originalData, outputData, columns(), sMetaData);
522  } catch (...) {
// Any failure for this row (assertion or comparison) counts as one difference.
523  ++numberOfDifferences;
524  Log::info() << "Row " << i << " of input (" << rowsRead[fileIndex] << " of " << outFileName << ") not correct." << endl << endl;
525  }
526  ++it;
527  ++sIt;
528  }
529  Log::info() << "Number of rows: " << i << ". Total number of differences: " << numberOfDifferences << std::endl;
530  ASSERT(! (it != end));
531 
532  for (size_t j = 0; j < readers.size(); ++j)
533  delete readers[j];
534 }
535 
// property(std::string key, std::string value). The signature line is omitted from this
// listing.
// Stores `value` under `key` in properties_. As the TODO notes, the value is not yet passed
// on to the per-file iterators.
536 template <typename WRITE_ITERATOR, typename OWNER>
538 {
539  // TODO: save property, make sure they are propagated to iterators as they are created
540  properties_[key] = value;
541 }
542 
543 template <typename WRITE_ITERATOR, typename OWNER>
544 std::string WriterDispatchingIterator<WRITE_ITERATOR, OWNER>::property(std::string key) { return properties_[key]; }
545 
// close() -> int. The signature line is omitted from this listing.
// Closes and deletes every open per-file iterator, then empties iterators_. Returns the
// bitwise OR of the iterators' close() status codes.
// NOTE(review): values2iteratorIndex_, lastIndex_ and lastDispatchedValues_ are not reset
// here. A writeRow() after close() would index the now-empty iterators_ — confirm this is
// never done.
546 template <typename WRITE_ITERATOR, typename OWNER>
548 {
549  //eckit::Log::debug() << "WriterDispatchingIterator<WRITE_ITERATOR>::close()" << std::endl;
550  int rc = 0;
551  for (typename Iterators::iterator it = iterators_.begin(); it != iterators_.end(); ++it)
552  {
553  rc |= (*it)->close();
554  delete *it;
555  }
556  iterators_.clear();
557 
558  return rc;
559 }
560 
561 //----------------------------------------------------------------------------------------------------------------------
562 
563 } // namespace odc
564 
static void count(void *counter, const double *data, size_t n)
Definition: UnitTests.cc:531
bool compare(T1 &it1, const T1 &end1, T2 &it2, const T2 &end2, const std::string &desc1, const std::string &desc2)
Definition: Comparator.h:108
const iterator end() const
Definition: Reader.cc:81
iterator begin()
Definition: Reader.cc:74
static TemplateParameters & parse(const std::string &fileNameTemplate, TemplateParameters &, const core::MetaData &=nullMD)
WRITE_ITERATOR & dispatch(const double *values, unsigned long count)
Find iterator data should be dispatched to.
unsigned long gatherStats(const double *values, unsigned long count)
const core::MetaData & columns() const
std::string generateFileName(const double *values, unsigned long count)
std::vector< eckit::PathName > outputFiles()
int writeRow(const double *values, unsigned long count)
WriterDispatchingIterator(OWNER &owner, int maxOpenFiles, bool append=false)
int createIterator(const Values &dispatchedValues, const std::string &fileName)
int setColumn(size_t index, std::string name, api::ColumnType type)
int setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
void verify(T &, const T &)
unsigned long pass1(T &, const T &)
int dispatchIndex(const double *values, unsigned long count)
void property(std::string key, std::string value)
void bitfieldDef(const eckit::sql::BitfieldDef &b)
Definition: Column.h:85
void missingValue(double v)
Definition: Column.h:80
void name(const std::string name)
Definition: Column.h:57
static api::ColumnType type(const std::string &)
Definition: Column.cc:74
bool equals(const MetaData &md, bool compareDataSizes=true) const
Definition: MetaData.cc:216
Definition: ColumnInfo.h:23
Definition: encode.cc:30