IODA Bundle
ReaderIterator.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "odc/ReaderIterator.h"
12 
13 #include "eckit/io/DataHandle.h"
14 
15 #include "odc/core/Codec.h"
16 #include "odc/core/Header.h"
18 #include "odc/Reader.h"
19 
20 using namespace eckit;
21 using namespace odc::core;
22 
23 namespace odc {
24 
25 ReaderIterator::ReaderIterator(Reader &owner)
26 : owner_(owner),
27  columns_(0),
28  lastValues_(0),
29  columnOffsets_(0),
30  rowDataSizeDoubles_(0),
31  nrows_(0),
32  rowsRemainingInTable_(0),
33  f_(owner_.dataHandle()->clone()),
34  newDataset_(false),
35  rowDataBuffer_(0),
36  noMore_(false),
37  headerCounter_(0),
38  byteOrder_(BYTE_ORDER_INDICATOR),
39  refCount_(0)
40 {
41  ASSERT(f_);
42  f_->openForRead();
43 
45 }
46 
47 eckit::DataHandle* ReaderIterator::dataHandle()
48 {
49  ASSERT(f_);
50  return f_.get();
51 }
52 
53 ReaderIterator::ReaderIterator(Reader &owner, const PathName& pathName)
54 : owner_(owner),
55  columns_(0),
56  lastValues_(0),
57  columnOffsets_(0),
58  rowDataSizeDoubles_(0),
59  nrows_(0),
60  rowsRemainingInTable_(0),
61  f_(odc::DataHandleFactory::openForRead(pathName)),
62  newDataset_(false),
63  rowDataBuffer_(0),
64  noMore_(false),
65  headerCounter_(0),
66  byteOrder_(BYTE_ORDER_INDICATOR),
67  refCount_(0)
68 {
69  ASSERT(f_);
70 
72 }
73 
75 
76  if (noMore_) return false;
77 
78  ASSERT(rowsRemainingInTable_ == 0);
79 
80  // Keep going until we find a valid header, or run out of data
81  // n.b. an empty table is legit, so we need a loop.
82 
83  while (true) {
84 
85  // Check the magic. If no more data, we are done
86 
87  if (!Header::readMagic(*f_)) {
88  noMore_ = true;
89  return false;
90  }
91 
92  // Read in the rest of the header
93 
94  Header header(columns_, properties_);
95  header.loadAfterMagic(*f_);
96 
97  byteOrder_ = header.byteOrder();
100 
101  // Ensure the decode buffers are all set up
102 
103  initRowBuffer();
104 
105  // Read in the data into a buffer and initialise the DataStream.
106 
107  size_t dataSize = header.dataSize();
108 
109  // It is perfectly legitimate to have zero rows in an ODB. If that is the case,
110  // then loop around again.
111 
112  if (dataSize == 0) {
113  ASSERT(header.rowsNumber() == 0);
114  } else {
115 
116  // Read the expected data into the rows buffer.
117 
118  ASSERT(header.rowsNumber() != 0);
119  ASSERT(dataSize >= 2);
120 
121  if (!readBuffer(dataSize)) {
122  // See ODB-376
123  throw SeriousBug("Expected row data to follow table header");
124  }
125 
126  // And we are done
127  newDataset_ = true;
129  return true;
130  }
131  }
132 }
133 
135 {
136  Log::debug() << "ReaderIterator::~ReaderIterator: headers read: " << headerCounter_ << " rows:" << nrows_ << std::endl;
137 
138  close();
139  delete [] lastValues_;
140  delete [] columnOffsets_;
141 }
142 
143 
145 {
146  //ASSERT(&other == 0);
147  return noMore_;
148 }
149 
151 {
152  int32_t numDoubles = rowDataSizeDoubles();
153  size_t nCols = columns().size();
154 
155  delete [] lastValues_;
156  lastValues_ = new double [numDoubles];
157 
158  codecs_.clear();
159  codecs_.resize(nCols, 0);
160 
161  delete [] columnOffsets_;
162  columnOffsets_ = new size_t[nCols];
163 
164  size_t offset = 0;
165  for(size_t i = 0; i < nCols; i++)
166  {
167  codecs_[i] = &columns()[i]->coder();
168  lastValues_[offset] = codecs_[i]->missingValue();
169  columnOffsets_[i] = offset;
170  offset += columns()[i]->dataSizeDoubles();
171  }
172 }
173 
174 size_t ReaderIterator::readBuffer(size_t dataSize) {
175 
176  // Ensure we have enough buffer space
177 
178  if (rowDataBuffer_.size() < dataSize) {
179  rowDataBuffer_ = eckit::Buffer(dataSize);
180  }
181 
182  // Read the data into a buffer
183 
184  size_t bytesRead = f_->read(rowDataBuffer_, dataSize);
185  if (bytesRead == 0) return 0;
186 
187  if (bytesRead != dataSize) {
188  std::stringstream ss;
189  ss << "Failed to read " << dataSize << " bytes of encoded data";
190  throw ODBIncomplete(ss.str(), Here());
191  }
192 
193  // Assign the data to a DataStream.
194 
196 
197  // Assign the appropriate data stream to each of the codecs.
198 
199  for (auto& codec : codecs_) codec->setDataStream(rowDataStream_);
200 
201  return bytesRead;
202 }
203 
205 {
206  newDataset_ = false;
207  if (noMore_)
208  return false;
209 
210  if (rowsRemainingInTable_ == 0) {
211  if (!loadHeaderAndBufferData()) return false;
212  ASSERT(rowsRemainingInTable_ != 0);
213  }
214 
215  unsigned char marker[2];
216  rowDataStream_.readBytes(marker, sizeof(marker));
217 
218  int startCol = (marker[0] * 256) + marker[1];
219 
220  size_t nCols = columns().size();
221  for(size_t i = startCol; i < nCols; i++) {
222  codecs_[i]->decode(&lastValues_[columnOffsets_[i]]);
223  }
224 
225  ++nrows_ ;
227  return nCols;
228 }
229 
231 
232  size_t total = 0;
233  for (const auto& column : columns()) {
234  total += column->dataSizeDoubles();
235  }
236  return total;
237 }
238 
239 
241 
242 double& ReaderIterator::data(size_t i)
243 {
244  ASSERT(i >= 0 && i < columns().size());
245  return lastValues_[columnOffsets_[i]];
246 }
247 
249 {
250  f_.reset();
251  return 0;
252 }
253 
254 
255 std::string ReaderIterator::property(std::string key)
256 {
257  return properties_[key];
258 }
259 
260 
261 api::ColumnType ReaderIterator::columnType(unsigned long index) { return columns_[index]->type(); }
262 const std::string& ReaderIterator::columnName(unsigned long index) const { return columns_[index]->name(); }
263 const std::string& ReaderIterator::codecName(unsigned long index) const { return columns_[index]->coder().name(); }
264 double ReaderIterator::columnMissingValue(unsigned long index) { return columns_[index]->missingValue(); }
265 const eckit::sql::BitfieldDef& ReaderIterator::bitfieldDef(unsigned long index) { return columns_[index]->bitfieldDef(); }
266 
267 } // namespace odc
268 
std::unique_ptr< eckit::DataHandle > f_
const std::string & codecName(unsigned long index) const
ReaderIterator(Reader &owner)
eckit::Buffer rowDataBuffer_
size_t rowDataSizeDoublesInternal() const
core::MetaData columns_
const std::string & columnName(unsigned long index) const
bool operator!=(const ReaderIterator &other)
std::vector< core::Codec * > codecs_
const eckit::sql::BitfieldDef & bitfieldDef(unsigned long index)
api::ColumnType columnType(unsigned long index)
unsigned long long nrows_
void property(std::string, std::string)
const double * data() const
unsigned long headerCounter_
size_t readBuffer(size_t dataSize)
core::Properties properties_
const core::MetaData & columns() const
size_t rowDataSizeDoubles() const
double columnMissingValue(unsigned long index)
core::GeneralDataStream rowDataStream_
eckit::DataHandle * dataHandle()
void readBytes(Args &&... args)
Definition: DataStream.h:144
size_t rowsNumber() const
Definition: Header.h:57
void loadAfterMagic(eckit::DataHandle &dh)
Definition: Header.cc:119
size_t dataSize() const
Definition: Header.h:55
int32_t byteOrder()
Definition: Header.h:59
const int32_t BYTE_ORDER_INDICATOR
Definition: Header.h:40
Definition: ColumnInfo.h:23