IODA
test/distribution/Distribution.h
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2009-2016 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #ifndef TEST_DISTRIBUTION_DISTRIBUTION_H_
12 #define TEST_DISTRIBUTION_DISTRIBUTION_H_
13 
14 #include <algorithm>
15 #include <cmath>
16 #include <memory>
17 #include <numeric>
18 #include <set>
19 #include <string>
20 #include <vector>
21 
22 #define ECKIT_TESTING_SELF_REGISTER_CASES 0
23 
24 #include <boost/noncopyable.hpp>
25 
26 #include "eckit/config/LocalConfiguration.h"
27 #include "eckit/geometry/Point2.h"
28 #include "eckit/mpi/Comm.h"
29 #include "eckit/testing/Test.h"
30 
31 #include "oops/mpi/mpi.h"
32 #include "oops/runs/Test.h"
33 #include "oops/test/TestEnvironment.h"
34 #include "oops/util/DateTime.h"
35 #include "oops/util/Logger.h"
36 
37 #include "ioda/distribution/Distribution.h"
38 #include "ioda/distribution/DistributionFactory.h"
39 #include "ioda/ObsSpace.h"
40 
41 namespace eckit
42 {
43  // Don't use the contracted output for these types: the current implementation works only
44  // with integer types.
45  // TODO(wsmigaj) Report this (especially for floats) as a bug in eckit?
46  template <> struct VectorPrintSelector<float> { typedef VectorPrintSimple selector; };
47  template <> struct VectorPrintSelector<util::DateTime> { typedef VectorPrintSimple selector; };
48  template <> struct VectorPrintSelector<util::Duration> { typedef VectorPrintSimple selector; };
49 } // namespace eckit
50 
51 namespace ioda {
52 namespace test {
53 
54 // -----------------------------------------------------------------------------
55 
57  const eckit::LocalConfiguration conf(::test::TestEnvironment::config());
58  std::vector<eckit::LocalConfiguration> dist_types;
59  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
60 
61  std::string TestDistType;
62  std::string DistName;
63  std::unique_ptr<ioda::Distribution> TestDist;
64  DistributionFactory * DistFactory = nullptr;
65 
66  // Walk through the different distribution types and try constructing.
67  conf.get("distribution types", dist_types);
68  for (std::size_t i = 0; i < dist_types.size(); ++i) {
69  oops::Log::debug() << "Distribution::DistributionTypes: conf: " << dist_types[i] << std::endl;
70 
71  TestDistType = dist_types[i].getString("distribution");
72  oops::Log::debug() << "Distribution::DistType: " << TestDistType << std::endl;
73 
74  DistName = dist_types[i].getString("specs.name");
75  eckit::LocalConfiguration DistConfig;
76  DistConfig.set("distribution", DistName);
77  TestDist = DistributionFactory::create(MpiComm, DistConfig);
78  EXPECT(TestDist.get());
79  }
80 }
81 
82 // -----------------------------------------------------------------------------
83 
84 void testDistribution(const eckit::Configuration &config,
85  const eckit::Configuration &MyRankConfig,
86  const ioda::Distribution *TestDist,
87  const std::vector<std::size_t> &Index,
88  const std::vector<std::size_t> &Recnums) {
89  // expected answers
90  std::size_t ExpectedNlocs = MyRankConfig.getUnsigned("nlocs");
91  std::size_t ExpectedNrecs = MyRankConfig.getUnsigned("nrecs");
92  std::size_t ExpectedNPatchLocs = MyRankConfig.getUnsigned("nPatchLocs");
93  std::vector<std::size_t> ExpectedIndex =
94  MyRankConfig.getUnsignedVector("index");
95  std::vector<std::size_t> ExpectedRecnums =
96  MyRankConfig.getUnsignedVector("recnums");
97  std::vector<std::size_t> ExpectedPatchIndex =
98  MyRankConfig.getUnsignedVector("patchIndex");
99  const std::vector<std::size_t> ExpectedAllGatherv =
100  config.getUnsignedVector("specs.allgatherv");
101 
102  std::vector<bool> patchBool(Index.size());
103  std::vector<std::size_t> PatchLocsThisPE;
104  TestDist->patchObs(patchBool);
105  for (std::size_t j = 0; j < Index.size(); ++j) {
106  if (patchBool[j]) {PatchLocsThisPE.push_back(Index[j]);}
107  }
108 
109  // Check the location and record counts
110  std::size_t Nlocs = Index.size();
111  std::size_t Nrecs = std::set<std::size_t>(Recnums.begin(), Recnums.end()).size();
112  std::size_t NPatchLocs = PatchLocsThisPE.size();
113 
114  oops::Log::debug() << "Location Index: " << Index << std::endl;
115  oops::Log::debug() << "PatchLocsThisPE: " << PatchLocsThisPE << std::endl;
116  oops::Log::debug() << "Nlocs: " << Nlocs << " Nrecs: " << Nrecs
117  << " NPatchLocs" << NPatchLocs << std::endl;
118 
119  EXPECT_EQUAL(Nlocs, ExpectedNlocs);
120  EXPECT_EQUAL(Nrecs, ExpectedNrecs);
121  EXPECT_EQUAL(NPatchLocs, ExpectedNPatchLocs);
122 
123  // Check the resulting index and recnum vectors
124  EXPECT_EQUAL(Index, ExpectedIndex);
125  EXPECT_EQUAL(Recnums, ExpectedRecnums);
126  EXPECT_EQUAL(PatchLocsThisPE, ExpectedPatchIndex);
127 
128  // Test overloads of the allGatherv() method. We will pass to it vectors derived from the
129  // Index vector and compare the results against vectors derived from ExpectedAllGatherv.
130 
131  // Overload taking an std::vector<size_t>
132  {
133  const std::vector<std::size_t> ExpectedAllGathervSizeT = ExpectedAllGatherv;
134  std::vector<size_t> AllGathervSizeT = Index;
135  TestDist->allGatherv(AllGathervSizeT);
136  EXPECT_EQUAL(AllGathervSizeT, ExpectedAllGathervSizeT);
137 
138  // Take advantage of the output produced by allGatherv() to test
139  // globalUniqueConsecutiveLocationIndex(). This function is expected to map the index of each
140  // location held on the calling process to the index of the corresponding element of the vector
141  // produced by allGatherv().
142 
143  std::vector<size_t> ExpectedGlobalUniqueConsecutiveLocationIndices(Nlocs);
144  std::vector<size_t> GlobalUniqueConsecutiveLocationIndices(Nlocs);
145  for (size_t loc = 0; loc < Nlocs; ++loc) {
146  const std::vector<size_t>::iterator it = std::find(
147  AllGathervSizeT.begin(), AllGathervSizeT.end(), Index[loc]);
148  ASSERT(it != AllGathervSizeT.end());
149  ExpectedGlobalUniqueConsecutiveLocationIndices[loc] = it - AllGathervSizeT.begin();
150  GlobalUniqueConsecutiveLocationIndices[loc] =
152  }
153 
154  EXPECT_EQUAL(GlobalUniqueConsecutiveLocationIndices,
155  ExpectedGlobalUniqueConsecutiveLocationIndices);
156  }
157 
158  // Overload taking an std::vector<int>
159  {
160  std::vector<int> ExpectedAllGathervInt(ExpectedAllGatherv.begin(),
161  ExpectedAllGatherv.end());
162  std::vector<int> AllGathervInt(Index.begin(), Index.end());
163  TestDist->allGatherv(AllGathervInt);
164  EXPECT_EQUAL(AllGathervInt, ExpectedAllGathervInt);
165  }
166 
167  // Overload taking an std::vector<float>
168  {
169  std::vector<float> ExpectedAllGathervFloat(ExpectedAllGatherv.begin(),
170  ExpectedAllGatherv.end());
171  std::vector<float> AllGathervFloat(Index.begin(), Index.end());
172  TestDist->allGatherv(AllGathervFloat);
173  EXPECT_EQUAL(AllGathervFloat, ExpectedAllGathervFloat);
174  }
175 
176  // Overload taking an std::vector<double>
177  {
178  std::vector<double> ExpectedAllGathervDouble(ExpectedAllGatherv.begin(),
179  ExpectedAllGatherv.end());
180  std::vector<double> AllGathervDouble(Index.begin(), Index.end());
181  TestDist->allGatherv(AllGathervDouble);
182  EXPECT_EQUAL(AllGathervDouble, ExpectedAllGathervDouble);
183  }
184 
185  // Overload taking an std::vector<std::string>
186  {
187  auto numberToString = [](std::size_t x) { return std::to_string(x); };
188  std::vector<std::string> ExpectedAllGathervString;
189  std::transform(ExpectedAllGatherv.begin(), ExpectedAllGatherv.end(),
190  std::back_inserter(ExpectedAllGathervString), numberToString);
191  std::vector<std::string> AllGathervString;
192  std::transform(Index.begin(), Index.end(),
193  std::back_inserter(AllGathervString), numberToString);
194  TestDist->allGatherv(AllGathervString);
195  EXPECT_EQUAL(AllGathervString, ExpectedAllGathervString);
196  }
197 
198  // Overload taking an std::vector<util::DateTime>
199  {
200  auto numberToDateTime = [](std::size_t x) { return util::DateTime(2000, 1, 1, 0, 0, x); };
201  std::vector<util::DateTime> ExpectedAllGathervDateTime;
202  std::transform(ExpectedAllGatherv.begin(), ExpectedAllGatherv.end(),
203  std::back_inserter(ExpectedAllGathervDateTime), numberToDateTime);
204  std::vector<util::DateTime> AllGathervDateTime;
205  std::transform(Index.begin(), Index.end(),
206  std::back_inserter(AllGathervDateTime), numberToDateTime);
207  TestDist->allGatherv(AllGathervDateTime);
208  EXPECT_EQUAL(AllGathervDateTime, ExpectedAllGathervDateTime);
209  }
210 }
211 
212 // -----------------------------------------------------------------------------
213 
215  const eckit::LocalConfiguration conf(::test::TestEnvironment::config());
216  std::vector<eckit::LocalConfiguration> dist_types;
217  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
218 
219  std::string TestDistType;
220  std::string DistName;
221  std::unique_ptr<ioda::Distribution> TestDist;
222 
223  std::size_t MyRank = MpiComm.rank();
224 
225  // Walk through the different distribution types and try constructing.
226  conf.get("distribution types", dist_types);
227  for (std::size_t i = 0; i < dist_types.size(); ++i) {
228  oops::Log::debug() << "Distribution::DistributionTypes: conf: "
229  << dist_types[i] << std::endl;
230 
231  TestDistType = dist_types[i].getString("distribution");
232  oops::Log::debug() << "Distribution::DistType: " << TestDistType << std::endl;
233 
234  // Expected results are listed in "specs.index" with the MPI rank number
235  // appended on the end.
236  std::string MyRankCfgName = "specs.rank" + std::to_string(MyRank);
237  eckit::LocalConfiguration MyRankConfig = dist_types[i].getSubConfiguration(MyRankCfgName);
238  oops::Log::debug() << "Distribution::DistributionTypes: "
239  << MyRankCfgName << ": " << MyRankConfig << std::endl;
240 
241  // read lat/lon
242  std::size_t Gnlocs = dist_types[i].getInt("specs.gnlocs");
243  std::vector<double> glats(Gnlocs, 0);
244  std::vector<double> glons(Gnlocs, 0);
245 
246  DistName = dist_types[i].getString("specs.name");
247  if (DistName == "Halo") {
248  glats = dist_types[i].getDoubleVector("specs.latitude");
249  glons = dist_types[i].getDoubleVector("specs.longitude");
250  }
251  MyRankConfig.set("distribution", DistName);
252  TestDist = DistributionFactory::create(MpiComm, MyRankConfig);
253  EXPECT(TestDist.get());
254 
255  // If obsgrouping is specified then read the record grouping directly from
256  // the config file. Otherwise, assign 0 to Gnlocs-1 into the record grouping
257  // vector.
258  std::vector<std::size_t> Groups(Gnlocs, 0);
259  if (dist_types[i].has("specs.obsgrouping")) {
260  Groups = dist_types[i].getUnsignedVector("specs.obsgrouping");
261  } else {
262  std::iota(Groups.begin(), Groups.end(), 0);
263  }
264 
265  // Loop on gnlocs, and keep the indecies according to the distribution type.
266  std::vector<std::size_t> Index;
267  std::vector<std::size_t> Recnums;
268  for (std::size_t j = 0; j < Gnlocs; ++j) {
269  std::size_t RecNum = Groups[j];
270  eckit::geometry::Point2 point(glons[j], glats[j]);
271  TestDist->assignRecord(RecNum, j, point);
272  if (TestDist->isMyRecord(RecNum)) {
273  Index.push_back(j);
274  Recnums.push_back(RecNum);
275  }
276  }
277  TestDist->computePatchLocs();
278 
279  testDistribution(dist_types[i], MyRankConfig, TestDist.get(), Index, Recnums);
280  } // loop distributions
281 } // testDistributionConstructedManually
282 
283 // -----------------------------------------------------------------------------
284 
285 // This test can be used to test distributions that cannot be constructed by the
286 // DistributionFactory, but need to be constructed by an ObsSpace. For example, the
287 // MasterAndReplicaDistribution.
289  const eckit::LocalConfiguration topLevelConf(::test::TestEnvironment::config());
290 
291  const util::DateTime winBegin(topLevelConf.getString("window begin"));
292  const util::DateTime winEnd(topLevelConf.getString("window end"));
293 
294  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
295  const std::size_t MyRank = MpiComm.rank();
296 
297  const eckit::LocalConfiguration & obsConf = topLevelConf.getSubConfiguration("observations");
298 
299  for (const eckit::LocalConfiguration & conf : obsConf.getSubConfigurations()) {
300  eckit::LocalConfiguration obsspaceConf(conf, "obs space");
301  ioda::ObsSpace obsspace(obsspaceConf, MpiComm, winBegin, winEnd, oops::mpi::myself());
302 
303  // Expected results are listed in "specs.index" with the MPI rank number
304  // appended on the end.
305  std::string MyRankConfName = "specs.rank" + std::to_string(MyRank);
306  eckit::LocalConfiguration MyRankConf(conf, MyRankConfName);
307  oops::Log::debug() << "MyRankConf: "
308  << MyRankConfName << ": " << MyRankConf << std::endl;
309 
310  testDistribution(conf, MyRankConf, obsspace.distribution().get(),
311  obsspace.index(), obsspace.recnum());
312  }
313 }
314 
315 // -----------------------------------------------------------------------------
316 
317 class Distribution : public oops::Test {
318  public:
320  virtual ~Distribution() {}
321  private:
322  std::string testid() const override {return "test::Distribution";}
323 
324  void register_tests() const override {
325  std::vector<eckit::testing::Test>& ts = eckit::testing::specification();
326 
327  ts.emplace_back(CASE("distribution/Distribution/testConstructor")
328  { testConstructor(); });
329  ts.emplace_back(CASE("distribution/Distribution/testDistributionConstructedManually")
331  ts.emplace_back(CASE("distribution/Distribution/testDistributionConstructedByObsSpace")
333  }
334 
335  void clear() const override {}
336 };
337 
338 // -----------------------------------------------------------------------------
339 
340 } // namespace test
341 } // namespace ioda
342 
343 #endif // TEST_DISTRIBUTION_DISTRIBUTION_H_
Distribution factory.
static std::unique_ptr< Distribution > create(const eckit::mpi::Comm &comm, const eckit::Configuration &config)
Create a Distribution object implementing a particular method of distributing observations across mul...
class for distributing obs across multiple process elements
virtual void allGatherv(std::vector< size_t > &x) const =0
Gather observation data from all processes and deliver the combined data to all processes.
virtual void patchObs(std::vector< bool > &isPatchObs) const =0
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
virtual size_t globalUniqueConsecutiveLocationIndex(size_t loc) const =0
Map the index of a location held on the calling process to the index of the corresponding element of ...
Observation data class for IODA.
Definition: src/ObsSpace.h:116
const std::vector< std::size_t > & recnum() const
return reference to the record number vector
Definition: src/ObsSpace.h:222
const std::vector< std::size_t > & index() const
return reference to the index vector
Definition: src/ObsSpace.h:241
std::shared_ptr< const Distribution > distribution() const
return MPI distribution object
Definition: src/ObsSpace.h:335
std::string testid() const override
void testDistribution(const eckit::Configuration &config, const eckit::Configuration &MyRankConfig, const ioda::Distribution *TestDist, const std::vector< std::size_t > &Index, const std::vector< std::size_t > &Recnums)
CASE("Derived variable and unit conversion methods")