IODA Bundle
Table.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2018 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "odc/core/Table.h"
12 
13 #include <functional>
14 #include <bitset>
15 
16 #include "eckit/io/AutoCloser.h"
17 #include "eckit/io/Buffer.h"
18 #include "eckit/io/MemoryHandle.h"
19 #include "eckit/types/FixedString.h"
20 
21 #include "odc/core/DecodeTarget.h"
22 #include "odc/core/Header.h"
23 #include "odc/core/MetaData.h"
24 #include "odc/core/Codec.h"
25 
26 using namespace eckit;
27 
28 
29 namespace odc {
30 namespace core {
31 
32 //----------------------------------------------------------------------------------------------------------------------
33 
34 Table::Table(const ThreadSharedDataHandle& dh) :
35  dh_(dh) {}
36 
37 Offset Table::startPosition() const {
38  return startPosition_;
39 }
40 
41 
42 Offset Table::nextPosition() const {
43  return nextPosition_;
44 }
45 
46 Length Table::encodedDataSize() const {
47  return dataSize_;
48 }
49 
50 size_t Table::rowCount() const {
51  return metadata_.rowsNumber();
52 }
53 
54 size_t Table::columnCount() const {
55  return metadata_.size();
56 }
57 
58 int32_t Table::byteOrder() const {
59  return byteOrder_;
60 }
61 
62 bool Table::otherByteOrder() const {
64 }
65 
66 const MetaData& Table::columns() const {
67  return metadata_;
68 }
69 
70 const Properties& Table::properties() const {
71  return properties_;
72 }
73 
75 
76  Buffer data(dataSize_);
77 
79  dh_.read(data, dataSize_);
80  return data;
81 }
82 
83 const std::map<std::string, size_t>& Table::columnLookup() {
84 
85  if (columnLookup_.empty()) {
86 
87  const MetaData& metadata(columns());
88  size_t ncols = metadata.size();
89 
90  for (size_t i = 0; i < ncols; i++) {
91  const auto& nm(metadata[i]->name());
92  if (!columnLookup_.emplace(nm, i).second) {
93  std::stringstream ss;
94  ss << "Duplicate column '" << nm << "' " << " found in table";
95  throw ODBDecodeError(ss.str(), Here());
96  }
97  simpleColumnLookup_.emplace(nm.substr(0, nm.find('@')), i);
98  }
99  }
100 
101  return columnLookup_;
102 }
103 
104 const std::map<std::string, size_t>&Table::simpleColumnLookup() {
105 
106  if (simpleColumnLookup_.empty()) {
107  columnLookup();
108  }
109 
110  return simpleColumnLookup_;
111 }
112 
113 
115 
116  const MetaData& metadata(columns());
117  size_t nrows = metadata.rowsNumber();
118  size_t ncols = metadata.size();
119 
120  const std::map<std::string, size_t>& columnLookup(this->columnLookup());
121  const std::map<std::string, size_t>& lookupSimple(simpleColumnLookup());
122 
123  // Loop over the specified output columns, and select the correct ones for decoding.
124 
125  std::vector<char> visitColumn(ncols, false);
126  std::vector<api::StridedData*> facades(ncols, 0); // TODO: Do we want to do a copy, rather than point to StridedData*?
127 
128  ASSERT(target.columns().size() == target.dataFacades().size());
129  ASSERT(target.columns().size() <= ncols);
130 
131  for (size_t i = 0; i < target.columns().size(); i++) {
132 
133  const auto& nm(target.columns()[i]);
134  auto it = columnLookup.find(nm);
135  if (it == columnLookup.end()) it = lookupSimple.find(nm);
136  if (it == lookupSimple.end()) {
137  std::stringstream ss;
138  ss << "Column '" << nm << "' not found in ODB";
139  throw ODBDecodeError(ss.str(), Here());
140  }
141 
142  size_t pos = it->second;
143  if (visitColumn[pos]) {
144  std::stringstream ss;
145  ss << "Duplicated column '" << nm << "' in decode specification";
146  throw ODBDecodeError(ss.str(), Here());
147  }
148 
149  visitColumn[pos] = true;
150  facades[pos] = &target.dataFacades()[i];
151  ASSERT(target.dataFacades()[i].nelem() >= nrows);
152  }
153 
154  // Read the data in in bulk for this table
155 
156  const Buffer readBuffer(readEncodedData());
157  GeneralDataStream ds(otherByteOrder(), readBuffer);
158 
159  std::vector<std::reference_wrapper<Codec>> decoders;
160  decoders.reserve(ncols);
161  for (auto& col : metadata) {
162  decoders.push_back(col->coder());
163  decoders.back().get().setDataStream(ds);
164  }
165 
166  // Do the decoding
167 
168  int lastStartCol = 0;
169  std::vector<size_t> lastDecoded(ncols, 0);
170 
171  for (size_t rowCount = 0; rowCount < nrows; ++rowCount) {
172 
173  unsigned char marker[2];
174  ds.readBytes(&marker, sizeof(marker));
175  int startCol = (marker[0] * 256) + marker[1]; // Endian independant
176 
177  if (lastStartCol > startCol) {
178  for (int col = startCol; col < lastStartCol; col++) {
179  if (visitColumn[col]) {
180  facades[col]->fill(lastDecoded[col], rowCount-1);
181  }
182  }
183  }
184 
185  for (int col = startCol; col < long(ncols); col++) {
186  if (visitColumn[col]) {
187  decoders[col].get().decode(reinterpret_cast<double*>((*facades[col])[rowCount]));
188  lastDecoded[col] = rowCount;
189  } else {
190  decoders[col].get().skip();
191  }
192  }
193 
194  lastStartCol = startCol;
195  }
196 
197  // And fill in any columns that are incomplete
198 
199  for (size_t col = 0; col < ncols; col++) {
200  if (lastDecoded[col] < nrows-1) {
201  if (visitColumn[col]) {
202  facades[col]->fill(lastDecoded[col], nrows-1);
203  }
204  } else {
205  break;
206  }
207  }
208 }
209 
210 
211 Span Table::span(const std::vector<std::string>& columns, bool onlyConstants) {
212 
214 
215  // Get any constant columns
216 
217  std::vector<std::string> nonConstantColumns;
218 
219  for (const std::string& columnName : columns) {
220 
221  Column* column = metadata_.columnByName(columnName);
222  if (!column) throw UserError("Column '" + columnName + "' not found", Here());
223 
224  if (column->isConstant()) {
225  s.addValue(columnName, column->type(), column->min());
226  } else {
227  nonConstantColumns.push_back(columnName);
228  }
229  }
230 
231  // We don't decode non-constant columns unless allowed to
232 
233  if (!nonConstantColumns.empty() && onlyConstants) {
234  std::stringstream ss;
235  ss << "Non-constant columns required in span: " << nonConstantColumns;
236  throw UserError(ss.str(), Here());
237  }
238 
239  if (!nonConstantColumns.empty()) {
240  s.extend(decodeSpan(nonConstantColumns));
241  }
242 
243  return s;
244 }
245 
246 
247 // Helper workers to simplify building span decoder
248 
250 public: // methods
251 
252  virtual ~ColumnValuesBase() {}
253 
254  virtual void updateSpan(Span& span) = 0;
255  virtual void addValue(double* val) = 0;
256 };
257 
258 
259 template <typename T>
261 public: // methods
262 
263  ColumnValues(const std::string& name) : name_(name) {}
264 
265  void updateSpan(Span& s) override {
266  s.addValues(name_, values_);
267  }
268 
269 protected: // members
270  std::string name_;
271  std::set<T> values_;
272 };
273 
276  void addValue(double* val) override { values_.insert(static_cast<int64_t>(*val)); }
277 };
278 
281  void addValue(double* val) override { values_.insert(*val); }
282 };
283 
284 struct StringColumnValues : ColumnValues<std::string> {
285  StringColumnValues(const std::string& nm, size_t maxlen) : ColumnValues<std::string>(nm), maxLength_(maxlen) {}
286  void addValue(double* val) override {
287  const char* c = reinterpret_cast<const char*>(val);
288  values_.insert(std::string(c, ::strnlen(c, maxLength_)));
289  }
290  size_t maxLength_;
291 };
292 
293 
294 
295 Span Table::decodeSpan(const std::vector<std::string>& columns) {
296 
297  const MetaData& metadata(this->columns());
298  size_t nrows = metadata.rowsNumber();
299  size_t ncols = metadata.size();
300 
301  const std::map<std::string, size_t>& columnLookup = this->columnLookup();
302  const std::map<std::string, size_t>& lookupSimple = simpleColumnLookup();
303 
304  // Store the unique values
305 
306  std::vector<std::unique_ptr<ColumnValuesBase>> columnValues(ncols);
307 
308  // Loop over the specified output columns, and select the correct ones for decoding.
309 
310  std::vector<char> visitColumn(ncols, false);
311  size_t maxDoublesDecode = 1;
312 
313  for (const std::string& columnName : columns) {
314 
315  auto it = columnLookup.find(columnName);
316  if (it == columnLookup.end()) it = lookupSimple.find(columnName);
317  if (it == lookupSimple.end()) {
318  std::stringstream ss;
319  ss << "Column '" << columnName << "' not found in ODB";
320  throw ODBDecodeError(ss.str(), Here());
321  }
322 
323  visitColumn[it->second] = true;
324 
325  // What do we do with the values?
326 
327  switch (metadata[it->second]->type()) {
328  case api::BITFIELD:
329  case api::INTEGER:
330  columnValues[it->second].reset(new IntegerColumnValues(columnName));
331  break;
332  case api::REAL:
333  case api::DOUBLE:
334  columnValues[it->second].reset(new DoubleColumnValues(columnName));
335  break;
336  case api::STRING:
337  columnValues[it->second].reset(new StringColumnValues(columnName, sizeof(double)*metadata[it->second]->dataSizeDoubles()));
338  maxDoublesDecode = std::max(maxDoublesDecode, metadata[it->second]->dataSizeDoubles());
339  break;
340  default:
341  throw SeriousBug("Unexpected type in decoding column: " + columnName, Here());
342  };
343  }
344 
345  // Read the data in in bulk for this table
346 
347  const Buffer readBuffer(readEncodedData());
348  GeneralDataStream ds(otherByteOrder(), readBuffer);
349 
350  std::vector<std::reference_wrapper<Codec>> decoders;
351  decoders.reserve(ncols);
352  for (auto& col : metadata) {
353  decoders.push_back(col->coder());
354  decoders.back().get().setDataStream(ds);
355  }
356 
357  // Do the decoding
358 
359  std::vector<size_t> lastDecoded(ncols, 0);
360  double decodeBuffer[maxDoublesDecode];
361 
362  for (size_t rowCount = 0; rowCount < nrows; ++rowCount) {
363 
364  unsigned char marker[2];
365  ds.readBytes(&marker, sizeof(marker));
366  int startCol = (marker[0] * 256) + marker[1]; // Endian independant
367 
368  for (int col = startCol; col < long(ncols); col++) {
369  if (visitColumn[col]) {
370  decoders[col].get().decode(decodeBuffer);
371  columnValues[col]->addValue(decodeBuffer);
372  } else {
373  decoders[col].get().skip();
374  }
375  }
376  }
377 
378  // And add these to the spans
379 
381  for (const auto& values : columnValues) values->updateSpan(s);
382  return s;
383 }
384 
385 
387 
388  Offset startPosition = dh.position();
389 
390  // Check the magic number. If no more data, we are done
391 
392  if (!Header::readMagic(dh)) return 0;
393 
394  // Load the header
395 
396  std::unique_ptr<Table> newTable(new Table(dh));
397  Header hdr(newTable->metadata_, newTable->properties_);
398  hdr.loadAfterMagic(dh);
399 
400  newTable->startPosition_ = startPosition;
401  newTable->dataPosition_ = dh.position();
402  newTable->dataSize_ = hdr.dataSize();
403  newTable->nextPosition_ = dh.position() + newTable->dataSize_;
404  newTable->byteOrder_ = hdr.byteOrder();
405 
406  // Check that the ODB hasn't been truncated.
407  // n.b. Some DataHandles always return 0 (e.g. on a stream), so leth that pass.
408  if (newTable->nextPosition_ > dh.estimate() && dh.estimate() != 0) {
409  throw ODBIncomplete(dh.title(), Here());
410  }
411 
412  return newTable;
413 }
414 
415 //----------------------------------------------------------------------------------------------------------------------
416 
417 }
418 }
static api::ColumnType type(const std::string &)
Definition: Column.cc:74
bool isConstant()
Definition: Column.cc:89
void min(double m)
Definition: Column.h:74
virtual ~ColumnValuesBase()
Definition: Table.cc:252
virtual void addValue(double *val)=0
virtual void updateSpan(Span &span)=0
void updateSpan(Span &s) override
Definition: Table.cc:265
std::string name_
Definition: Table.cc:270
std::set< T > values_
Definition: Table.cc:271
ColumnValues(const std::string &name)
Definition: Table.cc:263
const std::vector< std::string > & columns() const
Definition: DecodeTarget.cc:31
std::vector< api::StridedData > & dataFacades()
Definition: DecodeTarget.cc:35
void readBytes(Args &&... args)
Definition: DataStream.h:144
static bool readMagic(eckit::DataHandle &dh)
Definition: Header.cc:39
Column * columnByName(const std::string &) const
Definition: MetaData.cc:84
unsigned long long rowsNumber() const
Definition: MetaData.h:39
void extend(const Span &other)
Definition: Span.cc:32
void addValue(const std::string &column, api::ColumnType t, double val)
Definition: Span.cc:47
void addValues(const std::string &column, const std::set< long > &vals)
Definition: Span.cc:89
eckit::Buffer readEncodedData()
Definition: Table.cc:74
eckit::Offset startPosition_
Definition: Table.h:77
Table(const ThreadSharedDataHandle &dh)
Definition: Table.cc:34
bool otherByteOrder() const
Definition: Table.cc:62
size_t columnCount() const
Definition: Table.cc:54
Properties properties_
Definition: Table.h:84
static std::unique_ptr< Table > readTable(ThreadSharedDataHandle &dh)
Definition: Table.cc:386
Span span(const std::vector< std::string > &columns, bool onlyConstant=false)
Definition: Table.cc:211
eckit::Length dataSize_
Definition: Table.h:79
eckit::Offset nextPosition() const
Definition: Table.cc:42
int32_t byteOrder_
Definition: Table.h:81
eckit::Offset nextPosition_
Definition: Table.h:80
Span decodeSpan(const std::vector< std::string > &columns)
Definition: Table.cc:295
eckit::Offset dataPosition_
Definition: Table.h:78
const Properties & properties() const
Definition: Table.cc:70
size_t rowCount() const
Definition: Table.cc:50
const std::map< std::string, size_t > & simpleColumnLookup()
Definition: Table.cc:104
const std::map< std::string, size_t > & columnLookup()
Lookups used for decoding. Memoised for efficiency.
Definition: Table.cc:83
int32_t byteOrder() const
Definition: Table.cc:58
void decode(DecodeTarget &target)
Definition: Table.cc:114
const MetaData & columns() const
Definition: Table.cc:66
std::map< std::string, size_t > columnLookup_
Definition: Table.h:88
eckit::Length encodedDataSize() const
Definition: Table.cc:46
std::map< std::string, size_t > simpleColumnLookup_
Definition: Table.h:89
eckit::Offset startPosition() const
Definition: Table.cc:37
MetaData metadata_
Definition: Table.h:83
ThreadSharedDataHandle dh_
Definition: Table.h:75
std::string title() const override
eckit::Offset seek(const eckit::Offset &) override
@ BITFIELD
Definition: ColumnType.h:27
std::map< std::string, std::string > Properties
Definition: Header.h:35
const int32_t BYTE_ORDER_INDICATOR
Definition: Header.h:40
Definition: ColumnInfo.h:23
Definition: encode.cc:30
void addValue(double *val) override
Definition: Table.cc:281
void addValue(double *val) override
Definition: Table.cc:276
StringColumnValues(const std::string &nm, size_t maxlen)
Definition: Table.cc:285
void addValue(double *val) override
Definition: Table.cc:286