IODA Bundle
Odb.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2019- ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "odc/api/Odb.h"
12 
13 #include <numeric>
14 #include <mutex>
15 #include <future>
16 
17 #include "eckit/filesystem/PathName.h"
18 #include "eckit/io/HandleBuf.h"
19 #include "eckit/io/MemoryHandle.h"
20 #include "eckit/log/Log.h"
21 
22 #include "odc/core/DecodeTarget.h"
23 #include "odc/core/Encoder.h"
24 #include "odc/core/Table.h"
25 #include "odc/core/TablesReader.h"
26 #include "odc/csv/TextReader.h"
28 #include "odc/LibOdc.h"
29 #include "odc/MDI.h"
30 #include "odc/ODBAPISettings.h"
31 #include "odc/Writer.h"
32 
33 using namespace eckit;
34 
35 namespace odc {
36 namespace api {
37 
38 //----------------------------------------------------------------------------------------------------------------------
39 
40 ///
41 /// Internal types
42 
43 struct FrameImpl {
44  FrameImpl(Reader& reader);
45  FrameImpl(const FrameImpl& rhs);
46 
47  // Moves this frame onwards
48  bool next(bool aggregated, long rowlimit);
49 
50  const std::vector<ColumnInfo>& columnInfo() const;
51 
52  size_t rowCount() const;
53  size_t columnCount() const;
54 
55  void decode(Decoder& target, size_t nthreads);
56  Span span(const std::vector<std::string>& columns, bool onlyConstantValues);
57 
58 private: // members
59 
60  mutable std::vector<ColumnInfo> columnInfo_;
63  std::vector<core::Table> tables_;
64  bool first_;
65 };
66 
67 
68 // Internal API class definition
69 
71 
72 public: // methods
73 
74  using core::TablesReader::TablesReader;
75 };
76 
77 //----------------------------------------------------------------------------------------------------------------------
78 
79 // API Forwarding
80 
81 Reader::Reader(const std::string& path) :
82  impl_(std::make_shared<ReaderImpl>(path)) {}
83 
84 Reader::Reader(eckit::DataHandle& dh) :
85  impl_(std::make_shared<ReaderImpl>(dh)) {}
86 
87 Reader::Reader(eckit::DataHandle* dh) :
88  impl_(std::make_shared<ReaderImpl>(dh)) {}
89 
91 
92 //----------------------------------------------------------------------------------------------------------------------
93 
94 // Shim for decoding
96 public:
98 };
99 
100 Decoder::Decoder(const std::vector<std::string>& columns,
101  std::vector<StridedData>& columnFacades) :
102  impl_(std::make_shared<DecoderImpl>(columns, columnFacades)) {}
103 
105 
106 //----------------------------------------------------------------------------------------------------------------------
107 
109 
110 //----------------------------------------------------------------------------------------------------------------------
111 
113  SpanImpl(core::Span&& s) : core::Span(std::move(s)) {}
114 };
115 
116 Span::Span(std::shared_ptr<SpanImpl> s) : impl_(s) {}
117 
118 void Span::visit(SpanVisitor& visitor) const {
119  impl_->visit(visitor);
120 }
121 
122 Offset Span::offset() const {
123  return impl_->offset();
124 }
125 
126 Length Span::length() const {
127  return impl_->length();
128 }
129 
130 //----------------------------------------------------------------------------------------------------------------------
131 
132 // Table implementation
133 
135  reader_(*reader.impl_),
136  it_(reader_.begin()),
137  first_(true) {}
138 
140  columnInfo_(rhs.columnInfo_),
141  reader_(rhs.reader_),
142  it_(rhs.it_),
143  tables_(rhs.tables_),
144  first_(rhs.first_) {}
145 
146 bool FrameImpl::next(bool aggregated, long rowlimit) {
147 
148  // n.b. Slightly convoluted incrementing of iterator ensures that for a simple read we do it
149  // in a single pass, so it will work on a non random-access DataHandle.
150 
151  columnInfo_.clear();
152  tables_.clear();
153 
154  if (it_ == reader_.end()) return false;
155 
156  if (!first_) {
157  ++it_;
158  if (it_ == reader_.end()) return false;
159  }
160 
161  first_ = false;
162  tables_.emplace_back(*it_);
163  long nrows = tables_.back().rowCount();
164 
165  if (aggregated) {
166  while (true) {
167  auto it_next = it_;
168  ++it_next;
169  if (it_next == reader_.end()) break;
170 
171  long next_nrows = nrows + it_next->rowCount();
172  if (rowlimit >= 0 && next_nrows > rowlimit) break;
173  if (!tables_.front().columns().compatible(it_next->columns())) break;
174 
175  ++it_;
176  tables_.emplace_back(*it_);
177  nrows = next_nrows;
178  }
179  }
180 
181  ASSERT(rowlimit < 0 || nrows <= rowlimit);
182  return true;
183 }
184 
185 const std::vector<ColumnInfo>& FrameImpl::columnInfo() const {
186 
187  ASSERT(tables_.size() > 0);
188 
189  // ColumnInfo is memoised, so only constructed once
190 
191  if (columnInfo_.empty()) {
192 
193  columnInfo_.reserve(columnCount());
194 
195  for (const core::Column* col : tables_.begin()->columns()) {
196 
197  // Extract any bitfield details
198 
199  const eckit::sql::BitfieldDef& bf(col->bitfieldDef());
200 
201  ASSERT(bf.first.size() == bf.second.size());
202  std::vector<ColumnInfo::Bit> bitfield;
203  bitfield.reserve(bf.first.size());
204 
205  uint8_t offset = 0;
206  for (size_t i = 0; i < bf.first.size(); i++) {
207  bitfield.emplace_back(ColumnInfo::Bit {
208  bf.first[i], // name
209  bf.second[i], // size
210  offset // offset
211  });
212  offset += bf.second[i];
213  }
214 
215  // Construct column details
216 
217  columnInfo_.emplace_back(ColumnInfo {
218  col->name(),
219  col->type(),
220  col->dataSizeDoubles() * sizeof(double),
221  std::move(bitfield)
222  });
223  }
224  }
225 
226  return columnInfo_;
227 }
228 
229 size_t FrameImpl::rowCount() const {
230  return std::accumulate(tables_.begin(), tables_.end(), size_t(0),
231  [](size_t n, const core::Table& t) { return n + t.rowCount(); });
232 }
233 
234 size_t FrameImpl::columnCount() const {
235  ASSERT_MSG(!tables_.empty(), "No tables. Have you remembered to call odc_next_frame() on frame?");
236  return tables_[0].columnCount();
237 }
238 
239 void FrameImpl::decode(Decoder& target, size_t nthreads) {
240 
241  if (tables_.size() == 1) {
242  tables_[0].decode(*target.impl_);
243  } else {
244 
245  std::vector<core::DecodeTarget> targets;
246 
247  size_t rowOffset = 0;
248  for (core::Table& t : tables_) {
249  size_t rows = t.rowCount();
250  core::DecodeTarget&& subTarget(target.impl_->slice(rowOffset, rows));
251  if (nthreads == 1) {
252  t.decode(subTarget);
253  } else {
254  targets.emplace_back(subTarget);
255  }
256  rowOffset += rows;
257  }
258 
259  if (nthreads > 1) {
260  std::mutex guard_mutex;
261  std::vector<std::future<void>> threads;
262  size_t next_frame = 0;
263 
264  for (size_t i = 0; i < nthreads; i++) {
265  threads.emplace_back(std::async(std::launch::async, [&] {
266  while (true) {
267  size_t frame;
268 
269  {
270  std::lock_guard<std::mutex> guard(guard_mutex);
271  if (next_frame < tables_.size()) {
272  frame = next_frame++;
273  } else {
274  return;
275  }
276  }
277 
278  tables_[frame].decode(targets[frame]);
279  }
280  }));
281  }
282 
283  // Waits for the threads. If any exceptions have been thrown, they get thrown into
284  // the main thread here.
285  for (auto& thread : threads) {
286  thread.get();
287  }
288  }
289  }
290 }
291 
292 Span FrameImpl::span(const std::vector<std::string>& columns, bool onlyConstantValues) {
293 
294  std::shared_ptr<SpanImpl> s(std::make_shared<SpanImpl>(tables_.front().span(columns, onlyConstantValues)));
295 
296  for (auto it = tables_.begin() + 1; it != tables_.end(); ++it) {
297  s->extend(it->span(columns, onlyConstantValues));
298  }
299 
300  return s;
301 }
302 
304  impl_(new FrameImpl(reader)) {}
305 
306 Frame::Frame(const Frame& rhs) :
307  impl_(new FrameImpl(*rhs.impl_)) {}
308 
310 
311 bool Frame::next(bool aggregated, long rowlimit) {
312  ASSERT(impl_);
313  return impl_->next(aggregated, rowlimit);
314 }
315 
316 size_t Frame::rowCount() const {
317  ASSERT(impl_);
318  return impl_->rowCount();
319 }
320 
321 size_t Frame::columnCount() const {
322  ASSERT(impl_);
323  return impl_->columnCount();
324 }
325 
326 const std::vector<ColumnInfo>& Frame::columnInfo() const {
327  ASSERT(impl_);
328  return impl_->columnInfo();
329 }
330 
331 void Frame::decode(Decoder& target, size_t nthreads) const {
332  ASSERT(impl_);
333  impl_->decode(target, nthreads);
334 }
335 
336 Span Frame::span(const std::vector<std::string>& columns, bool onlyConstantValues) const {
337  ASSERT(impl_);
338  return impl_->span(columns, onlyConstantValues);
339 }
340 
341 //----------------------------------------------------------------------------------------------------------------------
342 
345 }
346 
349 }
350 
353 }
354 
355 const std::string& Settings::version() {
356  static std::string vstring = LibOdc::instance().version();
357  return vstring;
358 }
359 
360 const std::string& Settings::gitsha1() {
361  static std::string vstring = LibOdc::instance().gitsha1(40);
362  return vstring;
363 }
364 
366  return odc::MDI::integerMDI();
367 }
368 
370  return odc::MDI::realMDI();
371 }
372 
373 //----------------------------------------------------------------------------------------------------------------------
374 
375 size_t odbFromCSV(DataHandle& dh_in, DataHandle& dh_out, const std::string& delimiter) {
376 
377  // Convert data handle to std::istream.
378  HandleBuf buf(dh_in);
379  std::istream is(&buf);
380 
381  return odbFromCSV(is, dh_out, delimiter);
382 }
383 
384 size_t odbFromCSV(std::istream& in, DataHandle& dh_out, const std::string& delimiter) {
385 
386  odc::TextReader reader(in, delimiter);
387  odc::Writer<> writer(dh_out);
388  odc::Writer<>::iterator output(writer.begin());
389 
390  return output->pass1(reader.begin(), reader.end());
391 }
392 
393 size_t odbFromCSV(const std::string& in, eckit::DataHandle& dh_out, const std::string& delimiter) {
394  MemoryHandle dh_in(in.c_str(), in.length());
395  dh_in.openForRead();
396  AutoClose close(dh_in);
397  return odbFromCSV(dh_in, dh_out, delimiter);
398 }
399 
400 void encode(DataHandle& out,
401  const std::vector<ColumnInfo>& columns,
402  const std::vector<ConstStridedData>& data,
403  const std::map<std::string, std::string>& properties,
404  size_t maxRowsPerFrame) {
405 
406  ASSERT(columns.size() == data.size());
407  ASSERT(data.size() > 0);
408 
409  size_t ncols = data.size();
410  size_t nrows = data[0].nelem();
411  ASSERT(std::all_of(data.begin(), data.end(), [nrows](const ConstStridedData& d) { return d.nelem() == nrows; }));
412 
413  if (nrows <= maxRowsPerFrame) {
414  core::encodeFrame(out, columns, data, properties);
415  } else {
416  std::vector<ConstStridedData> sliced;
417  sliced.reserve(ncols);
418  size_t start = 0;
419  while (start < nrows) {
420  size_t nelem = std::min(nrows - start, maxRowsPerFrame);
421  for (const ConstStridedData& sd : data) {
422  sliced.emplace_back(sd.slice(start, nelem));
423  }
424  core::encodeFrame(out, columns, sliced, properties);
425  start += nelem;
426  sliced.clear();
427  }
428  }
429 }
430 
431 //----------------------------------------------------------------------------------------------------------------------
432 
433 } // namespace api
434 } // namespace odc
static const LibOdc & instance()
Definition: LibOdc.cc:27
virtual std::string gitsha1(unsigned int count) const
Definition: LibOdc.cc:36
virtual std::string version() const
Definition: LibOdc.cc:34
static double realMDI()
Definition: MDI.h:21
static double integerMDI()
Definition: MDI.h:22
void treatIntegersAsDoubles(bool flag)
static ODBAPISettings & instance()
unsigned long pass1(T b, const T e)
iterator end() const
Definition: TextReader.cc:77
iterator begin()
Definition: TextReader.cc:71
std::shared_ptr< DecoderImpl > impl_
Definition: Odb.h:156
Decoder(const std::vector< std::string > &columns, std::vector< StridedData > &columnFacades)
Definition: Odb.cc:100
const std::vector< ColumnInfo > & columnInfo() const
Definition: Odb.cc:326
Frame(Reader &reader)
Definition: Odb.cc:303
Span span(const std::vector< std::string > &columns, bool onlyConstantValues) const
Definition: Odb.cc:336
std::unique_ptr< FrameImpl > impl_
Definition: Odb.h:137
bool next(bool aggregated=true, long rowlimit=-1)
Definition: Odb.cc:311
void decode(Decoder &target, size_t nthreads) const
Definition: Odb.cc:331
size_t rowCount() const
Definition: Odb.cc:316
size_t columnCount() const
Definition: Odb.cc:321
Reader(const std::string &path)
Definition: Odb.cc:81
static void setDoubleMissingValue(double val)
Definition: Odb.cc:351
static void treatIntegersAsDoubles(bool flag)
Definition: Odb.cc:343
static const std::string & version()
Definition: Odb.cc:355
static void setIntegerMissingValue(long val)
Definition: Odb.cc:347
static long integerMissingValue()
Definition: Odb.cc:365
static const std::string & gitsha1()
Definition: Odb.cc:360
static double doubleMissingValue()
Definition: Odb.cc:369
eckit::Length length() const
Definition: Odb.cc:126
Span(std::shared_ptr< SpanImpl > s)
Definition: Odb.cc:116
std::shared_ptr< SpanImpl > impl_
Definition: Odb.h:85
void visit(SpanVisitor &visitor) const
Definition: Odb.cc:118
eckit::Offset offset() const
Definition: Odb.cc:122
virtual ~SpanVisitor()
Definition: Odb.cc:108
DecodeTarget(const std::vector< std::string > &columns, const std::vector< api::StridedData > &facades)
Definition: DecodeTarget.cc:19
size_t rowCount() const
Definition: Table.cc:50
void encode(DataHandle &out, const std::vector< ColumnInfo > &columns, const std::vector< ConstStridedData > &data, const std::map< std::string, std::string > &properties, size_t maxRowsPerFrame)
Definition: Odb.cc:400
size_t odbFromCSV(DataHandle &dh_in, DataHandle &dh_out, const std::string &delimiter)
odbFromCSV returns number of lines imported
Definition: Odb.cc:375
void encodeFrame(eckit::DataHandle &out, const std::vector< api::ColumnInfo > &columns, const std::vector< api::ConstStridedData > &data, const std::map< std::string, std::string > &properties)
Definition: Encoder.cc:25
Definition: ColumnInfo.h:23
Definition: encode.cc:30
std::string name
Definition: ColumnInfo.h:36
Internal types.
Definition: Odb.cc:43
std::vector< ColumnInfo > columnInfo_
Definition: Odb.cc:60
const std::vector< ColumnInfo > & columnInfo() const
Definition: Odb.cc:185
FrameImpl(Reader &reader)
Definition: Odb.cc:134
void decode(Decoder &target, size_t nthreads)
Definition: Odb.cc:239
Span span(const std::vector< std::string > &columns, bool onlyConstantValues)
Definition: Odb.cc:292
core::TablesReader & reader_
Definition: Odb.cc:61
std::vector< core::Table > tables_
Definition: Odb.cc:63
bool next(bool aggregated, long rowlimit)
Definition: Odb.cc:146
core::TablesReader::iterator it_
Definition: Odb.cc:62
size_t rowCount() const
Definition: Odb.cc:229
size_t columnCount() const
Definition: Odb.cc:234
SpanImpl(core::Span &&s)
Definition: Odb.cc:113