8 #include "ioda/core/ObsData.h"
22 #include "eckit/config/Configuration.h"
23 #include "eckit/exception/Exceptions.h"
25 #include "oops/mpi/mpi.h"
26 #include "oops/util/abor1_cpp.h"
27 #include "oops/util/DateTime.h"
28 #include "oops/util/Duration.h"
29 #include "oops/util/Logger.h"
30 #include "oops/util/Random.h"
31 #include "oops/util/stringFunctions.h"
33 #include "ioda/distribution/DistributionFactory.h"
34 #include "ioda/io/IodaIOfactory.h"
36 #include "atlas/util/Earth.h"
52 const util::DateTime & bgn,
const util::DateTime & end,
53 const eckit::mpi::Comm & timeComm)
54 : config_(config), winbgn_(bgn), winend_(end), commMPI_(comm),
55 gnlocs_(0), nlocs_(0), nvars_(0), nrecs_(0), file_unexpected_dtypes_(false),
56 file_excess_dims_(false), in_max_frame_size_(0), out_max_frame_size_(0),
57 int_database_(), float_database_(), string_database_(), datetime_database_(),
58 obsvars_(), next_rec_num_(0)
60 oops::Log::trace() <<
"ObsData::ObsData config = " << config << std::endl;
63 distname_ = config.getString(
"distribution",
"RoundRobin");
65 obsvars_ = oops::Variables(config,
"simulated variables");
69 std::unique_ptr<DistributionFactory> distFactory;
70 dist_.reset(distFactory->createDistribution(this->comm(),
distname_));
73 if (config.has(
"obsdatain")) {
77 obs_sort_order_ = config.getString(
"obsdatain.obsgrouping.sort order",
"ascending");
80 std::string(
"ObsData::ObsData: Must use one of 'ascending' or 'descending' ") +
81 std::string(
"for the 'sort order:' YAML configuration keyword.");
84 filein_ = config.getString(
"obsdatain.obsfile");
87 oops::Log::trace() <<
obsname_ <<
" file in = " <<
filein_ << std::endl;
92 <<
"ObsData::ObsData:: WARNING: Input file contains variables "
93 <<
"with unexpected data types" << std::endl
94 <<
" Input file: " <<
filein_ << std::endl;
101 <<
"ObsData::ObsData:: WARNING: Input file contains variables "
102 <<
"with excess number of dimensions (these variables were skipped)" << std::endl
103 <<
" Input file: " <<
filein_ << std::endl;
110 }
else if (config.has(
"generate")) {
112 eckit::LocalConfiguration genconfig(config,
"generate");
116 std::string ErrorMsg =
117 "ObsData::ObsData: Must use one of 'obsdatain' or 'generate' in the YAML configuration.";
123 if (config.has(
"obsdataout.obsfile")) {
124 std::string filename = config.getString(
"obsdataout.obsfile");
129 util::stringfunctions::swapNameMember(config, filename);
133 std::size_t found = filename.find_last_of(
".");
134 if (found == std::string::npos)
135 found = filename.length();
138 std::ostringstream ss;
139 ss <<
"_" << std::setw(4) << std::setfill(
'0') << this->
comm().rank();
140 if (timeComm.size() > 1) ss <<
"_" << timeComm.rank();
143 fileout_ = filename.insert(found, ss.str());
148 if (infile.good() && (
commMPI_.rank() == 0)) {
149 oops::Log::warning() <<
"ObsData::ObsData WARNING: Overwriting output file "
153 oops::Log::debug() <<
"ObsData::ObsData output file is not required " << std::endl;
156 oops::Log::trace() <<
"ObsData::ObsData contructed name = " <<
obsname() << std::endl;
168 oops::Log::trace() <<
"ObsData::ObsData destructor begin" << std::endl;
170 oops::Log::info() <<
obsname() <<
": save database to " <<
fileout_ << std::endl;
173 oops::Log::info() <<
obsname() <<
" : no output" << std::endl;
175 oops::Log::trace() <<
"ObsData::ObsData destructor end" << std::endl;
192 std::vector<int> & vdata)
const {
193 std::vector<std::size_t> vshape(1, vdata.size());
198 std::vector<float> & vdata)
const {
199 std::vector<std::size_t> vshape(1, vdata.size());
204 std::vector<double> & vdata)
const {
205 std::vector<std::size_t> vshape(1, vdata.size());
207 std::vector<float> FloatData(vdata.size(), 0.0);
209 ConvertVarType<float, double>(FloatData, vdata);
213 std::vector<std::string> & vdata)
const {
214 std::vector<std::size_t> vshape(1, vdata.size());
219 std::vector<util::DateTime> & vdata)
const {
220 std::vector<std::size_t> vshape(1, vdata.size());
239 const std::vector<int> & vdata) {
240 std::vector<std::size_t> vshape(1, vdata.size());
245 const std::vector<float> & vdata) {
246 std::vector<std::size_t> vshape(1, vdata.size());
251 const std::vector<double> & vdata) {
252 std::vector<std::size_t> vshape(1, vdata.size());
254 std::vector<float> FloatData(vdata.size());
255 ConvertVarType<double, float>(vdata, FloatData);
260 const std::vector<std::string> & vdata) {
261 std::vector<std::size_t> vshape(1, vdata.size());
266 const std::vector<util::DateTime> & vdata) {
267 std::vector<std::size_t> vshape(1, vdata.size());
278 bool ObsData::has(
const std::string & group,
const std::string & name)
const {
298 VarType = ObsDtype::DateTime;
418 return (irec !=
recidx_.end());
448 "ObsData::recidx_vector: Record number, " + std::to_string(RecNum) +
449 ", does not exist in record index map.";
461 std::vector<std::size_t> RecNums(
nrecs_);
464 RecNums[
recnum] = Irec->first;
484 std::vector<float> latitude;
485 std::vector<float> longitude;
486 std::vector<util::DateTime> obs_datetimes;
487 if (conf.has(
"random")) {
489 }
else if (conf.has(
"list")) {
490 genDistList(conf, latitude, longitude, obs_datetimes);
492 std::string ErrorMsg =
493 std::string(
"ObsData::generateDistribution: Must specify either ") +
494 std::string(
"'random' or 'list' with 'generate' configuration keyword");
502 const std::vector<float> err = conf.getFloatVector(
"obs errors");
503 ASSERT(
nvars_ == err.size());
505 put_db(
"MetaData",
"datetime", obs_datetimes);
506 put_db(
"MetaData",
"latitude", latitude);
507 put_db(
"MetaData",
"longitude", longitude);
508 for (std::size_t ivar = 0; ivar <
nvars_; ivar++) {
509 std::vector<float> obserr(
nlocs_, err[ivar]);
532 std::vector<float> & Lons, std::vector<util::DateTime> & Dtimes) {
533 gnlocs_ = conf.getInt(
"random.nobs");
534 float lat1 = conf.getFloat(
"random.lat1");
535 float lat2 = conf.getFloat(
"random.lat2");
536 float lon1 = conf.getFloat(
"random.lon1");
537 float lon2 = conf.getFloat(
"random.lon2");
542 unsigned int ran_seed;
543 if (conf.has(
"random.random seed")) {
544 ran_seed = conf.getInt(
"random.random seed");
546 ran_seed = std::time(0);
550 std::unique_ptr<IodaIO> NoIO(
nullptr);
570 std::vector<float> RanVals(
gnlocs_, 0.0);
571 std::vector<float> RanVals2(
gnlocs_, 0.0);
572 if (this->
comm().rank() == 0) {
573 util::UniformDistribution<float> RanUD(
gnlocs_, 0.0, 1.0, ran_seed);
574 util::UniformDistribution<float> RanUD2(
gnlocs_, 0.0, 1.0, ran_seed+1);
576 RanVals = RanUD.data();
577 RanVals2 = RanUD2.data();
579 this->
comm().broadcast(RanVals, 0);
580 this->
comm().broadcast(RanVals2, 0);
583 float LatRange = lat2 - lat1;
584 float LonRange = lon2 - lon1;
586 float TimeRange =
static_cast<float>(WindowDuration.toSeconds());
594 util::Duration DurZero(0);
595 util::Duration DurOneSec(1);
596 for (std::size_t ii = 0; ii <
nlocs_; ii++) {
598 Lats[ii] = lat1 + (RanVals[
index] * LatRange);
599 Lons[ii] = lon1 + (RanVals2[
index] * LonRange);
607 util::Duration OffsetDt(
static_cast<int64_t
>(RanVals[
index] * TimeRange));
608 if (OffsetDt == DurZero) {
609 OffsetDt = DurOneSec;
612 Dtimes[ii] += OffsetDt;
629 std::vector<float> & Lons, std::vector<util::DateTime> & Dtimes) {
630 std::vector<float> latitudes = conf.getFloatVector(
"list.lats");
631 std::vector<float> longitudes = conf.getFloatVector(
"list.lons");
632 std::vector<std::string> DtStrings = conf.getStringVector(
"list.datetimes");
633 std::vector<util::DateTime> datetimes;
634 for (std::size_t i = 0; i < DtStrings.size(); ++i) {
635 util::DateTime TempDt(DtStrings[i]);
636 datetimes.push_back(TempDt);
641 std::unique_ptr<IodaIO> NoIO(
nullptr);
650 for (std::size_t ii = 0; ii <
nlocs_; ii++) {
652 Lats[ii] = latitudes[
index];
653 Lons[ii] = longitudes[
index];
654 Dtimes[ii] = datetimes[
index];
664 os <<
"ObsData::print not implemented";
680 oops::Log::trace() <<
"ObsData::InitFromFile opening file: " << filename << std::endl;
691 bool FirstFrame =
true;
692 fileio->frame_initialize();
694 iframe != fileio->frame_end(); ++iframe) {
695 std::size_t FrameStart = fileio->frame_start(iframe);
696 std::size_t FrameSize = fileio->frame_size(iframe);
699 fileio->frame_read(iframe);
708 idata != fileio->frame_int_end(); ++idata) {
709 std::string GroupName = fileio->frame_int_get_gname(idata);
710 if (FirstFrame && (GroupName ==
"ObsValue")) {
nvars_++; }
711 std::string VarName = fileio->frame_int_get_vname(idata);
712 std::vector<std::size_t> VarShape = fileio->var_shape(GroupName, VarName);
713 std::vector<int> FrameData;
714 fileio->frame_int_get_data(GroupName, VarName, FrameData);
715 std::vector<std::size_t> FrameShape = VarShape;
716 FrameShape[0] = FrameData.size();
718 std::vector<std::size_t> IndexedShape;
719 std::vector<int> SelectedData =
720 ApplyIndex(FrameData, VarShape, FrameIndex, FrameShape);
729 idata != fileio->frame_float_end(); ++idata) {
730 std::string GroupName = fileio->frame_float_get_gname(idata);
731 if (FirstFrame && (GroupName ==
"ObsValue")) {
nvars_++; }
732 std::string VarName = fileio->frame_float_get_vname(idata);
733 std::vector<std::size_t> VarShape = fileio->var_shape(GroupName, VarName);
734 std::vector<float> FrameData;
735 fileio->frame_float_get_data(GroupName, VarName, FrameData);
736 std::vector<std::size_t> FrameShape = VarShape;
737 FrameShape[0] = FrameData.size();
739 std::vector<std::size_t> IndexedShape;
740 std::vector<float> SelectedData =
741 ApplyIndex(FrameData, VarShape, FrameIndex, FrameShape);
750 idata != fileio->frame_string_end(); ++idata) {
751 std::string GroupName = fileio->frame_string_get_gname(idata);
752 if (FirstFrame && (GroupName ==
"ObsValue")) {
nvars_++; }
753 std::string VarName = fileio->frame_string_get_vname(idata);
754 std::vector<std::size_t> VarShape = fileio->var_shape(GroupName, VarName);
755 std::vector<std::string> FrameData;
756 fileio->frame_string_get_data(GroupName, VarName, FrameData);
757 std::vector<std::size_t> FrameShape = VarShape;
758 FrameShape[0] = FrameData.size();
760 std::vector<std::size_t> IndexedShape;
761 std::vector<std::string> SelectedData =
762 ApplyIndex(FrameData, VarShape, FrameIndex, FrameShape);
763 if (VarName ==
"datetime") {
765 std::vector<util::DateTime> DtData;
766 for (std::size_t i = 0; i < SelectedData.size(); ++i) {
767 util::DateTime ObsDt(SelectedData[i]);
768 DtData.push_back(ObsDt);
780 fileio->frame_finalize();
785 oops::Log::trace() <<
"ObsData::InitFromFile opening file ends " << std::endl;
800 const std::size_t FrameStart,
const std::size_t FrameSize) {
805 std::size_t LocSize = FrameSize;
806 if ((FrameStart + FrameSize) >
gnlocs_) { LocSize =
gnlocs_ - FrameStart; }
820 std::vector<std::size_t> LocIndex;
821 std::vector<std::size_t> FrameIndex;
822 if (FileIO !=
nullptr) {
824 std::string DtGroupName =
"MetaData";
825 std::string DtVarName =
"datetime";
826 std::vector<std::string> DtStrings;
827 FileIO->frame_string_get_data(DtGroupName, DtVarName, DtStrings);
830 std::vector<util::DateTime> ObsDtimes;
831 for (std::size_t i = 0; i < DtStrings.size(); ++i) {
832 util::DateTime ObsDt(DtStrings[i]);
833 ObsDtimes.push_back(ObsDt);
837 for (std::size_t i = 0; i < LocSize; ++i) {
839 LocIndex.push_back(FrameStart + i);
840 FrameIndex.push_back(i);
843 LocSize = LocIndex.size();
846 LocIndex.assign(LocSize, 0);
847 std::iota(LocIndex.begin(), LocIndex.end(), FrameStart);
849 FrameIndex.assign(LocSize, 0);
850 std::iota(FrameIndex.begin(), FrameIndex.end(), 0);
854 std::vector<std::size_t> Records(LocSize);
859 for (std::size_t i = 0; i < LocSize; ++i) {
860 int RecValue = LocIndex[i];
870 std::string GroupName =
"MetaData";
872 std::string VarType = FileIO->var_dtype(GroupName, VarName);
874 if (VarType ==
"int") {
875 std::vector<int> GroupVar;
876 FileIO->frame_int_get_data(GroupName, VarName, GroupVar);
877 for (std::size_t i = 0; i < LocSize; ++i) {
878 int RecValue = GroupVar[FrameIndex[i]];
886 }
else if (VarType ==
"float") {
887 std::vector<float> GroupVar;
888 FileIO->frame_float_get_data(GroupName, VarName, GroupVar);
889 for (std::size_t i = 0; i < LocSize; ++i) {
890 float RecValue = GroupVar[FrameIndex[i]];
897 }
else if (VarType ==
"string") {
898 std::vector<std::string> GroupVar;
899 FileIO->frame_string_get_data(GroupName, VarName, GroupVar);
900 for (std::size_t i = 0; i < LocSize; ++i) {
901 std::string RecValue = GroupVar[FrameIndex[i]];
915 std::set<std::size_t> UniqueRecNums;
916 for (std::size_t i = 0; i < LocSize; ++i) {
917 std::size_t RowNum = LocIndex[i];
918 std::size_t RecNum = Records[i];
919 if (
dist_->isMyRecord(RecNum)) {
920 indx_.push_back(RowNum);
923 FrameIndex.push_back(RowNum - FrameStart);
927 nlocs_ += FrameIndex.size();
949 typedef std::map<std::size_t, std::vector<std::pair<float, std::size_t>>> TmpRecIdxMap;
950 typedef TmpRecIdxMap::iterator TmpRecIdxIter;
953 std::vector<float> SortValues(
nlocs_);
955 std::vector<util::DateTime> Dates(
nlocs_);
957 for (std::size_t iloc = 0; iloc <
nlocs_; iloc++) {
958 SortValues[iloc] = (Dates[iloc] - Dates[0]).toSeconds();
966 TmpRecIdxMap TmpRecIdx;
967 for (
size_t iloc = 0; iloc <
nlocs_; iloc++) {
968 TmpRecIdx[
recnums_[iloc]].push_back(std::make_pair(SortValues[iloc], iloc));
971 for (TmpRecIdxIter irec = TmpRecIdx.begin(); irec != TmpRecIdx.end(); ++irec) {
973 sort(irec->second.begin(), irec->second.end());
978 sort(irec->second.begin(), irec->second.end(),
979 [](
const std::pair<float, std::size_t> & p1,
980 const std::pair<float, std::size_t> & p2){
981 return (p2.first < p1.first ||
982 (!(p1.first < p2.first) && p2.second > p1.second));});
987 for (TmpRecIdxIter irec = TmpRecIdx.begin(); irec != TmpRecIdx.end(); ++irec) {
988 recidx_[irec->first].resize(irec->second.size());
989 for (std::size_t iloc = 0; iloc < irec->second.size(); iloc++) {
990 recidx_[irec->first][iloc] = irec->second[iloc].second;
1006 std::unique_ptr<IodaIO> fileio
1011 fileio->dim_insert(
"nvars",
nvars_);
1015 std::size_t MaxVarSize = 0;
1020 std::string GrpVarName = VarName +
"@" + GroupName;
1022 if (VarShape[0] > MaxVarSize) { MaxVarSize = VarShape[0]; }
1023 fileio->grp_var_insert(GroupName, VarName,
"int", VarShape, GrpVarName,
"int");
1029 std::string GrpVarName = VarName +
"@" + GroupName;
1031 if (VarShape[0] > MaxVarSize) { MaxVarSize = VarShape[0]; }
1032 fileio->grp_var_insert(GroupName, VarName,
"float", VarShape, GrpVarName,
"float");
1038 std::string GrpVarName = VarName +
"@" + GroupName;
1040 if (VarShape[0] > MaxVarSize) { MaxVarSize = VarShape[0]; }
1041 std::vector<std::string> DbData(VarShape[0],
"");
1044 fileio->grp_var_insert(GroupName, VarName,
"string", VarShape, GrpVarName,
"string",
1051 std::string GrpVarName = VarName +
"@" + GroupName;
1053 if (VarShape[0] > MaxVarSize) { MaxVarSize = VarShape[0]; }
1054 fileio->grp_var_insert(GroupName, VarName,
"string", VarShape, GrpVarName,
"string", 20);
1058 fileio->frame_info_init(MaxVarSize);
1062 iframe != fileio->frame_end(); ++iframe) {
1063 fileio->frame_data_init();
1064 std::size_t FrameStart = fileio->frame_start(iframe);
1065 std::size_t FrameSize = fileio->frame_size(iframe);
1074 if (VarShape[0] > FrameStart) {
1075 std::size_t Count = FrameSize;
1076 if ((FrameStart + FrameSize) > VarShape[0]) { Count = VarShape[0] - FrameStart; }
1077 std::vector<int> FrameData(Count, 0);
1079 fileio->frame_int_put_data(GroupName, VarName, FrameData);
1090 if (VarShape[0] > FrameStart) {
1091 std::size_t Count = FrameSize;
1092 if ((FrameStart + FrameSize) > VarShape[0]) { Count = VarShape[0] - FrameStart; }
1093 std::vector<float> FrameData(Count, 0.0);
1095 fileio->frame_float_put_data(GroupName, VarName, FrameData);
1106 if (VarShape[0] > FrameStart) {
1107 std::size_t Count = FrameSize;
1108 if ((FrameStart + FrameSize) > VarShape[0]) { Count = VarShape[0] - FrameStart; }
1109 std::vector<std::string> FrameData(Count,
"");
1112 fileio->frame_string_put_data(GroupName, VarName, FrameData);
1122 if (VarShape[0] > FrameStart) {
1123 std::size_t Count = FrameSize;
1124 if ((FrameStart + FrameSize) > VarShape[0]) { Count = VarShape[0] - FrameStart; }
1125 util::DateTime TempDt(
"0000-01-01T00:00:00Z");
1126 std::vector<util::DateTime> FrameData(Count, TempDt);
1131 std::vector<std::string> StringVector(FrameData.size(),
"");
1132 for (std::size_t i = 0; i < FrameData.size(); i++) {
1133 StringVector[i] = FrameData[i].toString();
1135 fileio->frame_string_put_data(GroupName, VarName, StringVector);
1139 fileio->frame_write(iframe);
1154 template<
typename VarType>
1156 const std::vector<std::size_t> & FullShape,
1157 const std::vector<std::size_t> & Index,
1158 std::vector<std::size_t> & IndexedShape)
const {
1159 std::vector<VarType> SelectedData;
1160 for (std::size_t i = 0; i < Index.size(); ++i) {
1161 std::size_t isrc = Index[i];
1162 SelectedData.push_back(FullData[isrc]);
1164 IndexedShape = FullShape;
1165 IndexedShape[0] = SelectedData.size();
1166 return SelectedData;
1185 std::string DbVarType = FileVarType;
1187 if (GroupName ==
"PreQC") {
1189 }
else if (FileVarType ==
"double") {
1190 DbVarType =
"float";
1202 oops::Log::info() <<
"ObsData::printJo not implemented" << std::endl;
1211 kd_ = std::shared_ptr<ObsData::KDTree> (
new KDTree() );
1213 std::vector<float> lats(
nlocs_);
1214 std::vector<float> lons(
nlocs_);
1217 this ->
get_db(
"MetaData",
"longitude", lons);
1218 this ->
get_db(
"MetaData",
"latitude", lats);
1221 typedef KDTree::PointType Point;
1222 std::vector<KDTree::Value> points;
1223 for (
unsigned int i = 0; i <
nlocs_; i++) {
1224 eckit::geometry::Point2 lonlat(lons[i], lats[i]);
1225 Point xyz = Point();
1227 atlas::util::Earth::convertSphericalToCartesian(lonlat, xyz);
1228 double index =
static_cast<double>(i);
1229 KDTree::Value v(xyz,
index);
1230 points.push_back(v);
1234 kd_->build(points.begin(), points.end());