8 #include "oops/util/abor1_cpp.h"
10 #include "oops/util/Logger.h"
12 #include "ioda/distribution/DistributionFactory.h"
13 #include "ioda/io/ObsFrameRead.h"
14 #include "ioda/io/ObsIoFactory.h"
24 eckit::LocalConfiguration distConf =
params.top_level_.toConfiguration();
25 distname_ = distConf.getString(
"distribution",
"RoundRobin");
35 eckit::LocalConfiguration rrConf;
36 rrConf.set(
"distribution",
"RoundRobin");
44 oops::Log::debug() <<
"ObsFrameRead: maximum frame size: " <<
max_frame_size_ << std::endl;
84 if (haveAnotherFrame) {
92 for (
auto & varNameObject :
obs_io_->varList()) {
93 std::string varName = varNameObject.first;
94 Variable sourceVar = varNameObject.second;
108 if (destVar.
isA<
int>()) {
109 std::vector<int> varValues;
110 sourceVar.
read<
int>(varValues, memBufferSelect, obsIoSelect);
111 destVar.
write<
int>(varValues, memBufferSelect, obsFrameSelect);
112 }
else if (destVar.
isA<
float>()) {
113 std::vector<float> varValues;
114 sourceVar.
read<
float>(varValues, memBufferSelect, obsIoSelect);
115 destVar.
write<
float>(varValues, memBufferSelect, obsFrameSelect);
116 }
else if (destVar.
isA<std::string>()) {
117 std::vector<std::string> varValues;
118 sourceVar.
read<std::string>(varValues, memBufferSelect, obsIoSelect);
119 destVar.
write<std::string>(varValues, memBufferSelect, obsFrameSelect);
131 if (
obs_io_->eachProcessGeneratesSeparateObs()) {
137 dist_->computePatchLocs();
139 return (haveAnotherFrame);
154 if (
obs_io_->isVarDimByNlocs(varName)) {
165 return readFrameVarHelper<int>(varName, varData);
168 return readFrameVarHelper<float>(varName, varData);
171 std::vector<std::string> & varData) {
172 return readFrameVarHelper<std::string>(varName, varData);
178 os <<
"ObsFrameRead: " << std::endl;
187 if (count < 0) { count = 0; }
199 indexedFrameSelect.
extent(varShape)
201 for (std::size_t i = 1; i < varShape.size(); ++i) {
202 std::vector<Dimensions_t> dimIndex(varShape[i]);
203 std::iota(dimIndex.begin(), dimIndex.end(), 0);
206 return indexedFrameSelect;
213 std::vector<Dimensions_t> locIndex;
214 std::vector<Dimensions_t> frameIndex;
221 if (
obs_io_->applyTimingWindow()) {
228 std::vector<Dimensions_t> records;
229 const std::vector<std::string> &obsGroupVarList =
obs_io_->obsGroupingVars();
230 if (obsGroupVarList.empty()) {
247 std::vector<Dimensions_t> & frameIndex) {
248 Dimensions_t locSize = this->
frameCount(
"nlocs");
251 locIndex.resize(locSize);
252 std::iota(locIndex.begin(), locIndex.end(), this->frameStart());
254 frameIndex.resize(locSize);
255 std::iota(frameIndex.begin(), frameIndex.end(), 0);
260 std::vector<Dimensions_t> & frameIndex) {
265 std::string dtVarName;
267 dtVarName = std::string(
"MetaData/datetime");
269 dtVarName = std::string(
"MetaData/time");
272 std::string(
"ERROR: ObsFrameRead::genFrameLocationsTimeWindow: ") +
273 std::string(
"date time information does not exist, ") +
274 std::string(
"cannot perform time window filtering");
275 throw eckit::UserError(ErrMsg, Here());
285 std::vector<util::DateTime> dtimeVals;
286 if (dtVarName ==
"MetaData/datetime") {
287 std::vector<std::string> dtValues;
288 dtVar.
read<std::string>(dtValues, memSelect, frameSelect);
293 std::vector<float> dtValues;
294 dtVar.
read<
float>(dtValues, memSelect, frameSelect);
298 this->
obs_io_->atts().open(
"date_time").read<
int>(refDtime);
307 std::size_t iloc = 0;
308 for (std::size_t i = 0; i <
frameCount; ++i) {
311 frameIndex[iloc] = i;
317 locIndex.resize(iloc);
318 frameIndex.resize(iloc);
324 std::vector<Dimensions_t> & records) {
326 Dimensions_t locSize = locIndex.size();
327 records.assign(locSize, 0);
328 for (std::size_t i = 0; i < locSize; ++i) {
336 const std::vector<Dimensions_t> & frameIndex,
337 std::vector<Dimensions_t> & records) {
343 std::size_t locSize = frameIndex.size();
344 records.assign(locSize, 0);
345 std::vector<std::string> obsGroupingKeys(locSize);
348 for (std::size_t i = 0; i < locSize; ++i) {
353 std::pair<std::string, std::size_t>(obsGroupingKeys[i],
next_rec_num_));
362 const std::vector<Dimensions_t> & frameIndex,
363 std::vector<std::string> & groupingKeys) {
366 for (std::size_t i = 0; i < obsGroupVarList.size(); ++i) {
370 std::string obsGroupVarName = obsGroupVarList[i];
371 std::string varName = std::string(
"MetaData/") + obsGroupVarName;
373 if (!
obs_io_->isVarDimByNlocs(varName)) {
375 std::string(
"ERROR: ObsFrameRead::genRecordNumbersGrouping: ") +
376 std::string(
"obs grouping variable (") + obsGroupVarName +
377 std::string(
") must have 'nlocs' as first dimension");
389 std::string keySegment;
390 if (groupVar.
isA<
int>()) {
391 std::vector<int> groupVarValues;
392 groupVar.
read<
int>(groupVarValues, memSelect, frameSelect);
394 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
395 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
397 groupingKeys[j] = keySegment;
399 groupingKeys[j] +=
":";
400 groupingKeys[j] += keySegment;
403 }
else if (groupVar.
isA<
float>()) {
404 std::vector<float> groupVarValues;
405 groupVar.
read<
float>(groupVarValues, memSelect, frameSelect);
407 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
408 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
410 groupingKeys[j] = keySegment;
412 groupingKeys[j] +=
":";
413 groupingKeys[j] += keySegment;
416 }
else if (groupVar.
isA<std::string>()) {
417 std::vector<std::string> groupVarValues;
418 groupVar.
read<std::string>(groupVarValues, memSelect, frameSelect);
420 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
421 keySegment = groupVarValues[frameIndex[j]];
423 groupingKeys[j] = keySegment;
425 groupingKeys[j] +=
":";
426 groupingKeys[j] += keySegment;
435 const std::vector<Dimensions_t> & locIndex,
436 const std::vector<Dimensions_t> & records) {
438 Dimensions_t locSize = locIndex.size();
439 std::vector<float> lats(locSize, 0);
440 std::vector<float> lons(locSize, 0);
447 throw eckit::UserError(
"MetaData/longitude not found in observations file", Here());
454 latLonVar.
read(lons, memSelect, frameSelect);
458 throw eckit::UserError(
"MetaData/latitude not found in observations file", Here());
461 latLonVar.
read(lats, memSelect, frameSelect);
473 std::vector<int> saved_index(locSize, 0);
474 std::vector<int> saved_record_number(locSize, 0);
479 throw eckit::UserError(
"MetaData/saved_record_number not found in observations file",
486 locVar.
read(saved_record_number, locMemSelect, locFrameSelect);
490 throw eckit::UserError(
"MetaData/saved_index not found in observations file", Here());
493 locVar.
read(saved_index, locMemSelect, locFrameSelect);
498 const std::size_t commSize =
params_.
comm().size();
499 const std::size_t commRank =
params_.
comm().rank();
500 std::size_t rowNum = 0;
501 std::size_t recNum = 0;
502 std::size_t frameIndex = 0;
503 std::size_t globalLocIndex = 0;
505 for (std::size_t i = 0; i < locSize; ++i) {
507 rowNum = saved_index[i];
508 recNum = saved_record_number[i];
511 rowNum = locIndex[i];
518 eckit::geometry::Point2 point(lons[frameIndex], lats[frameIndex]);
520 std::size_t globalLocIndex = rowNum;
524 globalLocIndex = rowNum * commSize + commRank;
526 dist_->assignRecord(recNum, globalLocIndex, point);
528 if (dist->isMyRecord(recNum)) {
529 indx_.push_back(globalLocIndex);
static std::unique_ptr< Distribution > create(const eckit::mpi::Comm &comm, const eckit::Configuration &config)
Create a Distribution object implementing a particular method of distributing observations across mul...
Dimensions_t nlocs_
number of locations from source (file or generator)
std::shared_ptr< ObsIo > obs_io_
ObsIo object.
Selection createMemSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing a memory buffer
void createFrameFromObsGroup(const VarNameObjectList &varList, const VarNameObjectList &dimVarList, const VarDimMap &varDimMap)
create a frame object based on dimensions and variables from a source ObsGroup
Dimensions_t max_frame_size_
maximum frame size
Dimensions_t max_var_size_
maximum variable size
VarDimMap ioVarDimMap() const
return map from variables to their attached dimension scales
Selection createEntireFrameSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing the entire frame variable
Dimensions_t frame_start_
current frame starting index
Selection createObsIoSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameStart, const Dimensions_t frameCount)
create selection object for accessing an ObsIo variable
Dimensions_t gnlocs_outside_timewindow_
number of nlocs from the file (gnlocs) that are outside the time window
Dimensions_t gnlocs_
total number of locations from source (file or generator) that were selected after the timing window ...
Dimensions_t nrecs_
number of records from source (file or generator)
ObsSpaceParameters params_
ObsIo parameter specs.
ObsGroup obs_frame_
ObsGroup object (temporary storage for a single frame)
std::map< std::string, std::size_t > obs_grouping_
map for obs grouping via string keys
Dimensions_t adjusted_nlocs_frame_start_
current frame start for variable dimensioned along nlocs
Selection createIndexedFrameSelection(const std::vector< Dimensions_t > &varShape)
set up frontend and backend selection objects for the given variable
void frameNext() override
move to the next frame
void applyMpiDistribution(const std::shared_ptr< Distribution > &dist, const std::vector< Dimensions_t > &locIndex, const std::vector< Dimensions_t > &records)
apply MPI distribution
Dimensions_t adjusted_nlocs_frame_count_
current frame count for variable dimensioned along nlocs
Dimensions_t frameCount(const std::string &varName) override
return current frame count for variable
std::size_t rec_num_increment_
spacing between record numbers assigned on this process.
VarDimMap dims_attached_to_vars_
map showing association of dim names with each variable name
std::size_t next_rec_num_
next available record number
std::string distname_
name of the MPI distribution
bool frameAvailable() override
true if a frame is available (not past end of frames)
void frameInit() override
initialize for walking through the frames
bool each_process_reads_separate_obs_
true if obs_io_ produces a different series of observations on each process, false if they are all th...
Dimensions_t basicFrameCount(const Variable &var)
return current frame count for variable
std::set< std::size_t > unique_rec_nums_
unique record numbers
bool readFrameVar(const std::string &varName, std::vector< int > &varData)
read a frame variable
void print(std::ostream &os) const override
print routine for oops::Printable base class
std::shared_ptr< Distribution > dist_
MPI distribution object.
void buildObsGroupingKeys(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< std::string > &groupingKeys)
generate string keys for record number assignment
void genRecordNumbersGrouping(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< Dimensions_t > &records)
generate record numbers considering obs grouping
std::map< std::vector< std::string >, Selection > known_frame_selections_
cache for frame selection
void genRecordNumbersAll(const std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &records)
generate record numbers where each location is a unique record (no grouping)
std::map< std::vector< std::string >, Selection > known_mem_selections_
cache for memory buffer selection
ObsFrameRead(const ObsSpaceParameters &params)
Dimensions_t frameStart() override
return current frame starting index
void genFrameLocationsAll(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for all locations in current frame
bool insideTimingWindow(const util::DateTime &ObsDt)
std::vector< std::size_t > recnums_
record numbers associated with the location indexes
std::vector< std::size_t > indx_
indexes of locations to extract from the input obs file
void genFrameLocationsTimeWindow(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for locations in current frame after filtering out obs outside DA timing window
std::vector< Dimensions_t > frame_loc_index_
location indices for current frame
void genFrameIndexRecNums(std::shared_ptr< Distribution > &dist)
generate frame indices and corresponding record numbers
void resize(const std::vector< std::pair< Variable, ioda::Dimensions_t >> &newDims)
Resize a Dimension and every Variable that depends on it.
static std::shared_ptr< ObsIo > create(ObsIoModes mode, const ObsSpaceParameters &parameters)
Create and return a new instance of an ObsIo subclass.
const util::DateTime & windowEnd() const
return the end of the DA timing window
const util::DateTime & windowStart() const
return the start of the DA timing window
const eckit::mpi::Comm & comm() const
return the associated MPI group communicator
A Selection represents the bounds of the data, in ioda or in userspace, that you are reading or writi...
Has_Variables vars
Use this to access variables.
virtual Variable open(const std::string &name) const
Open a Variable by name.
virtual bool exists(const std::string &name) const
Does a Variable with the specified name exist?
bool isA() const
Convenience function to check a Variable's storage type.
virtual bool isDimensionScale() const
Is this Variable used as a dimension scale?
virtual Dimensions getDimensions() const
virtual Variable read(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all) const
Read the Variable - as char array. Ordering is row-major.
virtual Variable write(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all)
The fundamental write function. Backends overload this function to implement all write operations.
virtual Variable resize(const std::vector< Dimensions_t > &newDims)
Resize the variable.
Selection & extent(const VecDimensions_t &sz)
Provide the dimensions of the object that you are selecting from.
Selection & select(const SingleSelection &s)
Append a new selection.
std::vector< util::DateTime > convertRefOffsetToDtime(const int refIntDtime, const std::vector< float > &timeOffsets)
convert a reference date-time and a vector of time offsets to DateTime objects
std::vector< util::DateTime > convertDtStringsToDtime(const std::vector< std::string > &dtStrings)
convert datetime strings to DateTime objects
std::vector< Dimensions_t > dimsCur
The dimensions of the data.