8 #include "oops/util/abor1_cpp.h" 
   10 #include "oops/util/Logger.h" 
   12 #include "ioda/distribution/DistributionFactory.h" 
   13 #include "ioda/io/ObsFrameRead.h" 
   14 #include "ioda/io/ObsIoFactory.h" 
   24     eckit::LocalConfiguration distConf = 
params.top_level_.toConfiguration();
 
   25     distname_ = distConf.getString(
"distribution", 
"RoundRobin");
 
   35         eckit::LocalConfiguration rrConf;
 
   36         rrConf.set(
"distribution", 
"RoundRobin");
 
   44     oops::Log::debug() << 
"ObsFrameRead: maximum frame size: " << 
max_frame_size_ << std::endl;
 
   84     if (haveAnotherFrame) {
 
   92         for (
auto & varNameObject : 
obs_io_->varList()) {
 
   93             std::string varName = varNameObject.first;
 
   94             Variable sourceVar = varNameObject.second;
 
  108                 if (destVar.
isA<
int>()) {
 
  109                     std::vector<int> varValues;
 
  110                     sourceVar.
read<
int>(varValues, memBufferSelect, obsIoSelect);
 
  111                     destVar.
write<
int>(varValues, memBufferSelect, obsFrameSelect);
 
  112                 } 
else if (destVar.
isA<
float>()) {
 
  113                     std::vector<float> varValues;
 
  114                     sourceVar.
read<
float>(varValues, memBufferSelect, obsIoSelect);
 
  115                     destVar.
write<
float>(varValues, memBufferSelect, obsFrameSelect);
 
  116                 } 
else if (destVar.
isA<std::string>()) {
 
  117                     std::vector<std::string> varValues;
 
  118                     sourceVar.
read<std::string>(varValues, memBufferSelect, obsIoSelect);
 
  119                     destVar.
write<std::string>(varValues, memBufferSelect, obsFrameSelect);
 
  131       if (
obs_io_->eachProcessGeneratesSeparateObs()) {
 
  137       dist_->computePatchLocs();
 
  139     return (haveAnotherFrame);
 
  154         if (
obs_io_->isVarDimByNlocs(varName)) {
 
  165     return readFrameVarHelper<int>(varName, varData);
 
  168     return readFrameVarHelper<float>(varName, varData);
 
  171                                 std::vector<std::string> & varData) {
 
  172     return readFrameVarHelper<std::string>(varName, varData);
 
  178     os << 
"ObsFrameRead: " << std::endl;
 
  187         if (count < 0) { count = 0; }
 
  199     indexedFrameSelect.
extent(varShape)
 
  201     for (std::size_t i = 1; i < varShape.size(); ++i) {
 
  202         std::vector<Dimensions_t> dimIndex(varShape[i]);
 
  203         std::iota(dimIndex.begin(), dimIndex.end(), 0);
 
  206     return indexedFrameSelect;
 
  213     std::vector<Dimensions_t> locIndex;
 
  214     std::vector<Dimensions_t> frameIndex;
 
  221     if (
obs_io_->applyTimingWindow()) {
 
  228     std::vector<Dimensions_t> records;
 
  229     const std::vector<std::string> &obsGroupVarList = 
obs_io_->obsGroupingVars();
 
  230     if (obsGroupVarList.empty()) {
 
  247                                         std::vector<Dimensions_t> & frameIndex) {
 
  248     Dimensions_t locSize = this->
frameCount(
"nlocs");
 
  251     locIndex.resize(locSize);
 
  252     std::iota(locIndex.begin(), locIndex.end(), this->frameStart());
 
  254     frameIndex.resize(locSize);
 
  255     std::iota(frameIndex.begin(), frameIndex.end(), 0);
 
  260                                                std::vector<Dimensions_t> & frameIndex) {
 
  265     std::string dtVarName;
 
  267         dtVarName = std::string(
"MetaData/datetime");
 
  269         dtVarName = std::string(
"MetaData/time");
 
  272             std::string(
"ERROR: ObsFrameRead::genFrameLocationsTimeWindow: ") +
 
  273             std::string(
"date time information does not exist, ") +
 
  274             std::string(
"cannot perform time window filtering");
 
  275         throw eckit::UserError(ErrMsg, Here());
 
  285     std::vector<util::DateTime> dtimeVals;
 
  286     if (dtVarName == 
"MetaData/datetime") {
 
  287         std::vector<std::string> dtValues;
 
  288         dtVar.
read<std::string>(dtValues, memSelect, frameSelect);
 
  293         std::vector<float> dtValues;
 
  294         dtVar.
read<
float>(dtValues, memSelect, frameSelect);
 
  298         this->
obs_io_->atts().open(
"date_time").read<
int>(refDtime);
 
  307     std::size_t iloc = 0;
 
  308     for (std::size_t i = 0; i < 
frameCount; ++i) {
 
  311         frameIndex[iloc] = i;
 
  317     locIndex.resize(iloc);
 
  318     frameIndex.resize(iloc);
 
  324                                        std::vector<Dimensions_t> & records) {
 
  326     Dimensions_t locSize = locIndex.size();
 
  327     records.assign(locSize, 0);
 
  328     for (std::size_t i = 0; i < locSize; ++i) {
 
  336                                             const std::vector<Dimensions_t> & frameIndex,
 
  337                                      std::vector<Dimensions_t> & records) {
 
  343     std::size_t locSize = frameIndex.size();
 
  344     records.assign(locSize, 0);
 
  345     std::vector<std::string> obsGroupingKeys(locSize);
 
  348     for (std::size_t i = 0; i < locSize; ++i) {
 
  353             std::pair<std::string, std::size_t>(obsGroupingKeys[i], 
next_rec_num_));
 
  362                                         const std::vector<Dimensions_t> & frameIndex,
 
  363                                         std::vector<std::string> & groupingKeys) {
 
  366     for (std::size_t i = 0; i < obsGroupVarList.size(); ++i) {
 
  370         std::string obsGroupVarName = obsGroupVarList[i];
 
  371         std::string varName = std::string(
"MetaData/") + obsGroupVarName;
 
  373         if (!
obs_io_->isVarDimByNlocs(varName)) {
 
  375                 std::string(
"ERROR: ObsFrameRead::genRecordNumbersGrouping: ") +
 
  376                 std::string(
"obs grouping variable (") + obsGroupVarName +
 
  377                 std::string(
") must have 'nlocs' as first dimension");
 
  389         std::string keySegment;
 
  390         if (groupVar.
isA<
int>()) {
 
  391             std::vector<int> groupVarValues;
 
  392             groupVar.
read<
int>(groupVarValues, memSelect, frameSelect);
 
  394             for (std::size_t j = 0; j < frameIndex.size(); ++j) {
 
  395                 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
 
  397                     groupingKeys[j] = keySegment;
 
  399                     groupingKeys[j] += 
":";
 
  400                     groupingKeys[j] += keySegment;
 
  403         } 
else if (groupVar.
isA<
float>()) {
 
  404             std::vector<float> groupVarValues;
 
  405             groupVar.
read<
float>(groupVarValues, memSelect, frameSelect);
 
  407             for (std::size_t j = 0; j < frameIndex.size(); ++j) {
 
  408                 keySegment = std::to_string(groupVarValues[frameIndex[j]]);
 
  410                     groupingKeys[j] = keySegment;
 
  412                     groupingKeys[j] += 
":";
 
  413                     groupingKeys[j] += keySegment;
 
  416         } 
else if (groupVar.
isA<std::string>()) {
 
  417             std::vector<std::string> groupVarValues;
 
  418             groupVar.
read<std::string>(groupVarValues, memSelect, frameSelect);
 
  420             for (std::size_t j = 0; j < frameIndex.size(); ++j) {
 
  421                 keySegment = groupVarValues[frameIndex[j]];
 
  423                     groupingKeys[j] = keySegment;
 
  425                     groupingKeys[j] += 
":";
 
  426                     groupingKeys[j] += keySegment;
 
  435                                         const std::vector<Dimensions_t> & locIndex,
 
  436                                         const std::vector<Dimensions_t> & records) {
 
  438     Dimensions_t locSize = locIndex.size();
 
  439     std::vector<float> lats(locSize, 0);
 
  440     std::vector<float> lons(locSize, 0);
 
  447         throw eckit::UserError(
"MetaData/longitude not found in observations file", Here());
 
  454     latLonVar.
read(lons, memSelect, frameSelect);
 
  458         throw eckit::UserError(
"MetaData/latitude not found in observations file", Here());
 
  461     latLonVar.
read(lats, memSelect, frameSelect);
 
  473     std::vector<int> saved_index(locSize, 0);
 
  474     std::vector<int> saved_record_number(locSize, 0);
 
  479         throw eckit::UserError(
"MetaData/saved_record_number not found in observations file",
 
  486       locVar.
read(saved_record_number, locMemSelect, locFrameSelect);
 
  490         throw eckit::UserError(
"MetaData/saved_index not found in observations file", Here());
 
  493       locVar.
read(saved_index, locMemSelect, locFrameSelect);
 
  498     const std::size_t commSize = 
params_.
comm().size();
 
  499     const std::size_t commRank = 
params_.
comm().rank();
 
  500     std::size_t rowNum = 0;
 
  501     std::size_t recNum = 0;
 
  502     std::size_t frameIndex = 0;
 
  503     std::size_t globalLocIndex = 0;
 
  505     for (std::size_t i = 0; i < locSize; ++i) {
 
  507           rowNum = saved_index[i];
 
  508           recNum = saved_record_number[i];
 
  511           rowNum = locIndex[i];
 
  518         eckit::geometry::Point2 point(lons[frameIndex], lats[frameIndex]);
 
  520         std::size_t globalLocIndex = rowNum;
 
  524           globalLocIndex = rowNum * commSize + commRank;
 
  526         dist_->assignRecord(recNum, globalLocIndex, point);
 
  528         if (dist->isMyRecord(recNum)) {
 
  529             indx_.push_back(globalLocIndex);
 
static std::unique_ptr< Distribution > create(const eckit::mpi::Comm &comm, const eckit::Configuration &config)
Create a Distribution object implementing a particular method of distributing observations across mul...
 
Dimensions_t nlocs_
number of locations from source (file or generator)
 
std::shared_ptr< ObsIo > obs_io_
ObsIo object.
 
Selection createMemSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing a memory buffer
 
void createFrameFromObsGroup(const VarNameObjectList &varList, const VarNameObjectList &dimVarList, const VarDimMap &varDimMap)
create a frame object based on dimensions and variables from a source ObsGroup
 
Dimensions_t max_frame_size_
maximum frame size
 
Dimensions_t max_var_size_
maximum variable size
 
VarDimMap ioVarDimMap() const
return map from variables to their attached dimension scales
 
Selection createEntireFrameSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameCount)
create selection object for accessing the entire frame variable
 
Dimensions_t frame_start_
current frame starting index
 
Selection createObsIoSelection(const std::vector< Dimensions_t > &varShape, const Dimensions_t frameStart, const Dimensions_t frameCount)
create selection object for accessing an ObsIo variable
 
Dimensions_t gnlocs_outside_timewindow_
number of nlocs from the file (gnlocs) that are outside the time window
 
Dimensions_t gnlocs_
total number of locations from source (file or generator) that were selected after the timing window ...
 
Dimensions_t nrecs_
number of records from source (file or generator)
 
ObsSpaceParameters params_
ObsIo parameter specs.
 
ObsGroup obs_frame_
ObsGroup object (temporary storage for a single frame)
 
std::map< std::string, std::size_t > obs_grouping_
map for obs grouping via string keys
 
Dimensions_t adjusted_nlocs_frame_start_
current frame start for variable dimensioned along nlocs
 
Selection createIndexedFrameSelection(const std::vector< Dimensions_t > &varShape)
set up frontend and backend selection objects for the given variable
 
void frameNext() override
move to the next frame
 
void applyMpiDistribution(const std::shared_ptr< Distribution > &dist, const std::vector< Dimensions_t > &locIndex, const std::vector< Dimensions_t > &records)
apply MPI distribution
 
Dimensions_t adjusted_nlocs_frame_count_
current frame count for variable dimensioned along nlocs
 
Dimensions_t frameCount(const std::string &varName) override
return current frame count for variable
 
std::size_t rec_num_increment_
spacing between record numbers assigned on this process.
 
VarDimMap dims_attached_to_vars_
map showing association of dim names with each variable name
 
std::size_t next_rec_num_
next available record number
 
std::string distname_
Distribution name
 
bool frameAvailable() override
true if a frame is available (not past end of frames)
 
void frameInit() override
initialize for walking through the frames
 
bool each_process_reads_separate_obs_
true if obs_io_ produces a different series of observations on each process, false if they are all th...
 
Dimensions_t basicFrameCount(const Variable &var)
return current frame count for variable
 
std::set< std::size_t > unique_rec_nums_
unique record numbers
 
bool readFrameVar(const std::string &varName, std::vector< int > &varData)
read a frame variable
 
void print(std::ostream &os) const override
print routine for oops::Printable base class
 
std::shared_ptr< Distribution > dist_
MPI distribution object.
 
void buildObsGroupingKeys(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< std::string > &groupingKeys)
generate string keys for record number assignment
 
void genRecordNumbersGrouping(const std::vector< std::string > &obsGroupVarList, const std::vector< Dimensions_t > &frameIndex, std::vector< Dimensions_t > &records)
generate record numbers considering obs grouping
 
std::map< std::vector< std::string >, Selection > known_frame_selections_
cache for frame selection
 
void genRecordNumbersAll(const std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &records)
generate record numbers where each location is a unique record (no grouping)
 
std::map< std::vector< std::string >, Selection > known_mem_selections_
cache for memory buffer selection
 
ObsFrameRead(const ObsSpaceParameters &params)
 
Dimensions_t frameStart() override
return current frame starting index
 
void genFrameLocationsAll(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for all locations in current frame
 
bool insideTimingWindow(const util::DateTime &ObsDt)
 
std::vector< std::size_t > recnums_
record numbers associated with the location indexes
 
std::vector< std::size_t > indx_
indexes of locations to extract from the input obs file
 
void genFrameLocationsTimeWindow(std::vector< Dimensions_t > &locIndex, std::vector< Dimensions_t > &frameIndex)
generate indices for locations in current frame after filtering out obs outside DA timing window
 
std::vector< Dimensions_t > frame_loc_index_
location indices for current frame
 
void genFrameIndexRecNums(std::shared_ptr< Distribution > &dist)
generate frame indices and corresponding record numbers
 
void resize(const std::vector< std::pair< Variable, ioda::Dimensions_t >> &newDims)
Resize a Dimension and every Variable that depends on it.
 
static std::shared_ptr< ObsIo > create(ObsIoModes mode, const ObsSpaceParameters &parameters)
Create and return a new instance of an ObsIo subclass.
 
const util::DateTime & windowEnd() const
return the end of the DA timing window
 
const util::DateTime & windowStart() const
return the start of the DA timing window
 
const eckit::mpi::Comm & comm() const
return the associated MPI group communicator
 
A Selection represents the bounds of the data, in ioda or in userspace, that you are reading or writi...
 
Has_Variables vars
Use this to access variables.
 
virtual Variable open(const std::string &name) const
Open a Variable by name.
 
virtual bool exists(const std::string &name) const
Does a Variable with the specified name exist?
 
bool isA() const
Convenience function to check a Variable's storage type.
 
virtual bool isDimensionScale() const
Is this Variable used as a dimension scale?
 
virtual Dimensions getDimensions() const
 
virtual Variable read(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all) const
Read the Variable - as char array. Ordering is row-major.
 
virtual Variable write(gsl::span< char > data, const Type &in_memory_dataType, const Selection &mem_selection=Selection::all, const Selection &file_selection=Selection::all)
The fundamental write function. Backends overload this function to implement all write operations.
 
virtual Variable resize(const std::vector< Dimensions_t > &newDims)
Resize the variable.
 
Selection & extent(const VecDimensions_t &sz)
Provide the dimensions of the object that you are selecting from.
 
Selection & select(const SingleSelection &s)
Append a new selection.
 
std::vector< util::DateTime > convertRefOffsetToDtime(const int refIntDtime, const std::vector< float > &timeOffsets)
convert a reference date-time and time offsets to DateTime objects
 
std::vector< util::DateTime > convertDtStringsToDtime(const std::vector< std::string > &dtStrings)
convert datetime strings to DateTime objects
 
std::vector< Dimensions_t > dimsCur
The dimensions of the data.