IODA Bundle
WriterBufferingIterator.cc
/*
 * (C) Copyright 1996-2012 ECMWF.
 *
 * This software is licensed under the terms of the Apache Licence Version 2.0
 * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
 * In applying this licence, ECMWF does not waive the privileges and immunities
 * granted to it by virtue of its status as an intergovernmental organisation nor
 * does it submit to any jurisdiction.
 */

///
/// \file WriterBufferingIterator.cc
///
/// @author Piotr Kuchta, Feb 2009

#include "eckit/exception/Exceptions.h"
#include "eckit/io/DataHandle.h"

#include "odc/core/Header.h"
#include "odc/Writer.h"

using namespace eckit;
using namespace odc::api;
using namespace odc::core;

namespace odc {

//----------------------------------------------------------------------------------------------------------------------

WriterBufferingIterator::WriterBufferingIterator(Owner &owner, DataHandle *dh, bool openDataHandle, const odc::sql::TableDef* tableDef)
: refCount_(0),
  owner_(owner),
  columns_(0),
  lastValues_(0),
  nextRow_(0),
  columnOffsets_(0),
  columnByteSizes_(0),
  nrows_(0),
  f_(dh),
  path_(owner.path()),
  initialisedColumns_(false),
  properties_(),
  rowsBuffer_(0),
  nextRowInBuffer_(0),
  rowsBufferSize_(owner.rowsBufferSize()),
  tableDef_(tableDef),
  openDataHandle_(openDataHandle)
{
    if (openDataHandle)
        open();
}

WriterBufferingIterator::~WriterBufferingIterator()
{
    close();
    delete [] lastValues_;
    delete [] nextRow_;
    delete [] columnOffsets_;
    delete [] columnByteSizes_;
    if (! openDataHandle_)
        delete f_;
}

unsigned long WriterBufferingIterator::gatherStats(const double* values, unsigned long count)
{
    ASSERT(count == columns().size());

    //for (size_t i = 0; i < columns_.size(); ++i) Log::info() << "gatherStats: columns_[" << i << "]=" << *columns_[i] << std::endl;

    for(size_t i = 0; i < count; i++) {
        columns_[i]->coder().gatherStats(values[columnOffsets_[i]]);
    }

    return 0;
}

int WriterBufferingIterator::setOptimalCodecs(core::MetaData& columns)   // (signature reconstructed from the member index)
{
    // ... (body elided in the original listing)
}

void WriterBufferingIterator::allocBuffers()
{
    delete [] lastValues_;
    delete [] nextRow_;
    delete [] columnOffsets_;
    delete [] columnByteSizes_;

    // Don't do anything until everything is initialised (this check is before we do rowDataSizeDoubles
    // which makes use of the coders).

    for (const Column* column : columns_) ASSERT(column->hasInitialisedCoder());

    // Initialise this value
    // ... (line elided in the original listing)

    // Allocate arrays

    int32_t numDoubles = rowDataSizeDoubles();
    int32_t colSize = columns().size();

    lastValues_ = new double [numDoubles];
    nextRow_ = new double [numDoubles];
    columnOffsets_ = new size_t[colSize];
    columnByteSizes_ = new size_t[colSize];
    ASSERT(lastValues_);

    // Initialise data

    size_t offset = 0;
    for (int i = 0; i < colSize; ++i) {

        // If we are trying to do anything before the writer is properly initialised ...
        ASSERT(columns_[i]->hasInitialisedCoder());

        nextRow_[i] = lastValues_[i] = columns_[i]->missingValue();
        columnOffsets_[i] = offset;
        columnByteSizes_[i] = columns_[i]->dataSizeDoubles() * sizeof(double);
        offset += columns_[i]->dataSizeDoubles();
    }

    nrows_ = 0;
}
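
// Illustrative note (not part of the original source): for a hypothetical table whose
// three coders report dataSizeDoubles() of 1, 2 and 1 (e.g. an integer, an 8-character
// string and a real column), the loop above produces
//     columnOffsets_   = {0, 1, 3}     // starting slot of each column within the row of doubles
//     columnByteSizes_ = {8, 16, 8}    // bytes compared/copied per column
// and rowDataSizeDoubles() == 4, i.e. each row occupies four doubles.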

void WriterBufferingIterator::allocRowsBuffer()
{
    rowByteSize_ = sizeof(uint16_t) + rowDataSizeDoubles() * sizeof(double);
    rowsBuffer_ = eckit::Buffer(rowsBufferSize_ * rowByteSize_);   // (reconstructed)
    nextRowInBuffer_ = reinterpret_cast<unsigned char*>(rowsBuffer_.data());
}
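
// Illustrative note (not part of the original source): continuing the example above,
// each buffered row is a two-byte marker followed by the row data, so
//     rowByteSize_ = sizeof(uint16_t) + 4 * sizeof(double) = 2 + 32 = 34 bytes
// and, assuming the buffer holds rowsBufferSize_ == 10000 rows, the rows buffer occupies
// 340000 bytes before writeRow() triggers a flush().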

void WriterBufferingIterator::writeHeader()
{
    allocBuffers();

    // If the calculated buffer size now is bigger than the size used for an
    // existing buffer, then clear it.
    // n.b. if zero, this is no problem as we allocate the buffer lazily in writeRow

    if (rowsBuffer_.size() != 0 && rowByteSize_ < (sizeof(uint16_t) + rowDataSizeDoublesInternal()*sizeof(double))) {
        rowByteSize_ = 0;
        rowsBuffer_ = eckit::Buffer(0);
        nextRowInBuffer_ = 0;
    }

    for (size_t i = 0; i < columns_.size(); ++i) {

        // If we haven't configured a row, then this is bad
        ASSERT(columns_[i]->hasInitialisedCoder());

        columns_[i]->coder().resetStats();
    }
    initialisedColumns_ = true;
    //for (size_t i = 0; i < columns_.size(); ++i) Log::info() << "writeHeader: columns_[" << i << "]=" << *columns_[i] << std::endl;
}

bool WriterBufferingIterator::next()   // (signature reconstructed)
{
    return writeRow(nextRow_, columns().size()) == 0;
}

double& WriterBufferingIterator::data(size_t i)   // (signature reconstructed)
{
    ASSERT(initialisedColumns_);
    ASSERT(i >= 0 && i < columns().size());
    return nextRow_[columnOffsets_[i]];
}

int WriterBufferingIterator::writeRow(const double* data, unsigned long nCols)
{
    ASSERT(nCols == columns().size());
    ASSERT(initialisedColumns_);

    if (rowsBuffer_.size() == 0)
        allocRowsBuffer();

    gatherStats(data, nCols);

    std::copy(data, data + rowDataSizeDoubles(), reinterpret_cast<double*>(nextRowInBuffer_ + sizeof(uint16_t)));
    nextRowInBuffer_ += rowByteSize_;   // (reconstructed)

    ASSERT((char*)nextRowInBuffer_ <= rowsBuffer_ + rowsBuffer_.size());

    if ((char*)nextRowInBuffer_ == rowsBuffer_ + rowsBuffer_.size())
        flush();

    return 0;
}

size_t WriterBufferingIterator::rowDataSizeDoublesInternal() {

    size_t total = 0;
    for (const auto& column : columns()) {
        total += column->dataSizeDoubles();
    }
    return total;
}

int WriterBufferingIterator::doWriteRow(core::DataStream<core::SameByteOrder>& stream, const double* values)
{
    if (lastValues_ == 0) allocBuffers();

    // Find where the first changing column is

    // BUG: First row may not be properly encoded if it is zero.
    uint16_t k = 0;
    for (; k < columns().size(); ++k) {
        if (::memcmp(&values[columnOffsets_[k]], &lastValues_[columnOffsets_[k]], columnByteSizes_[k]) != 0) break;
    }

    // Marker stores the starting column
    // static_cast eliminates unnecessary warnings due to % operator returning an int.

    uint8_t marker[2] {
        static_cast<uint8_t>((k / 256) % 256),
        static_cast<uint8_t>(k % 256)
    };
    stream.writeBytes(marker, sizeof(marker)); // raw write

    // TODO: Update Codecs to encode to a DataStream directly.
    // n.b. We are relying on the sizing of the buffer behind stream to have been done correctly.
    // This is fundamentally unsafe. TODO: Do it properly.

    char* p = stream.get();

    for (size_t i = k; i < columns().size(); ++i) {
        p = columns_[i]->coder().encode(p, values[columnOffsets_[i]]);
        ::memcpy(&lastValues_[columnOffsets_[i]], &values[columnOffsets_[i]], columnByteSizes_[i]);
    }

    stream.set(p);

    nrows_++;
    return 0;
}
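
// Illustrative note (not part of the original source): the two-byte marker written above
// holds the index of the first column whose value differs from the previous row, most
// significant byte first. For k == 260 the bytes are {1, 4}, and a reader can recover
// k = marker[0]*256 + marker[1] = 260. Only columns from k onwards are re-encoded;
// earlier columns are implicitly repeated from the previous row.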

int WriterBufferingIterator::open()
{
    //Log::debug() << "WriterBufferingIterator::open@" << this << ": Opening data handle " << f_ << std::endl;
    ASSERT(f_);

    Length estimatedLen = 20 * 1024 * 1024;
    f_->openForWrite(estimatedLen);

    return 0;
}


int WriterBufferingIterator::setColumn(size_t index, std::string name, api::ColumnType type)
{
    //Log::debug() << "WriterBufferingIterator::setColumn: " << std::endl;
    ASSERT(index < columns().size());
    Column* col = columns_[index];
    ASSERT(col);

    // Ensure that this column is unique!
    for (size_t i = 0; i < columns_.size(); i++) {
        if (index != i && columns_[i] != 0) {
            if (columns_[i]->name() == name) {
                std::stringstream ss;
                ss << "Attempting to create multiple columns with the same name: " << name;
                throw SeriousBug(ss.str(), Here());
            }
        }
    }

    col->name(name);
    col->type<SameByteOrder>(type);

    return 0;
}

int WriterBufferingIterator::setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
{
    //Log::debug() << "WriterBufferingIterator::setBitfieldColumn: " << std::endl;
    ASSERT(index < columns().size());
    Column* col = columns_[index];
    ASSERT(col);

    col->name(name);
    col->type<SameByteOrder>(type);
    col->bitfieldDef(b);
    col->missingValue(0);
    return 0;
}

void WriterBufferingIterator::missingValue(size_t i, double missingValue)
{
    ASSERT(i < columns().size());
    Column* col = columns_[i];
    ASSERT(col);

    col->missingValue(missingValue);
}

void WriterBufferingIterator::flush()
{
    ASSERT(initialisedColumns_);
    if (nextRowInBuffer_ == rowsBuffer_ || rowsBuffer_.size() == 0)
        return;

    Buffer encodedBuffer(rowsBuffer_.size());
    core::DataStream<core::SameByteOrder> encodedStream(encodedBuffer);

    // Iterate over stored rows, and re-encode them into the encodedBuffer

    // TODO: Note we can ensure alignment when storing these. Currently the uint16_t ensures non-alignment.

    size_t rowsWritten = 0;
    unsigned char* p = reinterpret_cast<unsigned char*>(rowsBuffer_.data());
    while (p < nextRowInBuffer_) {
        doWriteRow(encodedStream, reinterpret_cast<double *>(p + sizeof(uint16_t)));
        p += rowByteSize_;
        ++rowsWritten;
    }

    // Clean up storage buffers for row data
    allocBuffers();

    std::pair<Buffer, size_t> encodedHeader = serializeHeader(encodedStream.position(), rowsWritten);
    ASSERT(encodedHeader.second <= encodedHeader.first.size());

    Log::debug() << "WriterBufferingIterator::flush: header size: " << encodedHeader.second << std::endl;

    ASSERT(dataHandle().write(encodedHeader.first, encodedHeader.second) == long(encodedHeader.second)); // Write header
    ASSERT(dataHandle().write(encodedBuffer, encodedStream.position()) == encodedStream.position()); // Write encoded data

    Log::debug() << "WriterBufferingIterator::flush: flushed " << rowsWritten << " rows." << std::endl;

    // Reset the write buffers

    nextRowInBuffer_ = reinterpret_cast<unsigned char*>(rowsBuffer_.data());

    // This is a bad place to be. We need to reset the coders in the columns, not clone
    // the existing ones (which have been optimised).

    columns_.resetCodecs();   // (reconstructed)
}


std::pair<Buffer, size_t> WriterBufferingIterator::serializeHeader(size_t dataSize, size_t rowsNumber) {
    return core::Header::serializeHeader(dataSize, rowsNumber, properties_, columns());
}

int WriterBufferingIterator::close()
{
    if (initialisedColumns_) flush();

    if (!openDataHandle_ && f_)
    {
        f_->close();
        f_ = 0;
    }
    return 0;
}

std::vector<eckit::PathName> WriterBufferingIterator::outputFiles()
{
    std::vector<eckit::PathName> r;
    r.push_back(path_);
    return r;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace odc

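Usage sketch (illustrative, not taken from the odc sources): the snippet below shows how a WriterBufferingIterator is typically driven through the odc::Writer<> front end suggested by the Owner argument of the constructor above. The Writer<>::iterator type, setNumberOfColumns() and the (*row)[...] assignments are assumptions based on the legacy odb_api-style writer interface; setColumn(), writeHeader() and the buffered row writing are the methods defined in this file.

#include "eckit/filesystem/PathName.h"
#include "odc/Writer.h"

void writeExampleODB(const eckit::PathName& path) {
    odc::Writer<> writer(path);                       // assumed front end owning a WriterBufferingIterator
    odc::Writer<>::iterator row = writer.begin();

    row->setNumberOfColumns(2);                       // assumed helper that sizes the column MetaData
    row->setColumn(0, "obsvalue", odc::api::REAL);
    row->setColumn(1, "station_id", odc::api::INTEGER);
    row->writeHeader();                               // allocates the row/encoding buffers (see above)

    (*row)[0] = 273.15;                               // assumed to forward to data(0) above
    (*row)[1] = 12345;
    ++row;                                            // buffers the row; flush() runs when the buffer fills
}                                                     // destroying the iterator flushes and closes the handle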