17 #include "eckit/filesystem/PathName.h"
18 #include "eckit/io/HandleBuf.h"
19 #include "eckit/io/MemoryHandle.h"
20 #include "eckit/log/Log.h"
33 using namespace eckit;
48 bool next(
bool aggregated,
long rowlimit);
50 const std::vector<ColumnInfo>& columnInfo()
const;
52 size_t rowCount()
const;
53 size_t columnCount()
const;
55 void decode(
Decoder& target,
size_t nthreads);
56 Span span(
const std::vector<std::string>&
columns,
bool onlyConstantValues);
74 using core::TablesReader::TablesReader;
81 Reader::Reader(
const std::string& path) :
101 std::vector<StridedData>& columnFacades) :
119 impl_->visit(visitor);
123 return impl_->offset();
127 return impl_->length();
135 reader_(*reader.impl_),
136 it_(reader_.begin()),
140 columnInfo_(rhs.columnInfo_),
141 reader_(rhs.reader_),
143 tables_(rhs.tables_),
144 first_(rhs.first_) {}
163 long nrows =
tables_.back().rowCount();
171 long next_nrows = nrows + it_next->
rowCount();
172 if (rowlimit >= 0 && next_nrows > rowlimit)
break;
173 if (!
tables_.front().columns().compatible(it_next->columns()))
break;
181 ASSERT(rowlimit < 0 || nrows <= rowlimit);
199 const eckit::sql::BitfieldDef& bf(col->bitfieldDef());
201 ASSERT(bf.first.size() == bf.second.size());
202 std::vector<ColumnInfo::Bit> bitfield;
203 bitfield.reserve(bf.first.size());
206 for (
size_t i = 0;
i < bf.first.size();
i++) {
212 offset += bf.second[
i];
220 col->dataSizeDoubles() *
sizeof(
double),
231 [](
size_t n,
const core::Table& t) { return n + t.rowCount(); });
235 ASSERT_MSG(!
tables_.empty(),
"No tables. Have you remembered to call odc_next_frame() on frame?");
236 return tables_[0].columnCount();
245 std::vector<core::DecodeTarget> targets;
247 size_t rowOffset = 0;
249 size_t rows = t.rowCount();
254 targets.emplace_back(subTarget);
260 std::mutex guard_mutex;
261 std::vector<std::future<void>> threads;
262 size_t next_frame = 0;
264 for (
size_t i = 0;
i < nthreads;
i++) {
265 threads.emplace_back(std::async(std::launch::async, [&] {
270 std::lock_guard<std::mutex> guard(guard_mutex);
271 if (next_frame <
tables_.size()) {
272 frame = next_frame++;
278 tables_[frame].decode(targets[frame]);
285 for (
auto& thread : threads) {
294 std::shared_ptr<SpanImpl> s(std::make_shared<SpanImpl>(
tables_.front().span(
columns, onlyConstantValues)));
297 s->extend(it->span(
columns, onlyConstantValues));
313 return impl_->next(aggregated, rowlimit);
318 return impl_->rowCount();
323 return impl_->columnCount();
328 return impl_->columnInfo();
333 impl_->decode(target, nthreads);
375 size_t odbFromCSV(DataHandle& dh_in, DataHandle& dh_out,
const std::string& delimiter) {
378 HandleBuf buf(dh_in);
379 std::istream is(&buf);
384 size_t odbFromCSV(std::istream& in, DataHandle& dh_out,
const std::string& delimiter) {
393 size_t odbFromCSV(
const std::string& in, eckit::DataHandle& dh_out,
const std::string& delimiter) {
394 MemoryHandle dh_in(in.c_str(), in.length());
396 AutoClose close(dh_in);
401 const std::vector<ColumnInfo>&
columns,
402 const std::vector<ConstStridedData>& data,
403 const std::map<std::string, std::string>& properties,
404 size_t maxRowsPerFrame) {
406 ASSERT(
columns.size() == data.size());
407 ASSERT(data.size() > 0);
409 size_t ncols = data.size();
410 size_t nrows = data[0].nelem();
411 ASSERT(std::all_of(data.begin(), data.end(), [nrows](
const ConstStridedData&
d) { return d.nelem() == nrows; }));
413 if (nrows <= maxRowsPerFrame) {
416 std::vector<ConstStridedData> sliced;
417 sliced.reserve(ncols);
419 while (start < nrows) {
420 size_t nelem = std::min(nrows - start, maxRowsPerFrame);
422 sliced.emplace_back(sd.slice(start, nelem));
static const LibOdc & instance()
virtual std::string gitsha1(unsigned int count) const
virtual std::string version() const
static double integerMDI()
void treatIntegersAsDoubles(bool flag)
static ODBAPISettings & instance()
unsigned long pass1(T b, const T e)
std::shared_ptr< DecoderImpl > impl_
Decoder(const std::vector< std::string > &columns, std::vector< StridedData > &columnFacades)
const std::vector< ColumnInfo > & columnInfo() const
Span span(const std::vector< std::string > &columns, bool onlyConstantValues) const
std::unique_ptr< FrameImpl > impl_
bool next(bool aggregated=true, long rowlimit=-1)
void decode(Decoder &target, size_t nthreads) const
size_t columnCount() const
Reader(const std::string &path)
static void setDoubleMissingValue(double val)
static void treatIntegersAsDoubles(bool flag)
static const std::string & version()
static void setIntegerMissingValue(long val)
static long integerMissingValue()
static const std::string & gitsha1()
static double doubleMissingValue()
eckit::Length length() const
Span(std::shared_ptr< SpanImpl > s)
std::shared_ptr< SpanImpl > impl_
void visit(SpanVisitor &visitor) const
eckit::Offset offset() const
DecodeTarget(const std::vector< std::string > &columns, const std::vector< api::StridedData > &facades)
void encode(DataHandle &out, const std::vector< ColumnInfo > &columns, const std::vector< ConstStridedData > &data, const std::map< std::string, std::string > &properties, size_t maxRowsPerFrame)
size_t odbFromCSV(DataHandle &dh_in, DataHandle &dh_out, const std::string &delimiter)
odbFromCSV returns number of lines imported
void encodeFrame(eckit::DataHandle &out, const std::vector< api::ColumnInfo > &columns, const std::vector< api::ConstStridedData > &data, const std::map< std::string, std::string > &properties)
std::vector< ColumnInfo > columnInfo_
const std::vector< ColumnInfo > & columnInfo() const
FrameImpl(Reader &reader)
void decode(Decoder &target, size_t nthreads)
Span span(const std::vector< std::string > &columns, bool onlyConstantValues)
core::TablesReader & reader_
std::vector< core::Table > tables_
bool next(bool aggregated, long rowlimit)
core::TablesReader::iterator it_
size_t columnCount() const