IODA Bundle
api/odc.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2019- ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include <cstdint>
12 #include <functional>
13 #include <numeric>
14 #include <unistd.h>
15 
16 #include "eckit/exception/Exceptions.h"
17 #include "eckit/maths/Functions.h"
18 #include "eckit/runtime/Main.h"
19 #include "eckit/io/MemoryHandle.h"
20 #include "eckit/io/FileDescHandle.h"
21 
22 #include "odc/api/odc.h"
23 #include "odc/api/Odb.h"
24 
25 using namespace odc::api;
26 using namespace eckit;
27 
28 extern "C" {
29 
30 //----------------------------------------------------------------------------------------------------------------------
31 
32 struct odc_reader_t : public Reader {
33  using Reader::Reader;
34 };
35 
36 struct odc_frame_t : public Frame {
37  using Frame::Frame;
38 };
39 
40 struct odc_decoder_t {
41 
42  struct DecodeColumn {
43  void* data;
44  size_t elemSize;
45  size_t stride;
46  bool transpose;
47  };
48 
49  odc_decoder_t() : nrows(0), dataWidth(0), dataHeight(0), externalData(0), columnMajor(false), ownedData() {}
50 
51  size_t nrows;
52  std::vector<std::string> columnNames;
53  std::vector<DecodeColumn> columnData;
54 
55  size_t dataWidth;
56  size_t dataHeight;
57  void* externalData;
59 
60  std::unique_ptr<char> ownedData;
61 };
62 
63 struct odc_encoder_t {
64 
65  odc_encoder_t() : arrayData(0), columnMajorWidth(0), nrows(0), arrayWidth(0), arrayHeight(0), maxRowsPerFrame(10000) {}
66 
67  struct EncodeColumn {
68  const void* data;
69  size_t stride;
70  };
71 
72  const void* arrayData;
74 
75  size_t nrows;
76  size_t arrayWidth;
77  size_t arrayHeight;
79  std::vector<ColumnInfo> columnInfo;
80  std::vector<EncodeColumn> columnData;
81  std::map<std::string, std::string> properties;
82 };
83 
84 
85 //----------------------------------------------------------------------------------------------------------------------
86 
87 /* Error handling */
88 
89 static std::string g_current_error_str;
91 static void* g_failure_handler_context = nullptr;
92 
93 const char* odc_error_string(int err) {
94  switch (err) {
95  case ODC_SUCCESS:
96  return "Success";
99  return g_current_error_str.c_str();
101  return "Iteration complete";
102  default:
103  return "<unknown>";
104  };
105 }
106 
107 } // extern "C"
108 
109 // Template can't have C linkage
110 
111 namespace {
112 
113 // Template magic to provide a consistent error-handling approach
114 
115 int innerWrapFn(std::function<int()> f) {
116  return f();
117 }
118 
119 int innerWrapFn(std::function<void()> f) {
120  f();
121  return ODC_SUCCESS;
122 }
123 
124 template <typename FN>
126 
127  try {
128  return innerWrapFn(f);
129  } catch (Exception& e) {
130  Log::error() << "Caught exception on C-C++ API boundary: " << e.what() << std::endl;
131  g_current_error_str = e.what();
132  if (g_failure_handler) {
134  }
136  } catch (std::exception& e) {
137  Log::error() << "Caught exception on C-C++ API boundary: " << e.what() << std::endl;
138  g_current_error_str = e.what();
139  if (g_failure_handler) {
141  }
143  } catch (...) {
144  Log::error() << "Caught unknown on C-C++ API boundary" << std::endl;
145  g_current_error_str = "Unrecognised and unknown exception";
146  if (g_failure_handler) {
148  }
150  }
151 
152  ASSERT(false);
153 }
154 
155 }
156 
157 extern "C" {
158 
159 //----------------------------------------------------------------------------------------------------------------------
160 
161 // Types for lookup
162 
164  return wrapApiFunction([count] {
165  (*count) = NUM_TYPES;
166  });
167 }
168 
169 int odc_column_type_name(int type, const char** type_name) {
170  return wrapApiFunction([type, type_name] {
171 
172  static const char* names[] = {
179  };
180 
181  if (type < 0 || size_t(type) >= sizeof(names)/sizeof(names[0])) {
182  std::stringstream ss;
183  ss << "Unknown type id: " << type;
184  throw UserError(ss.str(), Here());
185  }
186 
187  *type_name = names[type];
188  });
189 }
190 
191 //----------------------------------------------------------------------------------------------------------------------
192 
193 // TODO: In a sensible, templated, way catch all exceptions.
194 // --> We ought to have a standardised error return process.
195 
196 
197 /*
198  * Initialise API
199  * @note This is only required if being used from a context where Main()
200  * is not otherwise initialised
201 */
202 
204  return wrapApiFunction([] {
205  static bool initialised = false;
206 
207  if (initialised) {
208  Log::warning() << "Initialising ODC library twice" << std::endl;
209  }
210 
211  if (!initialised) {
212  const char* argv[2] = {"odc-api", 0};
213  Main::initialise(1, const_cast<char**>(argv));
214  initialised = true;
215  }
216 
217  ASSERT(ODC_IGNORE == static_cast<int>(IGNORE));
218  ASSERT(ODC_INTEGER == static_cast<int>(INTEGER));
219  ASSERT(ODC_REAL == static_cast<int>(REAL));
220  ASSERT(ODC_STRING == static_cast<int>(STRING));
221  ASSERT(ODC_BITFIELD == static_cast<int>(BITFIELD));
222  ASSERT(ODC_DOUBLE == static_cast<int>(DOUBLE));
223  });
224 }
225 
226 int odc_integer_behaviour(int integerBehaviour) {
227  return wrapApiFunction([integerBehaviour] {
228  if (integerBehaviour != ODC_INTEGERS_AS_DOUBLES && integerBehaviour != ODC_INTEGERS_AS_LONGS) {
229  throw SeriousBug("ODC integer behaviour must be either ODC_INTEGERS_AS_DOUBLES or ODC_INTEGERS_AS_LONGS", Here());
230  }
232  });
233 }
234 
235 int odc_set_failure_handler(odc_failure_handler_t handler, void* context) {
236  return wrapApiFunction([handler, context] {
237  g_failure_handler = handler;
238  g_failure_handler_context = context;
239  eckit::Log::info() << "ODC setting failure handler fn." << std::endl;
240  });
241 }
242 
244  return wrapApiFunction([missing_value] {
245  ASSERT(missing_value);
246  (*missing_value) = Settings::integerMissingValue();
247  });
248 }
249 
251  return wrapApiFunction([missing_value] {
252  ASSERT(missing_value);
253  (*missing_value) = Settings::doubleMissingValue();
254  });
255 }
256 
257 int odc_set_missing_integer(long missing_integer) {
258  return wrapApiFunction([missing_integer] {
259  Settings::setIntegerMissingValue(missing_integer);
260  });
261 }
262 
263 int odc_set_missing_double(double missing_double) {
264  return wrapApiFunction([missing_double] {
265  Settings::setDoubleMissingValue(missing_double);
266  });
267 }
268 
269 int odc_version(const char** version) {
270  return wrapApiFunction([version]{
271  (*version) = Settings::version().c_str();
272  });
273 }
274 
275 int odc_vcs_version(const char** sha1) {
276  return wrapApiFunction([sha1]{
277  (*sha1) = Settings::gitsha1().c_str();
278  });
279 }
280 
281 //----------------------------------------------------------------------------------------------------------------------
282 
283 /* Basic READ objects */
284 
285 int odc_open_path(odc_reader_t** reader, const char* filename) {
286  return wrapApiFunction([reader, filename] {
287 // ASSERT(!(*reader));
288  (*reader) = new odc_reader_t {filename};
289  });
290 }
291 
292 int odc_open_file_descriptor(odc_reader_t** reader, int fd) {
293  return wrapApiFunction([reader, fd] {
294  // Take a copy of the file descriptor. This allows us to decouple the life of this
295  // from the life of the caller
296  int fd2 = dup(fd);
297  if (fd == -1) throw CantOpenFile("dup() failed on supplied file descriptor", Here());
298  (*reader) = new odc_reader_t {new FileDescHandle(fd2, true)};
299  });
300 }
301 
302 int odc_open_buffer(odc_reader_t** reader, const void* data, long length) {
303  return wrapApiFunction([reader, data, length] {
304  (*reader) = new odc_reader_t {new MemoryHandle(data, length)};
305  });
306 
307 }
308 
309 int odc_open_stream(odc_reader_t** reader, void* handle, odc_stream_read_t stream_proc) {
310 
311  // Wrap the stream in a DataHandle
312  struct ReadStreamDataHandle : public eckit::DataHandle {
313  ReadStreamDataHandle(void* handle, odc_stream_read_t fn) : handle_(handle), fn_(fn) {}
314  virtual ~ReadStreamDataHandle() {}
315  void print(std::ostream& s) const override { s << "StreamReadHandle(" << fn_ << "(" << handle_ << "))"; }
316  Length openForRead() override { return 0; }
317  void openForWrite(const Length&) override { NOTIMP; }
318  void openForAppend(const Length&) override { NOTIMP; }
319  long read(void* buffer, long length) override { return fn_(handle_, buffer, length); }
320  long write(const void*, long) override { NOTIMP; }
321  void close() override {}
322 
323  void* handle_;
324  odc_stream_read_t fn_;
325  };
326 
327  return wrapApiFunction([reader, handle, stream_proc] {
328  (*reader) = new odc_reader_t(new ReadStreamDataHandle(handle, stream_proc));
329  });
330 }
331 
332 int odc_close(const odc_reader_t* reader) {
333  return wrapApiFunction([reader]{
334  ASSERT(reader);
335  delete reader;
336  });
337 }
338 
339 //----------------------------------------------------------------------------------------------------------------------
340 
341 /*
342  * Table handling
343  */
344 
345 int odc_new_frame(odc_frame_t** frame, odc_reader_t* reader) {
346  return wrapApiFunction([frame, reader] {
347  ASSERT(reader);
348  (*frame) = new odc_frame_t {*reader};
349  });
350 }
351 
352 int odc_free_frame(const odc_frame_t* frame) {
353  return wrapApiFunction([frame] {
354  ASSERT(frame);
355  delete frame;
356  });
357 }
358 
360  return wrapApiFunction(std::function<int()> {[frame] {
361  ASSERT(frame);
362  if (frame->next(false)) {
363  return ODC_SUCCESS;
364  } else {
365  return ODC_ITERATION_COMPLETE;
366  }
367  }});
368 }
369 
370 int odc_next_frame_aggregated(odc_frame_t* frame, long maximum_rows) {
371  return wrapApiFunction(std::function<int()> {[frame, maximum_rows] {
372  ASSERT(frame);
373  if (frame->next(true, maximum_rows)) {
374  return ODC_SUCCESS;
375  } else {
376  return ODC_ITERATION_COMPLETE;
377  }
378  }});
379 }
380 
382  return wrapApiFunction([source_frame, copy] {
383  ASSERT(source_frame);
384  (*copy) = new odc_frame_t {*source_frame};
385  });
386 }
387 
388 int odc_frame_row_count(const odc_frame_t* frame, long* count) {
389  return wrapApiFunction([frame, count] {
390  ASSERT(frame);
391  ASSERT(count);
392  (*count) = frame->rowCount();
393  });
394 }
395 
396 int odc_frame_column_count(const odc_frame_t* frame, int* count) {
397  return wrapApiFunction([frame, count] {
398  ASSERT(frame);
399  ASSERT(count);
400  (*count) = frame->columnCount();
401  });
402 }
403 
405  int col,
406  const char** name,
407  int* type,
408  int* element_size,
409  int* bitfield_count) {
410  return wrapApiFunction([frame, col, name, type, element_size, bitfield_count] {
411  ASSERT(frame);
412  const auto& ci(frame->columnInfo());
413  ASSERT(col >= 0 && size_t(col) < ci.size());
414  const auto& colInfo(ci[col]);
415 
416  if (name) (*name) = colInfo.name.c_str();
417  if (type) (*type) = colInfo.type;
418  if (element_size) (*element_size) = colInfo.decodedSize;
419  if (bitfield_count) (*bitfield_count) = colInfo.bitfield.size();
420  });
421 }
422 
423 int odc_frame_bitfield_attributes(const odc_frame_t* frame, int col, int field, const char** name, int* offset, int* size) {
424  return wrapApiFunction([frame, col, field, name, offset, size] {
425  ASSERT(frame);
426  const auto& ci(frame->columnInfo());
427  ASSERT(col >= 0 && size_t(col) < ci.size());
428  const auto& colInfo(ci[col]);
429  ASSERT(field >= 0 && size_t(field) < colInfo.bitfield.size());
430 
431  if (name) (*name) = colInfo.bitfield[field].name.c_str();
432  if (offset) (*offset) = colInfo.bitfield[field].offset;
433  if (size) (*size) = colInfo.bitfield[field].size;
434  });
435 }
436 
437 //----------------------------------------------------------------------------------------------------------------------
438 
439 /* Decode functionality */
440 
442  return wrapApiFunction([decoder] {
443  (*decoder) = new odc_decoder_t;
444  });
445 }
446 
447 int odc_free_decoder(const odc_decoder_t* decoder) {
448  return wrapApiFunction([decoder] {
449  delete decoder;
450  });
451 }
452 
454  return wrapApiFunction([decoder, frame] {
455 
456  ASSERT(decoder);
457  ASSERT(frame);
458 
459  size_t nrows = frame->rowCount();
460  size_t ncols = frame->columnCount();
461 
462  decoder->nrows = nrows;
463 
464  // Fill in column details
465 
466  for (size_t col = 0; col < ncols; ++col) {
467  odc_decoder_add_column(decoder, frame->columnInfo()[col].name.c_str());
468  }
469  });
470 }
471 
472 int odc_decoder_set_column_major(odc_decoder_t* decoder, bool columnMajor) {
473  return wrapApiFunction([decoder, columnMajor] {
474  ASSERT(decoder);
475  decoder->columnMajor = columnMajor;
476  });
477 }
478 
479 int odc_decoder_set_row_count(odc_decoder_t* decoder, long nrows) {
480  return wrapApiFunction([decoder, nrows] {
481  ASSERT(decoder);
482  decoder->nrows = nrows;
483  });
484 }
485 
486 int odc_decoder_row_count(const odc_decoder_t* decoder, long* nrows) {
487  return wrapApiFunction([decoder, nrows] {
488  ASSERT(decoder);
489  ASSERT(nrows);
490  (*nrows) = decoder->nrows;
491  });
492 }
493 
494 int odc_decoder_set_data_array(odc_decoder_t* decoder, void* buffer, long width, long height, bool columnMajor) {
495  return wrapApiFunction([decoder, buffer, width, height, columnMajor] {
496  ASSERT(decoder);
497  ASSERT(buffer);
498  ASSERT(width > 0);
499  ASSERT(height > 0);
500  decoder->externalData = buffer;
501  decoder->dataWidth = width;
502  decoder->dataHeight = height;
503  decoder->columnMajor = columnMajor;
504  decoder->nrows = height;
505  });
506 }
507 
508 int odc_decoder_data_array(const odc_decoder_t* decoder, const void** data, long* width, long* height, bool* columnMajor) {
509  return wrapApiFunction([decoder, data, width, height, columnMajor] {
510  ASSERT(decoder);
511 
512  if (data) {
513  if (decoder->ownedData) {
514  (*data) = decoder->ownedData.get();
515  } else {
516  ASSERT(decoder->externalData);
517  (*data) = decoder->externalData;
518  }
519  }
520 
521  if (width) (*width) = decoder->dataWidth;
522  if (height) (*height) = decoder->dataHeight;
523  if (columnMajor) (*columnMajor) = decoder->columnMajor;
524  });
525 }
526 
527 int odc_decoder_add_column(odc_decoder_t* decoder, const char* name) {
528  return wrapApiFunction([decoder, name] {
529  ASSERT(decoder);
530  ASSERT(name);
531  decoder->columnNames.emplace_back(name);
532  decoder->columnData.emplace_back(odc_decoder_t::DecodeColumn {0, 0, 0, false});
533  });
534 }
535 
536 int odc_decoder_column_count(const odc_decoder_t* decoder, int* count) {
537  return wrapApiFunction([decoder, count] {
538  ASSERT(decoder);
539  ASSERT(count);
540  (*count) = decoder->columnNames.size();
541  });
542 }
543 
544 int odc_decoder_column_set_data_size(odc_decoder_t* decoder, int col, int element_size) {
545  return wrapApiFunction([decoder, col, element_size] {
546  ASSERT(decoder);
547  ASSERT(col >= 0 && size_t(col) < decoder->columnData.size());
548 
549  auto& cd(decoder->columnData[col]);
550  cd.elemSize = element_size;
551  });
552 }
553 
554 int odc_decoder_column_set_data_array(odc_decoder_t* decoder, int col, int element_size, int stride, void* data) {
555  return wrapApiFunction([decoder, col, element_size, stride, data] {
556  ASSERT(decoder);
557  ASSERT(col >= 0 && size_t(col) < decoder->columnData.size());
558 
559  auto& cd(decoder->columnData[col]);
560  cd.elemSize = element_size;
561  cd.stride = stride;
562  cd.data = data;
563  });
564 }
565 
566 int odc_decoder_column_data_array(const odc_decoder_t* decoder, int col, int* element_size, int* stride, const void** data) {
567  return wrapApiFunction([decoder, col, element_size, stride, data] {
568  ASSERT(decoder);
569  ASSERT(col >= 0 && size_t(col) < decoder->columnData.size());
570 
571  auto& cd(decoder->columnData[col]);
572  if (element_size) (*element_size) = cd.elemSize;
573  if (stride) (*stride) = cd.stride;
574  if (data) (*data) = cd.data;
575  });
576 }
577 
578 static void fill_in_decoder(odc_decoder_t* decoder, const odc_frame_t* frame) {
579 
580  if (decoder->nrows == 0) {
581  decoder->nrows = frame->rowCount();
582  }
583 
584  size_t height = decoder->nrows; // in rows
585  size_t width = 0; // in bytes
586  std::vector<std::pair<size_t, long>> offsets;
587 
588  for (size_t i = 0; i < decoder->columnData.size(); ++i) {
589  odc_decoder_t::DecodeColumn& col(decoder->columnData[i]);
590 
591  if (col.elemSize == 0) {
592  if (col.data) {
593  col.elemSize = sizeof(double); // backwards compatible default
594  } else {
595  const std::string& colName(decoder->columnNames[i]);
596  auto it = std::find_if(frame->columnInfo().begin(), frame->columnInfo().end(),
597  [&colName](const ColumnInfo& ci) { return ci.name == colName; });
598  ASSERT(it != frame->columnInfo().end());
599  ASSERT(it->decodedSize > 0);
600  ASSERT(it->decodedSize % sizeof(double) == 0);
601  col.elemSize = it->decodedSize;
602  }
603  width += col.elemSize;
604  }
605  }
606 
607  if (width != 0 && height != 0) {
608  void* dataPtr;
609  if (decoder->externalData) {
610  decoder->ownedData.reset();
611  dataPtr = decoder->externalData;
612  ASSERT(width <= decoder->dataWidth);
613  ASSERT(height <= decoder->dataHeight);
614  width = decoder->dataWidth;
615  height = decoder->dataHeight;
616  } else {
617  decoder->ownedData.reset(new char[width * height]);
618  decoder->dataWidth = width;
619  decoder->dataHeight = height;
620  dataPtr = decoder->ownedData.get();
621  }
622 
623  size_t offset = 0;
624  for (auto& col : decoder->columnData) {
625  if (!col.data) {
626  col.data = static_cast<char*>(dataPtr) + offset;
627  if (decoder->columnMajor) {
628  col.stride = col.elemSize;
629  offset += height * col.elemSize;
630  if (col.elemSize > 8) col.transpose = true;
631  } else {
632  col.stride = width;
633  offset += col.elemSize;
634  }
635  } else {
636  if (col.stride == 0) col.stride = col.elemSize;
637  }
638  }
639  }
640 }
641 
642 
643 int odc_decode_threaded(odc_decoder_t* decoder, const odc_frame_t* frame, long* rows_decoded, int nthreads) {
644  return wrapApiFunction([decoder, frame, rows_decoded, nthreads] {
645 
646  ASSERT(decoder);
647  ASSERT(frame);
648 
649  // Sanity checking
650 
651  size_t frame_rows = frame->rowCount();
652  size_t frame_cols = frame->columnCount();
653 
654  ASSERT(decoder->columnData.size() == decoder->columnNames.size());
655  ASSERT(decoder->columnNames.size() <= frame_cols);
656  ASSERT(decoder->nrows >= frame_rows);
657 
658  // Fill in and allocate decode target as required
659 
660  fill_in_decoder(decoder, frame);
661 
662  // Store of column index/temporary data for column-major columns wider than 8-bytes that
663  // need to be transposed (i.e. part of an output array).
664  std::vector<std::pair<size_t, std::unique_ptr<double[]>>> temporaryTransposeData;
665 
666  // Construct C++ API adapter
667 
668  std::vector<StridedData> dataFacade;
669  dataFacade.reserve(decoder->columnNames.size());
670 
671  for (size_t i = 0; i < decoder->columnData.size(); ++i) {
672  const auto& col = decoder->columnData[i];
673 
674  void* data = col.data;
675  if (col.transpose) {
676  ASSERT(col.elemSize % sizeof(double) == 0);
677  ASSERT(col.stride == col.elemSize);
678  size_t cols = col.elemSize / sizeof(double);
679  temporaryTransposeData.emplace_back(i, std::unique_ptr<double[]>(new double[frame_rows * cols]));
680  data = temporaryTransposeData.back().second.get();
681  }
682  dataFacade.emplace_back(StridedData{data, size_t(decoder->nrows), size_t(col.elemSize), size_t(col.stride)});
683  }
684 
685  Decoder target(decoder->columnNames, dataFacade);
686 
687  // Do the decoder
688 
689  ASSERT(nthreads >= 1);
690  frame->decode(target, static_cast<size_t>(nthreads));
691 
692  // For the cases where needed, reorder the data
693 
694  for (const auto& kv : temporaryTransposeData) {
695  size_t colIndex = kv.first;
696  const double* tmpArray = kv.second.get();
697  double* output = static_cast<double*>(decoder->columnData[colIndex].data);
698  size_t rows = decoder->nrows;
699  size_t cols = decoder->columnData[colIndex].elemSize / sizeof(double);
700  for (size_t row = 0; row < frame_rows; row++) {
701  for (size_t col = 0; col < cols; col++) {
702  output[row + (col * rows)] = tmpArray[col + (row * cols)];
703  }
704  }
705  }
706 
707  // And return the values
708 
709 // decoder->nrows = frame_rows;
710  if (rows_decoded) *(rows_decoded) = frame_rows;
711  });
712 }
713 
714 int odc_decode(odc_decoder_t* decoder, const odc_frame_t* frame, long* rows_decoded) {
715  return odc_decode_threaded(decoder, frame, rows_decoded, 1);
716 }
717 
718 //----------------------------------------------------------------------------------------------------------------------
719 
720 /* Encode functionality */
721 
723  return wrapApiFunction([encoder] {
724  (*encoder) = new odc_encoder_t;
725  });
726 }
727 
728 int odc_free_encoder(const odc_encoder_t* encoder) {
729  return wrapApiFunction([encoder] {
730  delete encoder;
731  });
732 }
733 
734 int odc_encoder_add_property(odc_encoder_t* encoder, const char* key, const char* value) {
735  return wrapApiFunction([encoder, key, value] {
736  ASSERT(encoder);
737  encoder->properties[key] = value;
738  });
739 }
740 
741 int odc_encoder_set_row_count(odc_encoder_t* encoder, long nrows) {
742  return wrapApiFunction([encoder, nrows] {
743  ASSERT(encoder);
744  encoder->nrows = nrows;
745  });
746 }
747 
748 int odc_encoder_set_rows_per_frame(odc_encoder_t* encoder, long rows_per_frame) {
749  return wrapApiFunction([encoder, rows_per_frame] {
750  ASSERT(encoder);
751  encoder->maxRowsPerFrame = rows_per_frame;
752  });
753 }
754 
755 int odc_encoder_set_data_array(odc_encoder_t* encoder, const void* data, long width, long height, int columnMajorWidth) {
756  return wrapApiFunction([encoder, data, width, height, columnMajorWidth] {
757  ASSERT(encoder);
758 
759  // If we are setting this _again_ then this is because a configured encoder
760  // is being reused. Make sure that we rezero things that should be rezeroed.
761 
762  if (encoder->arrayData != 0) {
763  for (auto& col : encoder->columnData) {
764  col.data = 0;
765  col.stride = 0;
766  }
767  }
768 
769  ASSERT(columnMajorWidth % 8 == 0);
770  encoder->arrayData = data;
771  encoder->arrayWidth = width;
772  encoder->arrayHeight = height;
773  encoder->columnMajorWidth = columnMajorWidth;
774  if (encoder->nrows == 0) encoder->nrows = height;
775  });
776 }
777 
778 int odc_encoder_add_column(odc_encoder_t* encoder, const char* name, int type) {
779  return wrapApiFunction([encoder, name, type] {
780  ASSERT(encoder);
781  encoder->columnInfo.emplace_back(ColumnInfo{std::string(name), ColumnType(type)});
782  encoder->columnData.emplace_back(odc_encoder_t::EncodeColumn {0, 0});
783  });
784 }
785 
786 int odc_encoder_column_set_data_size(odc_encoder_t* encoder, int col, int element_size) {
787  return wrapApiFunction([encoder, col, element_size] {
788  ASSERT(encoder);
789  ASSERT(col >= 0 && size_t(col) < encoder->columnInfo.size());
790  ASSERT(element_size >= 0 && element_size % 8 == 0);
791  encoder->columnInfo[col].decodedSize = element_size;
792  });
793 }
794 
795 int odc_encoder_column_set_data_array(odc_encoder_t* encoder, int col, int element_size, int stride, const void* data) {
796  return wrapApiFunction([encoder, col, element_size, stride, data] {
797  ASSERT(encoder);
798  ASSERT(col >= 0 && size_t(col) < encoder->columnInfo.size());
799  ASSERT(element_size >= 0 && element_size % 8 == 0);
800  ASSERT(stride >= 0 && stride % 8 == 0);
801  encoder->columnInfo[col].decodedSize = element_size;
802  encoder->columnData[col].stride = stride;
803  encoder->columnData[col].data = data;
804  });
805 }
806 
807 int odc_encoder_column_add_bitfield(odc_encoder_t* encoder, int col, const char* name, int nbits) {
808  return wrapApiFunction([encoder, col, name, nbits] {
809  ASSERT(encoder);
810  ASSERT(col >= 0 && size_t(col) < encoder->columnInfo.size());
811 
812  size_t offset;
813  ColumnInfo& ci(encoder->columnInfo[col]);
814  if (ci.bitfield.size() == 0) {
815  offset = 0;
816  } else {
817  offset = ci.bitfield.back().offset + ci.bitfield.back().size;
818  }
819 
820  ASSERT(offset + nbits <= 32);
821  ci.bitfield.emplace_back(ColumnInfo::Bit {name, nbits, int(offset)});
822  });
823 }
824 
825 void fill_in_encoder(odc_encoder_t* encoder, std::vector<std::unique_ptr<char[]>>& transposedData) {
826 
827  ASSERT(encoder->nrows > 0);
828  ASSERT(encoder->columnData.size() == encoder->columnInfo.size());
829 
830  size_t decodedWidth = 0;
831  for (auto& info : encoder->columnInfo) {
832  if (info.decodedSize == 0) info.decodedSize = 8;
833  decodedWidth += info.decodedSize;
834  }
835 
836  if (encoder->arrayData) {
837 
838  /// We are encoding from an externally supplied array
839 
840  // Some sanity checks
841 
842  if (encoder->nrows > encoder->arrayHeight) {
843  std::stringstream ss;
844  ss << "Expected " << encoder->nrows << " rows, but array only contains " << encoder->arrayHeight;
845  throw SeriousBug(ss.str(), Here());
846  }
847 
848  if (decodedWidth > encoder->arrayWidth) {
849  std::stringstream ss;
850  ss << "Expected array width " << decodedWidth << " bytes, but array only contains " << encoder->arrayWidth;
851  throw SeriousBug(ss.str(), Here());
852  }
853 
854  // We are constructing the encoder from a 2D array of data.
855 
856  // n.b. There is a complication with arrays in column major order. There must be a column size
857  // (i.e. double). If the data size is greater than this, the data becomes non-contiguous
858  // and split across multiple columns. Need to aggregate it for encoding.
859 
860  size_t offset = 0;
861  for (size_t i = 0; i < encoder->columnData.size(); ++i) {
863  const ColumnInfo& info(encoder->columnInfo[i]);
864 
865  ASSERT(info.decodedSize != 0);
866  ASSERT(info.decodedSize % sizeof(double) == 0);
867  ASSERT(c.data == 0);
868  ASSERT(c.stride == 0);
869 
870  c.data = static_cast<const char*>(encoder->arrayData) + offset;
871 
872  if (encoder->columnMajorWidth != 0) {
873  c.stride = info.decodedSize;
874  if (long(info.decodedSize) != encoder->columnMajorWidth) {
875  // Transpose data for contiguous elementsn
876  size_t widthInElems = info.decodedSize / encoder->columnMajorWidth;
877 
878  transposedData.push_back(std::unique_ptr<char[]>(new char[widthInElems * encoder->columnMajorWidth * encoder->arrayHeight]));
879  const char* psrc = reinterpret_cast<const char*>(c.data);
880  char* ptgt = transposedData.back().get();
881  for (size_t y = 0; y < encoder->arrayHeight; ++y) {
882  for (size_t x = 0; x < widthInElems; ++x) {
883  ::memcpy(&ptgt[(x + (y * widthInElems)) * encoder->columnMajorWidth],
884  &psrc[(y + (x * encoder->arrayHeight)) * encoder->columnMajorWidth],
885  encoder->columnMajorWidth);
886  }
887  }
888  c.data = ptgt;
889  }
890  offset += info.decodedSize * encoder->arrayHeight;
891  } else {
892  c.stride = encoder->arrayWidth;
893  offset += info.decodedSize;
894  }
895 
896  // sanity checks
897  ASSERT(c.stride % sizeof(double) == 0);
898  }
899  } else {
900 
901  // We are constructing the data with a supplied structure per column
902 
903  for (size_t i = 0; i < encoder->columnData.size(); ++i) {
905  const ColumnInfo& info(encoder->columnInfo[i]);
906 
907  // If no stride is supplied for a column, assume that it is densely packed
908  ASSERT(info.decodedSize != 0);
909  ASSERT(c.data);
910  if (c.stride == 0) {
911  c.stride = info.decodedSize;
912  }
913 
914  // sanity checks
915  ASSERT(c.stride % sizeof(double) == 0);
916  ASSERT(info.decodedSize % sizeof(double) == 0);
917  }
918  }
919 }
920 
921 void odc_encode_to_data_handle(odc_encoder_t* encoder, eckit::DataHandle& dh) {
922 
923  ASSERT(encoder);
924  ASSERT(encoder->nrows > 0);
925  ASSERT(encoder->columnData.size() == encoder->columnInfo.size());
926  ASSERT(encoder->maxRowsPerFrame > 0);
927 
928  // TransposedData allows column-oriented data to be reordered before encoding if needed.
929  std::vector<std::unique_ptr<char[]>> transposedData;
930  fill_in_encoder(encoder, transposedData);
931 
932  size_t ncolumns = encoder->columnData.size();
933  ASSERT(ncolumns > 0);
934 
935  std::vector<ConstStridedData> stridedData;
936  stridedData.reserve(ncolumns);
937 
938 
939  for (size_t i = 0; i < ncolumns; i++) {
940  const ColumnInfo& info{encoder->columnInfo[i]};
941  const odc_encoder_t::EncodeColumn& c{encoder->columnData[i]};
942  stridedData.emplace_back(ConstStridedData {c.data, encoder->nrows, info.decodedSize, c.stride});
943  }
944 
945  ::odc::api::encode(dh, encoder->columnInfo, stridedData, encoder->properties, encoder->maxRowsPerFrame);
946 }
947 
948 
949 int odc_encode_to_stream(odc_encoder_t* encoder, void* handle, odc_stream_write_t write_fn, long* bytes_encoded) {
950 
951  // Wrap the stream in a DataHandle
952  struct WriteStreamDataHandle : public eckit::DataHandle {
953  WriteStreamDataHandle(void* handle, odc_stream_write_t fn) : handle_(handle), fn_(fn), pos_(0) {}
954  virtual ~WriteStreamDataHandle() {}
955  void print(std::ostream& s) const override { s << "StreamReadHandle(" << fn_ << "(" << handle_ << "))"; }
956  Length openForRead() override { NOTIMP; }
957  void openForWrite(const Length&) override {}
958  void openForAppend(const Length&) override {}
959  long read(void*, long) override { NOTIMP; }
960  long write(const void* buffer, long length) override {
961  long written = fn_(handle_, buffer, length);
962  pos_ += written;
963  return written;
964  }
965  Offset position() override { return pos_; }
966  void close() override {}
967 
968  void* handle_;
969  odc_stream_write_t fn_;
970  Offset pos_;
971  };
972 
973  return wrapApiFunction([encoder, handle, write_fn, bytes_encoded] {
974  WriteStreamDataHandle dh(handle, write_fn);
975  dh.openForWrite(0);
976  AutoClose closer(dh);
977  odc_encode_to_data_handle(encoder, dh);
978  (*bytes_encoded) = dh.position();
979  });
980 }
981 
982 int odc_encode_to_file_descriptor(odc_encoder_t* encoder, int fd, long* bytes_encoded) {
983  return wrapApiFunction([encoder, fd, bytes_encoded] {
984  FileDescHandle dh(fd);
985  dh.openForWrite(0);
986  AutoClose closer(dh);
987  odc_encode_to_data_handle(encoder, dh);
988  if (bytes_encoded) (*bytes_encoded) = dh.position();
989  });
990 }
991 
992 int odc_encode_to_buffer(odc_encoder_t* encoder, void* buffer, long length, long* bytes_encoded) {
993  return wrapApiFunction([encoder, buffer, length, bytes_encoded] {
994  MemoryHandle dh(buffer, length);
995  dh.openForWrite(0);
996  AutoClose closer(dh);
997  odc_encode_to_data_handle(encoder, dh);
998  if (bytes_encoded) (*bytes_encoded) = dh.position();
999  });
1000 }
1001 
1002 //----------------------------------------------------------------------------------------------------------------------
1003 
1004 } // extern "C"
static void count(void *counter, const double *data, size_t n)
Definition: UnitTests.cc:531
int odc_encoder_add_column(odc_encoder_t *encoder, const char *name, int type)
Definition: api/odc.cc:778
int odc_free_encoder(const odc_encoder_t *encoder)
Definition: api/odc.cc:728
static void fill_in_decoder(odc_decoder_t *decoder, const odc_frame_t *frame)
Definition: api/odc.cc:578
int odc_set_missing_integer(long missing_integer)
Definition: api/odc.cc:257
int odc_initialise_api()
Definition: api/odc.cc:203
int odc_decoder_defaults_from_frame(odc_decoder_t *decoder, const odc_frame_t *frame)
Definition: api/odc.cc:453
int odc_frame_column_count(const odc_frame_t *frame, int *count)
Definition: api/odc.cc:396
int odc_encoder_add_property(odc_encoder_t *encoder, const char *key, const char *value)
Definition: api/odc.cc:734
int odc_encode_to_stream(odc_encoder_t *encoder, void *handle, odc_stream_write_t write_fn, long *bytes_encoded)
Definition: api/odc.cc:949
int odc_encoder_column_set_data_size(odc_encoder_t *encoder, int col, int element_size)
Definition: api/odc.cc:786
int odc_decoder_column_set_data_size(odc_decoder_t *decoder, int col, int element_size)
Definition: api/odc.cc:544
int odc_encoder_set_row_count(odc_encoder_t *encoder, long nrows)
Definition: api/odc.cc:741
int odc_open_file_descriptor(odc_reader_t **reader, int fd)
Definition: api/odc.cc:292
static odc_failure_handler_t g_failure_handler
Definition: api/odc.cc:90
int odc_column_type_count(int *count)
Definition: api/odc.cc:163
int odc_decoder_column_count(const odc_decoder_t *decoder, int *count)
Definition: api/odc.cc:536
int odc_missing_integer(long *missing_value)
Definition: api/odc.cc:243
int odc_free_frame(const odc_frame_t *frame)
Definition: api/odc.cc:352
int odc_next_frame(odc_frame_t *frame)
Definition: api/odc.cc:359
int odc_decoder_set_data_array(odc_decoder_t *decoder, void *buffer, long width, long height, bool columnMajor)
Definition: api/odc.cc:494
int odc_decode(odc_decoder_t *decoder, const odc_frame_t *frame, long *rows_decoded)
Definition: api/odc.cc:714
int odc_encoder_set_data_array(odc_encoder_t *encoder, const void *data, long width, long height, int columnMajorWidth)
Definition: api/odc.cc:755
int odc_encoder_column_add_bitfield(odc_encoder_t *encoder, int col, const char *name, int nbits)
Definition: api/odc.cc:807
int odc_decode_threaded(odc_decoder_t *decoder, const odc_frame_t *frame, long *rows_decoded, int nthreads)
Definition: api/odc.cc:643
int odc_open_buffer(odc_reader_t **reader, const void *data, long length)
Definition: api/odc.cc:302
int odc_set_missing_double(double missing_double)
Definition: api/odc.cc:263
int odc_decoder_row_count(const odc_decoder_t *decoder, long *nrows)
Definition: api/odc.cc:486
int odc_frame_column_attributes(const odc_frame_t *frame, int col, const char **name, int *type, int *element_size, int *bitfield_count)
Definition: api/odc.cc:404
void odc_encode_to_data_handle(odc_encoder_t *encoder, eckit::DataHandle &dh)
Definition: api/odc.cc:921
int odc_missing_double(double *missing_value)
Definition: api/odc.cc:250
int odc_decoder_add_column(odc_decoder_t *decoder, const char *name)
Definition: api/odc.cc:527
int odc_encode_to_file_descriptor(odc_encoder_t *encoder, int fd, long *bytes_encoded)
Definition: api/odc.cc:982
int odc_copy_frame(odc_frame_t *source_frame, odc_frame_t **copy)
Definition: api/odc.cc:381
int odc_open_stream(odc_reader_t **reader, void *handle, odc_stream_read_t stream_proc)
Definition: api/odc.cc:309
int odc_integer_behaviour(int integerBehaviour)
Definition: api/odc.cc:226
int odc_decoder_data_array(const odc_decoder_t *decoder, const void **data, long *width, long *height, bool *columnMajor)
Definition: api/odc.cc:508
int odc_new_encoder(odc_encoder_t **encoder)
Definition: api/odc.cc:722
int odc_open_path(odc_reader_t **reader, const char *filename)
Definition: api/odc.cc:285
int odc_decoder_set_column_major(odc_decoder_t *decoder, bool columnMajor)
Definition: api/odc.cc:472
int odc_new_decoder(odc_decoder_t **decoder)
Definition: api/odc.cc:441
int odc_decoder_column_set_data_array(odc_decoder_t *decoder, int col, int element_size, int stride, void *data)
Definition: api/odc.cc:554
int odc_decoder_column_data_array(const odc_decoder_t *decoder, int col, int *element_size, int *stride, const void **data)
Definition: api/odc.cc:566
int odc_encoder_column_set_data_array(odc_encoder_t *encoder, int col, int element_size, int stride, const void *data)
Definition: api/odc.cc:795
int odc_frame_bitfield_attributes(const odc_frame_t *frame, int col, int field, const char **name, int *offset, int *size)
Definition: api/odc.cc:423
int odc_frame_row_count(const odc_frame_t *frame, long *count)
Definition: api/odc.cc:388
int odc_new_frame(odc_frame_t **frame, odc_reader_t *reader)
Definition: api/odc.cc:345
int odc_close(const odc_reader_t *reader)
Definition: api/odc.cc:332
static void * g_failure_handler_context
Definition: api/odc.cc:91
int odc_encoder_set_rows_per_frame(odc_encoder_t *encoder, long rows_per_frame)
Definition: api/odc.cc:748
int odc_encode_to_buffer(odc_encoder_t *encoder, void *buffer, long length, long *bytes_encoded)
Definition: api/odc.cc:992
void fill_in_encoder(odc_encoder_t *encoder, std::vector< std::unique_ptr< char[]>> &transposedData)
Definition: api/odc.cc:825
static std::string g_current_error_str
Definition: api/odc.cc:89
int odc_next_frame_aggregated(odc_frame_t *frame, long maximum_rows)
Definition: api/odc.cc:370
int odc_decoder_set_row_count(odc_decoder_t *decoder, long nrows)
Definition: api/odc.cc:479
int odc_free_decoder(const odc_decoder_t *decoder)
Definition: api/odc.cc:447
const std::vector< ColumnInfo > & columnInfo() const
Definition: Odb.cc:326
Frame(Reader &reader)
Definition: Odb.cc:303
bool next(bool aggregated=true, long rowlimit=-1)
Definition: Odb.cc:311
void decode(Decoder &target, size_t nthreads) const
Definition: Odb.cc:331
size_t rowCount() const
Definition: Odb.cc:316
size_t columnCount() const
Definition: Odb.cc:321
Reader(const std::string &path)
Definition: Odb.cc:81
static void setDoubleMissingValue(double val)
Definition: Odb.cc:351
static void treatIntegersAsDoubles(bool flag)
Definition: Odb.cc:343
static const std::string & version()
Definition: Odb.cc:355
static void setIntegerMissingValue(long val)
Definition: Odb.cc:347
static long integerMissingValue()
Definition: Odb.cc:365
static const std::string & gitsha1()
Definition: Odb.cc:360
static double doubleMissingValue()
Definition: Odb.cc:369
int innerWrapFn(std::function< void()> f)
Definition: api/odc.cc:119
IODA_DL void copy(const ObjectSelection &from, ObjectSelection &to, const ScaleMapping &scale_map)
Generic data copying function.
Definition: Copying.cpp:63
void encode(DataHandle &out, const std::vector< ColumnInfo > &columns, const std::vector< ConstStridedData > &data, const std::map< std::string, std::string > &properties, size_t maxRowsPerFrame)
Definition: Odb.cc:400
constexpr int NUM_TYPES
Definition: ColumnType.h:31
@ BITFIELD
Definition: ColumnType.h:27
character(:) function, allocatable, target, public odc_error_string(err)
Definition: odc.f90:620
integer function, public odc_version(version_str)
Definition: odc.f90:595
integer(c_long) function write_fn(context, buffer, length)
Definition: odc.f90:1038
integer function, public odc_vcs_version(git_sha1)
Definition: odc.f90:603
integer function, public odc_set_failure_handler(handler, context)
Definition: odc.f90:586
integer function, public odc_column_type_name(type, type_name)
Definition: odc.f90:611
long(* odc_stream_read_t)(void *context, void *buffer, long length)
struct odc_encoder_t odc_encoder_t
const int ODC_INTEGERS_AS_LONGS
@ ODC_ERROR_GENERAL_EXCEPTION
@ ODC_ITERATION_COMPLETE
@ ODC_SUCCESS
@ ODC_ERROR_UNKNOWN_EXCEPTION
long(* odc_stream_write_t)(void *context, const void *buffer, long length)
const int ODC_INTEGERS_AS_DOUBLES
void(* odc_failure_handler_t)(void *context, int error_code)
struct odc_decoder_t odc_decoder_t
@ ODC_INTEGER
@ ODC_IGNORE
@ ODC_REAL
@ ODC_BITFIELD
@ ODC_STRING
@ ODC_DOUBLE
struct odc_reader_t odc_reader_t
std::vector< Bit > bitfield
Definition: ColumnInfo.h:39
std::vector< DecodeColumn > columnData
Definition: api/odc.cc:53
void * externalData
Definition: api/odc.cc:57
size_t nrows
Definition: api/odc.cc:51
std::unique_ptr< char > ownedData
Definition: api/odc.cc:60
bool columnMajor
Definition: api/odc.cc:58
size_t dataHeight
Definition: api/odc.cc:56
std::vector< std::string > columnNames
Definition: api/odc.cc:52
size_t dataWidth
Definition: api/odc.cc:55
size_t arrayWidth
Definition: api/odc.cc:76
size_t arrayHeight
Definition: api/odc.cc:77
std::vector< EncodeColumn > columnData
Definition: api/odc.cc:80
std::vector< ColumnInfo > columnInfo
Definition: api/odc.cc:79
size_t maxRowsPerFrame
Definition: api/odc.cc:78
const void * arrayData
Definition: api/odc.cc:72
size_t nrows
Definition: api/odc.cc:75
int columnMajorWidth
Definition: api/odc.cc:73
std::map< std::string, std::string > properties
Definition: api/odc.cc:81