16 #include "eckit/exception/Exceptions.h"
17 #include "eckit/io/DataHandle.h"
23 using namespace eckit;
31 WriterBufferingIterator::WriterBufferingIterator(
Owner &owner, DataHandle *dh,
bool openDataHandle,
const odc::sql::TableDef* tableDef)
42 initialisedColumns_(false),
46 rowsBufferSize_(owner.rowsBufferSize()),
48 openDataHandle_(openDataHandle)
93 for (
const Column* column :
columns_) ASSERT(column->hasInitialisedCoder());
101 int32_t colSize =
columns().size();
112 for (
int i = 0;
i < colSize; ++
i) {
115 ASSERT(
columns_[
i]->hasInitialisedCoder());
120 offset +=
columns_[
i]->dataSizeDoubles();
152 ASSERT(
columns_[
i]->hasInitialisedCoder());
175 ASSERT(nCols ==
columns().size());
197 for (
const auto& column :
columns()) {
198 total += column->dataSizeDoubles();
211 for (; k <
columns().size(); ++k) {
219 static_cast<uint8_t
>((k / 256) % 256),
220 static_cast<uint8_t
>(k % 256)
228 char* p = stream.
get();
230 for (
size_t i = k;
i <
columns().size(); ++
i) {
246 Length estimatedLen = 20 * 1024 * 1024;
247 f_->openForWrite(estimatedLen);
256 ASSERT(index <
columns().size());
264 std::stringstream ss;
265 ss <<
"Attempting to create multiple columns with the same name: " <<
name;
266 throw SeriousBug(ss.str(), Here());
280 ASSERT(index <
columns().size());
315 size_t rowsWritten = 0;
316 unsigned char* p =
reinterpret_cast<unsigned char*
>(
rowsBuffer_.data());
318 doWriteRow(encodedStream,
reinterpret_cast<double *
>(p +
sizeof(uint16_t)));
327 ASSERT(encodedHeader.second <= encodedHeader.first.size());
329 Log::debug() <<
"WriterBufferingIterator::flush: header size: " << encodedHeader.second << std::endl;
331 ASSERT(
dataHandle().
write(encodedHeader.first, encodedHeader.second) ==
long(encodedHeader.second));
334 Log::debug() <<
"WriterBufferingIterator::flush: flushed " << rowsWritten <<
" rows." << std::endl;
366 std::vector<eckit::PathName> r;
static void count(void *counter, const double *data, size_t n)
int doWriteRow(core::DataStream< core::SameByteOrder > &stream, const double *values)
void missingValue(size_t i, double)
std::vector< eckit::PathName > outputFiles()
unsigned char * nextRowInBuffer_
size_t rowDataSizeDoublesInternal() const
int setBitfieldColumn(size_t index, std::string name, api::ColumnType type, eckit::sql::BitfieldDef b)
eckit::DataHandle & dataHandle()
std::pair< eckit::Buffer, size_t > serializeHeader(size_t dataSize, size_t rowsNumber)
codec::CodecOptimizer codecOptimizer_
size_t * columnByteSizes_
~WriterBufferingIterator()
int setColumn(size_t index, std::string name, api::ColumnType type)
size_t rowDataSizeDoubles() const
eckit::Buffer rowsBuffer_
unsigned long gatherStats(const double *values, unsigned long count)
size_t rowDataSizeDoubles_
int writeRow(const double *values, unsigned long count)
core::Properties properties_
unsigned long long nrows_
const core::MetaData & columns() const
int setOptimalCodecs(core::MetaData &columns)
void bitfieldDef(const eckit::sql::BitfieldDef &b)
void missingValue(double v)
void name(const std::string name)
static api::ColumnType type(const std::string &)
void writeBytes(const void *addr, size_t bytes)
eckit::Offset position() const
IODA_DL void copy(const ObjectSelection &from, ObjectSelection &to, const ScaleMapping &scale_map)
Generic data copying function.
subroutine write(self, filename_in)