8 #include "oops/util/abor1_cpp.h"
10 #include "oops/util/Logger.h"
12 #include "ioda/distribution/DistributionFactory.h"
13 #include "ioda/io/ObsFrameRead.h"
14 #include "ioda/io/ObsIoFactory.h"
30 eckit::LocalConfiguration distConf;
31 distConf.set(
"distribution",
"RoundRobin");
38 oops::Log::debug() <<
"ObsFrameRead: maximum frame size: " <<
max_frame_size_ << std::endl;
78 if (haveAnotherFrame) {
86 for (
auto & varNameObject :
obs_io_->varList()) {
87 std::string varName = varNameObject.first;
88 Variable sourceVar = varNameObject.second;
102 if (destVar.
isA<
int>()) {
103 std::vector<int> varValues;
104 sourceVar.
read<
int>(varValues, memBufferSelect, obsIoSelect);
105 destVar.
write<
int>(varValues, memBufferSelect, obsFrameSelect);
106 }
else if (destVar.
isA<
float>()) {
107 std::vector<float> varValues;
108 sourceVar.
read<
float>(varValues, memBufferSelect, obsIoSelect);
109 destVar.
write<
float>(varValues, memBufferSelect, obsFrameSelect);
110 }
else if (destVar.
isA<std::string>()) {
111 std::vector<std::string> varValues;
112 sourceVar.
read<std::string>(varValues, memBufferSelect, obsIoSelect);
113 destVar.
write<std::string>(varValues, memBufferSelect, obsFrameSelect);
125 if (
obs_io_->eachProcessGeneratesSeparateObs()) {
131 dist_->computePatchLocs();
133 return (haveAnotherFrame);
148 if (
obs_io_->isVarDimByNlocs(varName)) {
159 return readFrameVarHelper<int>(varName, varData);
162 return readFrameVarHelper<float>(varName, varData);
165 std::vector<std::string> & varData) {
166 return readFrameVarHelper<std::string>(varName, varData);
172 os <<
"ObsFrameRead: " << std::endl;
181 if (count < 0) { count = 0; }
193 indexedFrameSelect.
extent(varShape)
195 for (std::size_t i = 1; i < varShape.size(); ++i) {
196 std::vector<Dimensions_t> dimIndex(varShape[i]);
197 std::iota(dimIndex.begin(), dimIndex.end(), 0);
200 return indexedFrameSelect;
207 std::vector<Dimensions_t> locIndex;
208 std::vector<Dimensions_t> frameIndex;
215 if (
obs_io_->applyTimingWindow()) {
222 std::vector<Dimensions_t> records;
223 const std::vector<std::string> &obsGroupVarList =
obs_io_->obsGroupingVars();
224 if (obsGroupVarList.empty()) {
241 std::vector<Dimensions_t> & frameIndex) {
242 Dimensions_t locSize = this->
frameCount(
"nlocs");
245 locIndex.resize(locSize);
246 std::iota(locIndex.begin(), locIndex.end(), this->frameStart());
248 frameIndex.resize(locSize);
249 std::iota(frameIndex.begin(), frameIndex.end(), 0);
254 std::vector<Dimensions_t> & frameIndex) {
259 std::string dtVarName;
261 dtVarName = std::string(
"MetaData/datetime");
263 dtVarName = std::string(
"MetaData/time");
266 std::string(
"ERROR: ObsFrameRead::genFrameLocationsTimeWindow: ") +
267 std::string(
"date time information does not exist, ") +
268 std::string(
"cannot perform time window filtering");
269 throw eckit::UserError(ErrMsg, Here());
279 std::vector<util::DateTime> dtimeVals;
280 if (dtVarName ==
"MetaData/datetime") {
281 std::vector<std::string> dtValues;
282 dtVar.
read<std::string>(dtValues, memSelect, frameSelect);
287 std::vector<float> dtValues;
288 dtVar.
read<
float>(dtValues, memSelect, frameSelect);
292 this->
obs_io_->atts().open(
"date_time").read<
int>(refDtime);
301 std::size_t iloc = 0;
302 for (std::size_t i = 0; i <
frameCount; ++i) {
305 frameIndex[iloc] = i;
311 locIndex.resize(iloc);
312 frameIndex.resize(iloc);
318 std::vector<Dimensions_t> & records) {
320 Dimensions_t locSize = locIndex.size();
321 records.assign(locSize, 0);
322 for (std::size_t i = 0; i < locSize; ++i) {
330 const std::vector<Dimensions_t> & frameIndex,
331 std::vector<Dimensions_t> & records) {
337 std::size_t locSize = frameIndex.size();
338 records.assign(locSize, 0);
339 std::vector<std::string> obsGroupingKeys(locSize);
342 for (std::size_t i = 0; i < locSize; ++i) {
347 std::pair<std::string, std::size_t>(obsGroupingKeys[i],
next_rec_num_));
356 const std::vector<Dimensions_t> & frameIndex,
357 std::vector<std::string> & groupingKeys) {
360 for (std::size_t i = 0; i < obsGroupVarList.size(); ++i) {
364 std::string obsGroupVarName = obsGroupVarList[i];
365 std::string varName = std::string(
"MetaData/") + obsGroupVarName;
367 if (!
obs_io_->isVarDimByNlocs(varName)) {
369 std::string(
"ERROR: ObsFrameRead::genRecordNumbersGrouping: ") +
370 std::string(
"obs grouping variable (") + obsGroupVarName +
371 std::string(
") must have 'nlocs' as first dimension");
383 std::string keySegment;
384 if (groupVar.
isA<
int>()) {
385 std::vector<int> groupVarValues;
386 groupVar.
read<
int>(groupVarValues, memSelect, frameSelect);
388 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
389 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
391 groupingKeys[j] = keySegment;
393 groupingKeys[j] +=
":";
394 groupingKeys[j] += keySegment;
397 }
else if (groupVar.
isA<
float>()) {
398 std::vector<float> groupVarValues;
399 groupVar.
read<
float>(groupVarValues, memSelect, frameSelect);
401 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
402 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
404 groupingKeys[j] = keySegment;
406 groupingKeys[j] +=
":";
407 groupingKeys[j] += keySegment;
410 }
else if (groupVar.
isA<std::string>()) {
411 std::vector<std::string> groupVarValues;
412 groupVar.
read<std::string>(groupVarValues, memSelect, frameSelect);
414 for (std::size_t j = 0; j < frameIndex.size(); ++j) {
415 keySegment = groupVarValues[frameIndex[j]];
417 groupingKeys[j] = keySegment;
419 groupingKeys[j] +=
":";
420 groupingKeys[j] += keySegment;
429 const std::vector<Dimensions_t> & locIndex,
430 const std::vector<Dimensions_t> & records) {
432 Dimensions_t locSize = locIndex.size();
433 std::vector<float> lats(locSize, 0);
434 std::vector<float> lons(locSize, 0);
441 throw eckit::UserError(
"MetaData/longitude not found in observations file", Here());
448 latLonVar.
read(lons, memSelect, frameSelect);
452 throw eckit::UserError(
"MetaData/latitude not found in observations file", Here());
455 latLonVar.
read(lats, memSelect, frameSelect);
459 const std::size_t commSize =
params_.
comm().size();
460 const std::size_t commRank =
params_.
comm().rank();
462 for (std::size_t i = 0; i < locSize; ++i) {
463 std::size_t rowNum = locIndex[i];
464 std::size_t recNum = records[i];
468 eckit::geometry::Point2 point(lons[frameIndex], lats[frameIndex]);
470 std::size_t globalLocIndex = rowNum;
474 globalLocIndex = rowNum * commSize + commRank;
476 dist_->assignRecord(recNum, globalLocIndex, point);
478 if (dist->isMyRecord(recNum)) {
479 indx_.push_back(globalLocIndex);
static std::unique_ptr< Distribution > create(const eckit::mpi::Comm &comm, const eckit::Configuration &config)
Create a Distribution object implementing a particular method of distributing observations across mul...
Dimensions_t nlocs_
number of locations from source (file or generator)
std::shared_ptr< ObsIo > obs_io_
ObsIo object.
Selection createMemSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing a memory buffer
void createFrameFromObsGroup(const VarNameObjectList &varList, const VarNameObjectList &dimVarList, const VarDimMap &varDimMap)
create a frame object based on dimensions and variables from a source ObsGroup
Dimensions_t max_frame_size_
maximum frame size
Dimensions_t max_var_size_
maximum variable size
VarDimMap ioVarDimMap() const
return map from variables to their attached dimension scales
Selection createEntireFrameSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing the entire frame variable
Dimensions_t frame_start_
current frame starting index
Selection createObsIoSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameStart, const Dimensions_t frameCount)
create selection object for accessing an ObsIo variable
Dimensions_t gnlocs_outside_timewindow_
number of nlocs from the file (gnlocs) that are outside the time window
Dimensions_t gnlocs_
total number of locations from source (file or generator) that were selected after the timing window ...
Dimensions_t nrecs_
number of records from source (file or generator)
ObsSpaceParameters params_
ObsIo parameter specs.
ObsGroup obs_frame_
ObsGroup object (temporary storage for a single frame)
std::map< std::string, std::size_t > obs_grouping_
map for obs grouping via string keys
Dimensions_t adjusted_nlocs_frame_start_
current frame start for variable dimensioned along nlocs
Selection createIndexedFrameSelection(const std::vector< Dimensions_t > &varShape)
set up frontend and backend selection objects for the given variable
void frameNext() override
move to the next frame
void applyMpiDistribution(const std::shared_ptr< Distribution > &dist, const std::vector< Dimensions_t > &locIndex, const std::vector< Dimensions_t > &records)
apply MPI distribution
Dimensions_t adjusted_nlocs_frame_count_
current frame count for variable dimensioned along nlocs
Dimensions_t frameCount(const std::string &varName) override
return current frame count for variable
std::size_t rec_num_increment_
spacing between record numbers assigned on this process.
VarDimMap dims_attached_to_vars_
map showing association of dim names with each variable name
std::size_t next_rec_num_
next available record number
bool frameAvailable() override
true if a frame is available (not past end of frames)
void frameInit() override
initialize for walking through the frames
bool each_process_reads_separate_obs_
true if obs_io_ produces a different series of observations on each process, false if they are all th...
Dimensions_t basicFrameCount(const Variable &var)
return current frame count for variable
std::set< std::size_t > unique_rec_nums_
unique record numbers
bool readFrameVar(const std::string &varName, std::vector< int > &varData)
read a frame variable
void print(std::ostream &os) const override
print routine for oops::Printable base class
std::shared_ptr< Distribution > dist_
MPI distribution object.
void buildObsGroupingKeys(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< std::string > &groupingKeys)
generate string keys for record number assignment
void genRecordNumbersGrouping(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< Dimensions_t > &records)
generate record numbers considering obs grouping
std::map< std::vector< std::string >, Selection > known_frame_selections_
cache for frame selection
void genRecordNumbersAll(const std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &records)
generate record numbers where each location is a unique record (no grouping)
std::map< std::vector< std::string >, Selection > known_mem_selections_
cache for memory buffer selection
ObsFrameRead(const ObsSpaceParameters &params)
Dimensions_t frameStart() override
return current frame starting index
void genFrameLocationsAll(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for all locations in current frame
bool insideTimingWindow(const util::DateTime &ObsDt)
std::vector< std::size_t > recnums_
record numbers associated with the location indexes
std::vector< std::size_t > indx_
indexes of locations to extract from the input obs file
void genFrameLocationsTimeWindow(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for locations in current frame after filtering out obs outside DA timing window
std::vector< Dimensions_t > frame_loc_index_
location indices for current frame
void genFrameIndexRecNums(std::shared_ptr< Distribution > &dist)
generate frame indices and corresponding record numbers
void resize(const std::vector< std::pair< Variable, ioda::Dimensions_t >> &newDims)
Resize a Dimension and every Variable that depends on it.
static std::shared_ptr< ObsIo > create(ObsIoModes mode, const ObsSpaceParameters &parameters)
Create and return a new instance of an ObsIo subclass.
const util::DateTime & windowEnd() const
return the end of the DA timing window
const util::DateTime & windowStart() const
return the start of the DA timing window
const eckit::mpi::Comm & comm() const
return the associated MPI group communicator
A Selection represents the bounds of the data, in ioda or in userspace, that you are reading or writi...
Has_Variables vars
Use this to access variables.
virtual Variable open(const std::string &name) const
Open a Variable by name.
virtual bool exists(const std::string &name) const
Does a Variable with the specified name exist?
bool isA() const
Convenience function to check a Variable's storage type.
virtual bool isDimensionScale() const
Is this Variable used as a dimension scale?
virtual Dimensions getDimensions() const
virtual Variable read(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all) const
Read the Variable - as char array. Ordering is row-major.
virtual Variable write(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all)
The fundamental write function. Backends overload this function to implement all write operations.
virtual Variable resize(const std::vector< Dimensions_t > &newDims)
Resize the variable.
Selection & extent(const VecDimensions_t &sz)
Provide the dimensions of the object that you are selecting from.
Selection & select(const SingleSelection &s)
Append a new selection.
std::vector< util::DateTime > convertDtStringsToDtime(const std::vector< std::string > &dtStrings)
convert datetime strings to DateTime object
std::vector< util::DateTime > convertRefOffsetToDtime(const int refIntDtime, const std::vector< float > &timeOffsets)
convert a reference datetime and time offsets to DateTime objects
std::vector< Dimensions_t > dimsCur
The dimensions of the data.