16 #include "eckit/exception/Exceptions.h"
17 #include "eckit/maths/Functions.h"
18 #include "eckit/runtime/Main.h"
19 #include "eckit/io/MemoryHandle.h"
20 #include "eckit/io/FileDescHandle.h"
26 using namespace eckit;
// Default constructor: row count, data dimensions and externalData start at zero,
// columnMajor starts false and ownedData is value-initialised.
49 odc_decoder_t() :
    nrows(0),
    dataWidth(0),
    dataHeight(0),
    externalData(0),
    columnMajor(false),
    ownedData() {
}
// Default constructor: arrayData, columnMajorWidth, row count and array dimensions start
// at zero; maxRowsPerFrame defaults to 10000.
65 odc_encoder_t() :
    arrayData(0),
    columnMajorWidth(0),
    nrows(0),
    arrayWidth(0),
    arrayHeight(0),
    maxRowsPerFrame(10000) {
}
101 return "Iteration complete";
124 template <
typename FN>
129 }
catch (Exception& e) {
130 Log::error() <<
"Caught exception on C-C++ API boundary: " << e.what() << std::endl;
136 }
catch (std::exception& e) {
137 Log::error() <<
"Caught exception on C-C++ API boundary: " << e.what() << std::endl;
144 Log::error() <<
"Caught unknown on C-C++ API boundary" << std::endl;
172 static const char* names[] = {
181 if (
type < 0 ||
size_t(
type) >=
sizeof(names)/
sizeof(names[0])) {
182 std::stringstream ss;
183 ss <<
"Unknown type id: " <<
type;
184 throw UserError(ss.str(), Here());
187 *type_name = names[
type];
205 static bool initialised =
false;
208 Log::warning() <<
"Initialising ODC library twice" << std::endl;
212 const char* argv[2] = {
"odc-api", 0};
213 Main::initialise(1,
const_cast<char**
>(argv));
229 throw SeriousBug(
"ODC integer behaviour must be either ODC_INTEGERS_AS_DOUBLES or ODC_INTEGERS_AS_LONGS", Here());
239 eckit::Log::info() <<
"ODC setting failure handler fn." << std::endl;
// Check the duplicated descriptor rather than the caller's: the error text reports a dup()
// failure, and fd2 is the descriptor handed to FileDescHandle just below. Testing fd here
// would let a failed duplication through and wrap an invalid descriptor.
// NOTE(review): assumes fd2 holds the result of dup(fd) from the preceding line — confirm.
297 if (fd2 == -1)
throw CantOpenFile(
"dup() failed on supplied file descriptor", Here());
298 (*reader) =
new odc_reader_t {
new FileDescHandle(fd2,
true)};
304 (*reader) =
new odc_reader_t {
new MemoryHandle(data, length)};
// Read-only eckit::DataHandle that pulls its bytes from a caller-supplied callback.
// The callback (fn_) is invoked with the opaque context pointer (handle_) given at construction.
312 struct ReadStreamDataHandle :
public eckit::DataHandle {
// Stores the opaque context pointer and the read callback; neither is validated here.
313 ReadStreamDataHandle(
void* handle,
odc_stream_read_t fn) : handle_(handle), fn_(fn) {}
// Nothing to release: the destructor body is empty.
314 virtual ~ReadStreamDataHandle() {}
// Writes "StreamReadHandle(<callback>(<context>))" to s.
// odc_stream_read_t is a function-pointer type; streaming it directly would select
// operator<<(bool) and print "1", so it is cast to an object pointer to show its address.
315 void print(std::ostream& s)
const override { s <<
"StreamReadHandle(" << reinterpret_cast<const void*>(fn_) <<
"(" << handle_ <<
"))"; }
// Always reports a length of 0 (presumably because the stream size is not known up front).
316 Length openForRead()
override {
return 0; }
// Writing is not supported on a read stream (NOTIMP is presumably eckit's not-implemented macro).
317 void openForWrite(
const Length&)
override { NOTIMP; }
// Appending is not supported on a read stream.
318 void openForAppend(
const Length&)
override { NOTIMP; }
// Forwards the request to the callback and returns whatever it returns, unmodified.
319 long read(
void* buffer,
long length)
override {
return fn_(handle_, buffer, length); }
// Writing is not supported on a read stream.
320 long write(
const void*,
long)
override { NOTIMP; }
// No-op.
321 void close()
override {}
328 (*reader) =
new odc_reader_t(
new ReadStreamDataHandle(handle, stream_proc));
362 if (frame->
next(
false)) {
373 if (frame->
next(
true, maximum_rows)) {
383 ASSERT(source_frame);
409 int* bitfield_count) {
413 ASSERT(col >= 0 &&
size_t(col) < ci.size());
414 const auto& colInfo(ci[col]);
416 if (
name) (*name) = colInfo.name.c_str();
417 if (
type) (*type) = colInfo.type;
418 if (element_size) (*element_size) = colInfo.decodedSize;
419 if (bitfield_count) (*bitfield_count) = colInfo.bitfield.size();
427 ASSERT(col >= 0 &&
size_t(col) < ci.size());
428 const auto& colInfo(ci[col]);
429 ASSERT(field >= 0 &&
size_t(field) < colInfo.bitfield.size());
431 if (
name) (*name) = colInfo.bitfield[field].name.c_str();
432 if (offset) (*offset) = colInfo.bitfield[field].offset;
433 if (size) (*size) = colInfo.bitfield[field].size;
462 decoder->
nrows = nrows;
466 for (
size_t col = 0; col < ncols; ++col) {
482 decoder->
nrows = nrows;
490 (*nrows) = decoder->
nrows;
504 decoder->
nrows = height;
521 if (width) (*width) = decoder->
dataWidth;
523 if (columnMajor) (*columnMajor) = decoder->
columnMajor;
547 ASSERT(col >= 0 &&
size_t(col) < decoder->
columnData.size());
550 cd.elemSize = element_size;
557 ASSERT(col >= 0 &&
size_t(col) < decoder->
columnData.size());
560 cd.elemSize = element_size;
569 ASSERT(col >= 0 &&
size_t(col) < decoder->
columnData.size());
572 if (element_size) (*element_size) = cd.elemSize;
573 if (stride) (*stride) = cd.stride;
574 if (data) (*data) = cd.data;
580 if (decoder->
nrows == 0) {
584 size_t height = decoder->
nrows;
586 std::vector<std::pair<size_t, long>> offsets;
597 [&colName](
const ColumnInfo& ci) { return ci.name == colName; });
599 ASSERT(it->decodedSize > 0);
600 ASSERT(it->decodedSize %
sizeof(
double) == 0);
607 if (width != 0 && height != 0) {
612 ASSERT(width <= decoder->dataWidth);
613 ASSERT(height <= decoder->dataHeight);
// Allocate a decoder-owned buffer of width * height bytes.
// NOTE(review): this hands an array allocation (new char[...]) to ownedData, which appears to
// be declared as std::unique_ptr<char>; that deleter uses scalar delete, not delete[].
// Confirm the member type and, if so, change it to std::unique_ptr<char[]>.
617 decoder->
ownedData.reset(
new char[width * height]);
626 col.data =
static_cast<char*
>(dataPtr) + offset;
628 col.stride = col.elemSize;
629 offset += height * col.elemSize;
630 if (col.elemSize > 8) col.transpose =
true;
633 offset += col.elemSize;
636 if (col.stride == 0) col.stride = col.elemSize;
651 size_t frame_rows = frame->
rowCount();
656 ASSERT(decoder->
nrows >= frame_rows);
664 std::vector<std::pair<size_t, std::unique_ptr<double[]>>> temporaryTransposeData;
668 std::vector<StridedData> dataFacade;
674 void* data = col.data;
676 ASSERT(col.elemSize %
sizeof(
double) == 0);
677 ASSERT(col.stride == col.elemSize);
678 size_t cols = col.elemSize /
sizeof(double);
679 temporaryTransposeData.emplace_back(
i, std::unique_ptr<
double[]>(
new double[frame_rows * cols]));
680 data = temporaryTransposeData.back().second.get();
682 dataFacade.emplace_back(
StridedData{data, size_t(decoder->
nrows), size_t(col.elemSize), size_t(col.stride)});
689 ASSERT(nthreads >= 1);
690 frame->
decode(target,
static_cast<size_t>(nthreads));
694 for (
const auto& kv : temporaryTransposeData) {
695 size_t colIndex = kv.first;
696 const double* tmpArray = kv.second.get();
697 double* output =
static_cast<double*
>(decoder->
columnData[colIndex].data);
698 size_t rows = decoder->
nrows;
699 size_t cols = decoder->
columnData[colIndex].elemSize /
sizeof(double);
700 for (
size_t row = 0; row < frame_rows; row++) {
701 for (
size_t col = 0; col < cols; col++) {
702 output[row + (col * rows)] = tmpArray[col + (row * cols)];
710 if (rows_decoded) *(rows_decoded) = frame_rows;
744 encoder->
nrows = nrows;
756 return wrapApiFunction([encoder, data, width, height, columnMajorWidth] {
769 ASSERT(columnMajorWidth % 8 == 0);
774 if (encoder->
nrows == 0) encoder->
nrows = height;
789 ASSERT(col >= 0 &&
size_t(col) < encoder->
columnInfo.size());
790 ASSERT(element_size >= 0 && element_size % 8 == 0);
791 encoder->
columnInfo[col].decodedSize = element_size;
798 ASSERT(col >= 0 &&
size_t(col) < encoder->
columnInfo.size());
799 ASSERT(element_size >= 0 && element_size % 8 == 0);
800 ASSERT(stride >= 0 && stride % 8 == 0);
801 encoder->
columnInfo[col].decodedSize = element_size;
810 ASSERT(col >= 0 &&
size_t(col) < encoder->
columnInfo.size());
820 ASSERT(offset + nbits <= 32);
827 ASSERT(encoder->
nrows > 0);
830 size_t decodedWidth = 0;
832 if (info.decodedSize == 0) info.decodedSize = 8;
833 decodedWidth += info.decodedSize;
843 std::stringstream ss;
844 ss <<
"Expected " << encoder->
nrows <<
" rows, but array only contains " << encoder->
arrayHeight;
845 throw SeriousBug(ss.str(), Here());
849 std::stringstream ss;
850 ss <<
"Expected array width " << decodedWidth <<
" bytes, but array only contains " << encoder->
arrayWidth;
851 throw SeriousBug(ss.str(), Here());
868 ASSERT(
c.stride == 0);
870 c.data =
static_cast<const char*
>(encoder->
arrayData) + offset;
879 const char* psrc =
reinterpret_cast<const char*
>(
c.data);
880 char* ptgt = transposedData.back().get();
881 for (
size_t y = 0; y < encoder->
arrayHeight; ++y) {
882 for (
size_t x = 0; x < widthInElems; ++x) {
897 ASSERT(
c.stride %
sizeof(
double) == 0);
915 ASSERT(
c.stride %
sizeof(
double) == 0);
924 ASSERT(encoder->
nrows > 0);
929 std::vector<std::unique_ptr<char[]>> transposedData;
933 ASSERT(ncolumns > 0);
935 std::vector<ConstStridedData> stridedData;
936 stridedData.reserve(ncolumns);
939 for (
size_t i = 0;
i < ncolumns;
i++) {
// Write-only eckit::DataHandle that pushes its bytes to a caller-supplied callback.
// The callback (fn_) is invoked with the opaque context pointer (handle_) given at construction.
952 struct WriteStreamDataHandle :
public eckit::DataHandle {
// Stores the opaque context pointer and the write callback, and starts pos_ at 0.
953 WriteStreamDataHandle(
void* handle,
odc_stream_write_t fn) : handle_(handle), fn_(fn), pos_(0) {}
// Nothing to release: the destructor body is empty.
954 virtual ~WriteStreamDataHandle() {}
// Writes "StreamWriteHandle(<callback>(<context>))" to s. The label previously read
// "StreamReadHandle", copied from the read-side handle, which mislabelled this handle.
// odc_stream_write_t is a function-pointer type; streaming it directly would select
// operator<<(bool) and print "1", so it is cast to an object pointer to show its address.
955 void print(std::ostream& s)
const override { s <<
"StreamWriteHandle(" << reinterpret_cast<const void*>(fn_) <<
"(" << handle_ <<
"))"; }
// Reading is not supported on a write stream (NOTIMP is presumably eckit's not-implemented macro).
956 Length openForRead()
override { NOTIMP; }
// No-op: nothing is done when opening for write.
957 void openForWrite(
const Length&)
override {}
// No-op: nothing is done when opening for append.
958 void openForAppend(
const Length&)
override {}
// Reading is not supported on a write stream.
959 long read(
void*,
long)
override { NOTIMP; }
960 long write(
const void* buffer,
long length)
override {
961 long written = fn_(handle_, buffer, length);
// Returns pos_, which the constructor initialises to 0 and which the encode-to-stream path
// reads back through position() to report the number of bytes encoded.
965 Offset position()
override {
return pos_; }
// No-op.
966 void close()
override {}
974 WriteStreamDataHandle dh(handle,
write_fn);
976 AutoClose closer(dh);
978 (*bytes_encoded) = dh.position();
984 FileDescHandle dh(fd);
986 AutoClose closer(dh);
988 if (bytes_encoded) (*bytes_encoded) = dh.position();
994 MemoryHandle dh(buffer, length);
996 AutoClose closer(dh);
998 if (bytes_encoded) (*bytes_encoded) = dh.position();
static void count(void *counter, const double *data, size_t n)
int odc_encoder_add_column(odc_encoder_t *encoder, const char *name, int type)
int odc_free_encoder(const odc_encoder_t *encoder)
static void fill_in_decoder(odc_decoder_t *decoder, const odc_frame_t *frame)
int odc_set_missing_integer(long missing_integer)
int odc_decoder_defaults_from_frame(odc_decoder_t *decoder, const odc_frame_t *frame)
int odc_frame_column_count(const odc_frame_t *frame, int *count)
int odc_encoder_add_property(odc_encoder_t *encoder, const char *key, const char *value)
int odc_encode_to_stream(odc_encoder_t *encoder, void *handle, odc_stream_write_t write_fn, long *bytes_encoded)
int odc_encoder_column_set_data_size(odc_encoder_t *encoder, int col, int element_size)
int odc_decoder_column_set_data_size(odc_decoder_t *decoder, int col, int element_size)
int odc_encoder_set_row_count(odc_encoder_t *encoder, long nrows)
int odc_open_file_descriptor(odc_reader_t **reader, int fd)
static odc_failure_handler_t g_failure_handler
int odc_column_type_count(int *count)
int odc_decoder_column_count(const odc_decoder_t *decoder, int *count)
int odc_missing_integer(long *missing_value)
int odc_free_frame(const odc_frame_t *frame)
int odc_next_frame(odc_frame_t *frame)
int odc_decoder_set_data_array(odc_decoder_t *decoder, void *buffer, long width, long height, bool columnMajor)
int odc_decode(odc_decoder_t *decoder, const odc_frame_t *frame, long *rows_decoded)
int odc_encoder_set_data_array(odc_encoder_t *encoder, const void *data, long width, long height, int columnMajorWidth)
int odc_encoder_column_add_bitfield(odc_encoder_t *encoder, int col, const char *name, int nbits)
int odc_decode_threaded(odc_decoder_t *decoder, const odc_frame_t *frame, long *rows_decoded, int nthreads)
int odc_open_buffer(odc_reader_t **reader, const void *data, long length)
int odc_set_missing_double(double missing_double)
int odc_decoder_row_count(const odc_decoder_t *decoder, long *nrows)
int odc_frame_column_attributes(const odc_frame_t *frame, int col, const char **name, int *type, int *element_size, int *bitfield_count)
void odc_encode_to_data_handle(odc_encoder_t *encoder, eckit::DataHandle &dh)
int odc_missing_double(double *missing_value)
int odc_decoder_add_column(odc_decoder_t *decoder, const char *name)
int odc_encode_to_file_descriptor(odc_encoder_t *encoder, int fd, long *bytes_encoded)
int odc_copy_frame(odc_frame_t *source_frame, odc_frame_t **copy)
int odc_open_stream(odc_reader_t **reader, void *handle, odc_stream_read_t stream_proc)
int odc_integer_behaviour(int integerBehaviour)
int odc_decoder_data_array(const odc_decoder_t *decoder, const void **data, long *width, long *height, bool *columnMajor)
int odc_new_encoder(odc_encoder_t **encoder)
int odc_open_path(odc_reader_t **reader, const char *filename)
int odc_decoder_set_column_major(odc_decoder_t *decoder, bool columnMajor)
int odc_new_decoder(odc_decoder_t **decoder)
int odc_decoder_column_set_data_array(odc_decoder_t *decoder, int col, int element_size, int stride, void *data)
int odc_decoder_column_data_array(const odc_decoder_t *decoder, int col, int *element_size, int *stride, const void **data)
int odc_encoder_column_set_data_array(odc_encoder_t *encoder, int col, int element_size, int stride, const void *data)
int odc_frame_bitfield_attributes(const odc_frame_t *frame, int col, int field, const char **name, int *offset, int *size)
int odc_frame_row_count(const odc_frame_t *frame, long *count)
int odc_new_frame(odc_frame_t **frame, odc_reader_t *reader)
int odc_close(const odc_reader_t *reader)
static void * g_failure_handler_context
int odc_encoder_set_rows_per_frame(odc_encoder_t *encoder, long rows_per_frame)
int odc_encode_to_buffer(odc_encoder_t *encoder, void *buffer, long length, long *bytes_encoded)
void fill_in_encoder(odc_encoder_t *encoder, std::vector< std::unique_ptr< char[]>> &transposedData)
static std::string g_current_error_str
int odc_next_frame_aggregated(odc_frame_t *frame, long maximum_rows)
int odc_decoder_set_row_count(odc_decoder_t *decoder, long nrows)
int odc_free_decoder(const odc_decoder_t *decoder)
const std::vector< ColumnInfo > & columnInfo() const
bool next(bool aggregated=true, long rowlimit=-1)
void decode(Decoder &target, size_t nthreads) const
size_t columnCount() const
Reader(const std::string &path)
static void setDoubleMissingValue(double val)
static void treatIntegersAsDoubles(bool flag)
static const std::string & version()
static void setIntegerMissingValue(long val)
static long integerMissingValue()
static const std::string & gitsha1()
static double doubleMissingValue()
int wrapApiFunction(FN f)
int innerWrapFn(std::function< void()> f)
IODA_DL void copy(const ObjectSelection &from, ObjectSelection &to, const ScaleMapping &scale_map)
Generic data copying function.
void encode(DataHandle &out, const std::vector< ColumnInfo > &columns, const std::vector< ConstStridedData > &data, const std::map< std::string, std::string > &properties, size_t maxRowsPerFrame)
character(:) function, allocatable, target, public odc_error_string(err)
integer function, public odc_version(version_str)
integer(c_long) function write_fn(context, buffer, length)
integer function, public odc_vcs_version(git_sha1)
integer function, public odc_set_failure_handler(handler, context)
integer function, public odc_column_type_name(type, type_name)
subroutine write(self, filename_in)
long(* odc_stream_read_t)(void *context, void *buffer, long length)
struct odc_encoder_t odc_encoder_t
const int ODC_INTEGERS_AS_LONGS
@ ODC_ERROR_GENERAL_EXCEPTION
@ ODC_ERROR_UNKNOWN_EXCEPTION
long(* odc_stream_write_t)(void *context, const void *buffer, long length)
const int ODC_INTEGERS_AS_DOUBLES
void(* odc_failure_handler_t)(void *context, int error_code)
struct odc_decoder_t odc_decoder_t
struct odc_reader_t odc_reader_t
std::vector< Bit > bitfield
std::vector< DecodeColumn > columnData
std::unique_ptr< char > ownedData
std::vector< std::string > columnNames
std::vector< EncodeColumn > columnData
std::vector< ColumnInfo > columnInfo
std::map< std::string, std::string > properties