IODA Bundle
WriterBufferingIterator.h
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 ///
12 /// \file WriterBufferingIterator.h
13 ///
14 /// @author Piotr Kuchta, August 2009
15 
16 #ifndef odc_WriterBufferingIterator_H
17 #define odc_WriterBufferingIterator_H
18 
19 #include "eckit/filesystem/PathName.h"
20 
22 #include "odc/IteratorProxy.h"
23 
24 namespace eckit { class PathName; }
25 namespace eckit { class DataHandle; }
26 
27 namespace odc {
28 
29 //----------------------------------------------------------------------------------------------------------------------
30 
31 template <typename I> class Writer;
32 namespace sql { class TableDef; }
33 
// NOTE(review): the embedded listing numbers skip in several places in this class
// (34, 37, 42, 60, 63, 100, 102-103, 107, 115-116, ...), so the class head and a number of
// declarations appear to be missing from this extract. The cross-reference list at the end
// of the page names the class WriterBufferingIterator and shows
// `Writer< WriterBufferingIterator > Owner` — confirm against the real header.
35 {
36 public:
38 
39  //WriterBufferingIterator (Owner &owner, eckit::DataHandle *, bool openDataHandle=true);
// tableDef is optional and defaults to a null pointer.
40  WriterBufferingIterator (Owner &owner, eckit::DataHandle *, bool openDataHandle, const odc::sql::TableDef* tableDef=0);
41 
43 
44  int open();
45 
46  double* data();
47  double& data(size_t i);
48 
49  int setColumn(size_t index, std::string name, api::ColumnType type);
50  int setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b);
51 
52  void missingValue(size_t i, double);
53 
// pass1 is defined further down in this header: it writes every row of an iterator range,
// closes the writer and returns the number of rows processed.
54  template <typename T> unsigned long pass1(T&, const T&);
55  unsigned long gatherStats(const double* values, unsigned long count);
56 
57  int close();
58 
/// Read-only access to the column metadata held in columns_.
59  const core::MetaData& columns() const { return columns_; }
// NOTE(review): the signature line of the setter overload is missing from this extract; the
// cross-reference list shows `const core::MetaData & columns(const core::MetaData &md)`.
// The visible body copies md into columns_, resets the codec of every column to its
// core::SameByteOrder variant, and returns columns_.
61  columns_ = md;
62  for (auto& col : columns_) col->resetCodec<core::SameByteOrder>(); // We want to use the default codecs for encoding
64  return columns_;
65  }
66 
/// Resizes the column metadata to n entries (forwards to columns_.setSize).
67  void setNumberOfColumns(size_t n) { columns_.setSize(n); }
68 
/// Returns the owner_ reference.
69  Owner& owner() { return owner_; }
70 
/// Dereferences f_; no null check is performed here.
71  eckit::DataHandle& dataHandle() { return *f_; }
72 
/// Assigns value to properties_[key].
73  void property(std::string key, std::string value) { properties_[key] = value; }
74 
75  /// The offset of a given column in the doubles[] data array
// Asserts that columnOffsets_ is non-null; the index i itself is not range-checked.
76  size_t dataOffset(size_t i) const { ASSERT(columnOffsets_); return columnOffsets_[i]; }
77 
78 //protected:
79 
80  int setOptimalCodecs();
81 
82  void writeHeader();
83 
84  int writeRow(const double* values, unsigned long count);
85 
86  // Get the number of doubles per row.
87  size_t rowDataSizeDoubles() const { return rowDataSizeDoubles_; }
88 
// Getter and setter for rowsBufferSize_ (unit not visible here — TODO confirm).
89  size_t rowsBufferSize() { return rowsBufferSize_; }
90  void rowsBufferSize(size_t n) { rowsBufferSize_ = n; }
91 
92  void flush();
93 
94  std::vector<eckit::PathName> outputFiles();
95  bool next();
96 
97 private:
98  size_t rowDataSizeDoublesInternal() const;
99 public:
101 protected:
104  double* lastValues_;
105  double* nextRow_;
106  size_t* columnOffsets_; // in doubles
108  unsigned long long nrows_;
109 
// Output data handle; dereferenced by dataHandle().
110  eckit::DataHandle* f_;
111  eckit::PathName path_;
112 
113 private:
114 // No copy allowed.
// NOTE(review): the copy constructor / copy assignment declarations themselves are missing
// from this extract (listing numbers 115-116 are skipped); both signatures appear in the
// cross-reference list at the end of the page.
117 
// Per-dataset initialisation used by pass1; defined further down in this header.
118  template <typename T> void pass1init(T&, const T&);
119 
120  std::pair<eckit::Buffer, size_t> serializeHeader(size_t dataSize, size_t rowsNumber);
121 
122  void allocBuffers();
123  void allocRowsBuffer();
125 
126  int doWriteRow(core::DataStream<core::SameByteOrder>& stream, const double* values);
127 
130 
131  eckit::Buffer rowsBuffer_;
132  unsigned char* nextRowInBuffer_;
133 
136  size_t rowByteSize_;
137 
139 
// Optional table definition passed to the constructor (defaults to null there).
140  const odc::sql::TableDef* tableDef_;
141 
142 private:
144 
146 };
147 
/// Prepares the writer for a dataset whose column metadata is taken from `it`.
///
/// Copies the columns of the row `it` currently points at into this writer, asserts that
/// there is at least one column, and then calls allocRowsBuffer().
///
/// \param it   iterator positioned on the first row of the dataset; it is dereferenced here.
/// \param end  end of the input range; not used by the visible body.
148 template<typename T>
149 void WriterBufferingIterator::pass1init(T& it, const T& end)
150 {
151  eckit::Log::debug() << "WriterBufferingIterator::pass1init" << std::endl;
152 
153  // Copy columns from the input iterator.
154  columns(it->columns());
155 
// NOTE(review): listing numbers 156-157 are skipped here, so two source lines appear to be
// missing from this extract — confirm against the real header before editing this function.
158 
// nCols is only consumed by the assertion below.
159  size_t nCols = it->columns().size();
160  ASSERT(nCols > 0);
161 
162  allocRowsBuffer();
163 }
164 
165 template<typename T>
166 unsigned long WriterBufferingIterator::pass1(T& it, const T& end)
167 {
168  eckit::Log::debug() << "WriterBufferingIterator::pass1" << std::endl;
169 
170  pass1init(it, end);
171  writeHeader();
172 
173  unsigned long nrows = 0;
174  for ( ; it != end; ++it, ++nrows)
175  {
176  if (it->isNewDataset() && it->columns() != columns())
177  {
178  eckit::Log::debug() << "WriterBufferingIterator::pass1: Change of input metadata." << std::endl;
179  flush();
180  pass1init(it, end);
181  writeHeader();
182  }
183 
184  writeRow(it->data(), it->columns().size());
185  }
186 
187  eckit::Log::debug() << "Flushing rest of the buffer..." << std::endl;
188  flush();
189 
190  eckit::Log::debug() << "WriterBufferingIterator::pass1: processed " << nrows << " row(s)." << std::endl;
191  ASSERT(close() == 0);
192  return nrows;
193 }
194 
195 //----------------------------------------------------------------------------------------------------------------------
196 
197 } // namespace odc
198 
199 #endif
static void count(void *counter, const double *data, size_t n)
Definition: UnitTests.cc:531
int doWriteRow(core::DataStream< core::SameByteOrder > &stream, const double *values)
void property(std::string key, std::string value)
std::vector< eckit::PathName > outputFiles()
unsigned long pass1(T &, const T &)
int setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
const odc::sql::TableDef * tableDef_
std::pair< eckit::Buffer, size_t > serializeHeader(size_t dataSize, size_t rowsNumber)
WriterBufferingIterator & operator=(const WriterBufferingIterator &)
const core::MetaData & columns(const core::MetaData &md)
int setColumn(size_t index, std::string name, api::ColumnType type)
WriterBufferingIterator(Owner &owner, eckit::DataHandle *, bool openDataHandle, const odc::sql::TableDef *tableDef=0)
unsigned long gatherStats(const double *values, unsigned long count)
Writer< WriterBufferingIterator > Owner
int writeRow(const double *values, unsigned long count)
WriterBufferingIterator(const WriterBufferingIterator &)
const core::MetaData & columns() const
size_t dataOffset(size_t i) const
The offset of a given column in the doubles[] data array.
void setSize(size_t)
Definition: MetaData.cc:64
void resetCodecs()
Definition: MetaData.h:96
bool allColumnsInitialised() const
Definition: MetaData.cc:284
std::map< std::string, std::string > Properties
Definition: Header.h:35
Definition: ColumnInfo.h:23