16 #include "eckit/io/AutoCloser.h"
17 #include "eckit/io/Buffer.h"
18 #include "eckit/io/MemoryHandle.h"
19 #include "eckit/types/FixedString.h"
26 using namespace eckit;
88 size_t ncols = metadata.size();
90 for (
size_t i = 0;
i < ncols;
i++) {
91 const auto& nm(metadata[
i]->
name());
94 ss <<
"Duplicate column '" << nm <<
"' " <<
" found in table";
118 size_t ncols = metadata.size();
125 std::vector<char> visitColumn(ncols,
false);
126 std::vector<api::StridedData*> facades(ncols, 0);
129 ASSERT(target.
columns().size() <= ncols);
131 for (
size_t i = 0;
i < target.
columns().size();
i++) {
133 const auto& nm(target.
columns()[
i]);
135 if (it ==
columnLookup.end()) it = lookupSimple.find(nm);
136 if (it == lookupSimple.end()) {
137 std::stringstream ss;
138 ss <<
"Column '" << nm <<
"' not found in ODB";
142 size_t pos = it->second;
143 if (visitColumn[pos]) {
144 std::stringstream ss;
145 ss <<
"Duplicated column '" << nm <<
"' in decode specification";
149 visitColumn[pos] =
true;
159 std::vector<std::reference_wrapper<Codec>> decoders;
160 decoders.reserve(ncols);
161 for (
auto& col : metadata) {
162 decoders.push_back(col->coder());
163 decoders.back().get().setDataStream(ds);
168 int lastStartCol = 0;
169 std::vector<size_t> lastDecoded(ncols, 0);
173 unsigned char marker[2];
175 int startCol = (marker[0] * 256) + marker[1];
177 if (lastStartCol > startCol) {
178 for (
int col = startCol; col < lastStartCol; col++) {
179 if (visitColumn[col]) {
180 facades[col]->fill(lastDecoded[col],
rowCount-1);
185 for (
int col = startCol; col <
long(ncols); col++) {
186 if (visitColumn[col]) {
187 decoders[col].get().decode(
reinterpret_cast<double*
>((*facades[col])[
rowCount]));
190 decoders[col].get().skip();
194 lastStartCol = startCol;
199 for (
size_t col = 0; col < ncols; col++) {
200 if (lastDecoded[col] < nrows-1) {
201 if (visitColumn[col]) {
202 facades[col]->fill(lastDecoded[col], nrows-1);
217 std::vector<std::string> nonConstantColumns;
219 for (
const std::string& columnName :
columns) {
222 if (!column)
throw UserError(
"Column '" + columnName +
"' not found", Here());
227 nonConstantColumns.push_back(columnName);
233 if (!nonConstantColumns.empty() && onlyConstants) {
234 std::stringstream ss;
235 ss <<
"Non-constant columns required in span: " << nonConstantColumns;
236 throw UserError(ss.str(), Here());
239 if (!nonConstantColumns.empty()) {
259 template <
typename T>
287 const char*
c =
reinterpret_cast<const char*
>(
val);
299 size_t ncols = metadata.size();
306 std::vector<std::unique_ptr<ColumnValuesBase>> columnValues(ncols);
310 std::vector<char> visitColumn(ncols,
false);
311 size_t maxDoublesDecode = 1;
313 for (
const std::string& columnName :
columns) {
316 if (it ==
columnLookup.end()) it = lookupSimple.find(columnName);
317 if (it == lookupSimple.end()) {
318 std::stringstream ss;
319 ss <<
"Column '" << columnName <<
"' not found in ODB";
323 visitColumn[it->second] =
true;
327 switch (metadata[it->second]->type()) {
337 columnValues[it->second].reset(
new StringColumnValues(columnName,
sizeof(
double)*metadata[it->second]->dataSizeDoubles()));
338 maxDoublesDecode = std::max(maxDoublesDecode, metadata[it->second]->dataSizeDoubles());
341 throw SeriousBug(
"Unexpected type in decoding column: " + columnName, Here());
350 std::vector<std::reference_wrapper<Codec>> decoders;
351 decoders.reserve(ncols);
352 for (
auto& col : metadata) {
353 decoders.push_back(col->coder());
354 decoders.back().get().setDataStream(ds);
359 std::vector<size_t> lastDecoded(ncols, 0);
360 double decodeBuffer[maxDoublesDecode];
364 unsigned char marker[2];
366 int startCol = (marker[0] * 256) + marker[1];
368 for (
int col = startCol; col <
long(ncols); col++) {
369 if (visitColumn[col]) {
370 decoders[col].get().decode(decodeBuffer);
371 columnValues[col]->addValue(decodeBuffer);
373 decoders[col].get().skip();
381 for (
const auto& values : columnValues) values->updateSpan(s);
396 std::unique_ptr<Table> newTable(
new Table(dh));
397 Header hdr(newTable->metadata_, newTable->properties_);
398 hdr.loadAfterMagic(dh);
401 newTable->dataPosition_ = dh.
position();
402 newTable->dataSize_ =
hdr.dataSize();
403 newTable->nextPosition_ = dh.
position() + newTable->dataSize_;
404 newTable->byteOrder_ =
hdr.byteOrder();
static api::ColumnType type(const std::string &)
virtual ~ColumnValuesBase()
virtual void addValue(double *val)=0
virtual void updateSpan(Span &span)=0
void updateSpan(Span &s) override
ColumnValues(const std::string &name)
const std::vector< std::string > & columns() const
std::vector< api::StridedData > & dataFacades()
void readBytes(Args &&... args)
void extend(const Span &other)
void addValue(const std::string &column, api::ColumnType t, double val)
void addValues(const std::string &column, const std::set< long > &vals)
eckit::Buffer readEncodedData()
eckit::Offset startPosition_
Table(const ThreadSharedDataHandle &dh)
bool otherByteOrder() const
size_t columnCount() const
static std::unique_ptr< Table > readTable(ThreadSharedDataHandle &dh)
Span span(const std::vector< std::string > &columns, bool onlyConstant=false)
eckit::Offset nextPosition() const
eckit::Offset nextPosition_
Span decodeSpan(const std::vector< std::string > &columns)
eckit::Offset dataPosition_
const Properties & properties() const
const std::map< std::string, size_t > & simpleColumnLookup()
const std::map< std::string, size_t > & columnLookup()
Lookups used for decoding. Memoised for efficiency.
int32_t byteOrder() const
void decode(DecodeTarget &target)
const MetaData & columns() const
std::map< std::string, size_t > columnLookup_
eckit::Length encodedDataSize() const
std::map< std::string, size_t > simpleColumnLookup_
eckit::Offset startPosition() const
ThreadSharedDataHandle dh_
eckit::Offset position() override
std::string title() const override
long read(void *, long) override
eckit::Offset seek(const eckit::Offset &) override
eckit::Length estimate() override
std::map< std::string, std::string > Properties
const int32_t BYTE_ORDER_INDICATOR
void addValue(double *val) override
void addValue(double *val) override
StringColumnValues(const std::string &nm, size_t maxlen)
void addValue(double *val) override