IODA
test/distribution/Distribution.h
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2009-2016 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #ifndef TEST_DISTRIBUTION_DISTRIBUTION_H_
12 #define TEST_DISTRIBUTION_DISTRIBUTION_H_
13 
14 #include <algorithm>
15 #include <cmath>
16 #include <memory>
17 #include <numeric>
18 #include <set>
19 #include <string>
20 #include <vector>
21 
22 #define ECKIT_TESTING_SELF_REGISTER_CASES 0
23 
24 #include <boost/noncopyable.hpp>
25 
26 #include "eckit/config/LocalConfiguration.h"
27 #include "eckit/geometry/Point2.h"
28 #include "eckit/mpi/Comm.h"
29 #include "eckit/testing/Test.h"
30 
31 #include "oops/mpi/mpi.h"
32 #include "oops/runs/Test.h"
33 #include "oops/test/TestEnvironment.h"
34 #include "oops/util/DateTime.h"
35 #include "oops/util/Logger.h"
36 
37 #include "ioda/distribution/Distribution.h"
38 #include "ioda/distribution/DistributionFactory.h"
39 #include "ioda/ObsSpace.h"
40 
41 namespace eckit
42 {
43  // Don't use the contracted output for these types: the current implementation works only
44  // with integer types.
45  // TODO(wsmigaj) Report this (especially for floats) as a bug in eckit?
46  template <> struct VectorPrintSelector<float> { typedef VectorPrintSimple selector; };
47  template <> struct VectorPrintSelector<util::DateTime> { typedef VectorPrintSimple selector; };
48  template <> struct VectorPrintSelector<util::Duration> { typedef VectorPrintSimple selector; };
49 } // namespace eckit
50 
51 namespace ioda {
52 namespace test {
53 
54 // -----------------------------------------------------------------------------
55 
57  const eckit::LocalConfiguration conf(::test::TestEnvironment::config());
58 
59  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
60  const std::size_t MyRank = MpiComm.rank();
61 
62  // Walk through the different distribution types and try constructing.
63  std::vector<eckit::LocalConfiguration> dist_types;
64  conf.get("distribution types", dist_types);
65  for (std::size_t i = 0; i < dist_types.size(); ++i) {
66  oops::Log::debug() << "Distribution::DistributionTypes: conf: " << dist_types[i] << std::endl;
67 
68  const std::string MyRankCfgName = "specs.rank" + std::to_string(MyRank) + ".config";
69  eckit::LocalConfiguration DistConfig(dist_types[i], MyRankCfgName);
70  const std::string TestDistType = DistConfig.getString("distribution");
71  oops::Log::debug() << "Distribution::DistType: " << TestDistType << std::endl;
72 
73  std::unique_ptr<ioda::Distribution> TestDist = DistributionFactory::create(MpiComm, DistConfig);
74  EXPECT(TestDist.get());
75  }
76 }
77 
78 // -----------------------------------------------------------------------------
79 
80 void testDistribution(const eckit::Configuration &config,
81  const eckit::Configuration &MyRankConfig,
82  const ioda::Distribution *TestDist,
83  const std::vector<std::size_t> &Index,
84  const std::vector<std::size_t> &Recnums) {
85  // expected answers
86  std::size_t ExpectedNlocs = MyRankConfig.getUnsigned("nlocs");
87  std::size_t ExpectedNrecs = MyRankConfig.getUnsigned("nrecs");
88  std::size_t ExpectedNPatchLocs = MyRankConfig.getUnsigned("nPatchLocs");
89  std::vector<std::size_t> ExpectedIndex =
90  MyRankConfig.getUnsignedVector("index");
91  std::vector<std::size_t> ExpectedRecnums =
92  MyRankConfig.getUnsignedVector("recnums");
93  std::vector<std::size_t> ExpectedPatchIndex =
94  MyRankConfig.getUnsignedVector("patchIndex");
95  const std::vector<std::size_t> ExpectedAllGatherv =
96  config.getUnsignedVector("specs.allgatherv");
97 
98  std::vector<bool> patchBool(Index.size());
99  std::vector<std::size_t> PatchLocsThisPE;
100  TestDist->patchObs(patchBool);
101  for (std::size_t j = 0; j < Index.size(); ++j) {
102  if (patchBool[j]) {PatchLocsThisPE.push_back(Index[j]);}
103  }
104 
105  // Check the location and record counts
106  std::size_t Nlocs = Index.size();
107  std::size_t Nrecs = std::set<std::size_t>(Recnums.begin(), Recnums.end()).size();
108  std::size_t NPatchLocs = PatchLocsThisPE.size();
109 
110  oops::Log::debug() << "Location Index: " << Index << std::endl;
111  oops::Log::debug() << "PatchLocsThisPE: " << PatchLocsThisPE << std::endl;
112  oops::Log::debug() << "Nlocs: " << Nlocs << " Nrecs: " << Nrecs
113  << " NPatchLocs" << NPatchLocs << std::endl;
114 
115  EXPECT_EQUAL(Nlocs, ExpectedNlocs);
116  EXPECT_EQUAL(Nrecs, ExpectedNrecs);
117  EXPECT_EQUAL(NPatchLocs, ExpectedNPatchLocs);
118 
119  // Check the resulting index and recnum vectors
120  EXPECT_EQUAL(Index, ExpectedIndex);
121  EXPECT_EQUAL(Recnums, ExpectedRecnums);
122  EXPECT_EQUAL(PatchLocsThisPE, ExpectedPatchIndex);
123 
124  // Test overloads of the allGatherv() method. We will pass to it vectors derived from the
125  // Index vector and compare the results against vectors derived from ExpectedAllGatherv.
126 
127  // Overload taking an std::vector<size_t>
128  {
129  const std::vector<std::size_t> ExpectedAllGathervSizeT = ExpectedAllGatherv;
130  std::vector<size_t> AllGathervSizeT = Index;
131  TestDist->allGatherv(AllGathervSizeT);
132  EXPECT_EQUAL(AllGathervSizeT, ExpectedAllGathervSizeT);
133 
134  // Take advantage of the output produced by allGatherv() to test
135  // globalUniqueConsecutiveLocationIndex(). This function is expected to map the index of each
136  // location held on the calling process to the index of the corresponding element of the vector
137  // produced by allGatherv().
138 
139  std::vector<size_t> ExpectedGlobalUniqueConsecutiveLocationIndices(Nlocs);
140  std::vector<size_t> GlobalUniqueConsecutiveLocationIndices(Nlocs);
141  for (size_t loc = 0; loc < Nlocs; ++loc) {
142  const std::vector<size_t>::iterator it = std::find(
143  AllGathervSizeT.begin(), AllGathervSizeT.end(), Index[loc]);
144  ASSERT(it != AllGathervSizeT.end());
145  ExpectedGlobalUniqueConsecutiveLocationIndices[loc] = it - AllGathervSizeT.begin();
146  GlobalUniqueConsecutiveLocationIndices[loc] =
148  }
149 
150  EXPECT_EQUAL(GlobalUniqueConsecutiveLocationIndices,
151  ExpectedGlobalUniqueConsecutiveLocationIndices);
152  }
153 
154  // Overload taking an std::vector<int>
155  {
156  std::vector<int> ExpectedAllGathervInt(ExpectedAllGatherv.begin(),
157  ExpectedAllGatherv.end());
158  std::vector<int> AllGathervInt(Index.begin(), Index.end());
159  TestDist->allGatherv(AllGathervInt);
160  EXPECT_EQUAL(AllGathervInt, ExpectedAllGathervInt);
161  }
162 
163  // Overload taking an std::vector<float>
164  {
165  std::vector<float> ExpectedAllGathervFloat(ExpectedAllGatherv.begin(),
166  ExpectedAllGatherv.end());
167  std::vector<float> AllGathervFloat(Index.begin(), Index.end());
168  TestDist->allGatherv(AllGathervFloat);
169  EXPECT_EQUAL(AllGathervFloat, ExpectedAllGathervFloat);
170  }
171 
172  // Overload taking an std::vector<double>
173  {
174  std::vector<double> ExpectedAllGathervDouble(ExpectedAllGatherv.begin(),
175  ExpectedAllGatherv.end());
176  std::vector<double> AllGathervDouble(Index.begin(), Index.end());
177  TestDist->allGatherv(AllGathervDouble);
178  EXPECT_EQUAL(AllGathervDouble, ExpectedAllGathervDouble);
179  }
180 
181  // Overload taking an std::vector<std::string>
182  {
183  auto numberToString = [](std::size_t x) { return std::to_string(x); };
184  std::vector<std::string> ExpectedAllGathervString;
185  std::transform(ExpectedAllGatherv.begin(), ExpectedAllGatherv.end(),
186  std::back_inserter(ExpectedAllGathervString), numberToString);
187  std::vector<std::string> AllGathervString;
188  std::transform(Index.begin(), Index.end(),
189  std::back_inserter(AllGathervString), numberToString);
190  TestDist->allGatherv(AllGathervString);
191  EXPECT_EQUAL(AllGathervString, ExpectedAllGathervString);
192  }
193 
194  // Overload taking an std::vector<util::DateTime>
195  {
196  auto numberToDateTime = [](std::size_t x) { return util::DateTime(2000, 1, 1, 0, 0, x); };
197  std::vector<util::DateTime> ExpectedAllGathervDateTime;
198  std::transform(ExpectedAllGatherv.begin(), ExpectedAllGatherv.end(),
199  std::back_inserter(ExpectedAllGathervDateTime), numberToDateTime);
200  std::vector<util::DateTime> AllGathervDateTime;
201  std::transform(Index.begin(), Index.end(),
202  std::back_inserter(AllGathervDateTime), numberToDateTime);
203  TestDist->allGatherv(AllGathervDateTime);
204  EXPECT_EQUAL(AllGathervDateTime, ExpectedAllGathervDateTime);
205  }
206 }
207 
208 // -----------------------------------------------------------------------------
209 
211  const eckit::LocalConfiguration conf(::test::TestEnvironment::config());
212 
213  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
214  const std::size_t MyRank = MpiComm.rank();
215 
216  std::string TestDistType;
217  std::string DistName;
218  std::unique_ptr<ioda::Distribution> TestDist;
219 
220  // Walk through the different distribution types and try constructing.
221  std::vector<eckit::LocalConfiguration> dist_types;
222  conf.get("distribution types", dist_types);
223  for (std::size_t i = 0; i < dist_types.size(); ++i) {
224  oops::Log::debug() << "Distribution::DistributionTypes: conf: "
225  << dist_types[i] << std::endl;
226 
227  // Expected results are listed in "specs.rank*", where * stands for the MPI rank number
228  const std::string MyRankCfgName = "specs.rank" + std::to_string(MyRank);
229  const eckit::LocalConfiguration MyRankConfig = dist_types[i].getSubConfiguration(MyRankCfgName);
230  oops::Log::debug() << "Distribution::DistributionTypes: "
231  << MyRankCfgName << ": " << MyRankConfig << std::endl;
232 
233  const eckit::LocalConfiguration DistConfig(MyRankConfig, "config");
234  const std::string DistName = DistConfig.getString("distribution");
235  oops::Log::debug() << "Distribution::DistType: " << DistName << std::endl;
236 
237  std::unique_ptr<ioda::Distribution> TestDist = DistributionFactory::create(MpiComm, DistConfig);
238  EXPECT(TestDist.get());
239 
240  // read lat/lon
241  std::size_t Gnlocs = dist_types[i].getInt("specs.gnlocs");
242  std::vector<double> glats(Gnlocs, 0);
243  std::vector<double> glons(Gnlocs, 0);
244 
245  dist_types[i].get("specs.latitude", glats);
246  dist_types[i].get("specs.longitude", glons);
247 
248  // If obsgrouping is specified then read the record grouping directly from
249  // the config file. Otherwise, assign 0 to Gnlocs-1 into the record grouping
250  // vector.
251  std::vector<std::size_t> Groups(Gnlocs, 0);
252  if (dist_types[i].has("specs.obsgrouping")) {
253  Groups = dist_types[i].getUnsignedVector("specs.obsgrouping");
254  } else {
255  std::iota(Groups.begin(), Groups.end(), 0);
256  }
257 
258  // Loop on gnlocs, and keep the indecies according to the distribution type.
259  std::vector<std::size_t> Index;
260  std::vector<std::size_t> Recnums;
261  for (std::size_t j = 0; j < Gnlocs; ++j) {
262  std::size_t RecNum = Groups[j];
263  eckit::geometry::Point2 point(glons[j], glats[j]);
264  TestDist->assignRecord(RecNum, j, point);
265  if (TestDist->isMyRecord(RecNum)) {
266  Index.push_back(j);
267  Recnums.push_back(RecNum);
268  }
269  }
270  TestDist->computePatchLocs();
271 
272  testDistribution(dist_types[i], MyRankConfig, TestDist.get(), Index, Recnums);
273  } // loop distributions
274 } // testDistributionConstructedManually
275 
276 // -----------------------------------------------------------------------------
277 
278 // This test can be used to test distributions that cannot be constructed by the
279 // DistributionFactory, but need to be constructed by an ObsSpace. For example, the
280 // MasterAndReplicaDistribution.
282  const eckit::LocalConfiguration topLevelConf(::test::TestEnvironment::config());
283 
284  const util::DateTime winBegin(topLevelConf.getString("window begin"));
285  const util::DateTime winEnd(topLevelConf.getString("window end"));
286 
287  const eckit::mpi::Comm & MpiComm = oops::mpi::world();
288  const std::size_t MyRank = MpiComm.rank();
289 
290  const eckit::LocalConfiguration & obsConf = topLevelConf.getSubConfiguration("observations");
291 
292  for (const eckit::LocalConfiguration & conf : obsConf.getSubConfigurations()) {
293  eckit::LocalConfiguration obsspaceConf(conf, "obs space");
294  ioda::ObsTopLevelParameters obsParams;
295  obsParams.validateAndDeserialize(obsspaceConf);
296  ioda::ObsSpace obsspace(obsParams, MpiComm, winBegin, winEnd, oops::mpi::myself());
297 
298  // Expected results are listed in "specs.index" with the MPI rank number
299  // appended on the end.
300  std::string MyRankConfName = "specs.rank" + std::to_string(MyRank);
301  eckit::LocalConfiguration MyRankConf(conf, MyRankConfName);
302  oops::Log::debug() << "MyRankConf: "
303  << MyRankConfName << ": " << MyRankConf << std::endl;
304 
305  testDistribution(conf, MyRankConf, obsspace.distribution().get(),
306  obsspace.index(), obsspace.recnum());
307  }
308 }
309 
310 // -----------------------------------------------------------------------------
311 
312 class Distribution : public oops::Test {
313  public:
315  virtual ~Distribution() {}
316  private:
317  std::string testid() const override {return "test::Distribution";}
318 
319  void register_tests() const override {
320  std::vector<eckit::testing::Test>& ts = eckit::testing::specification();
321 
322  ts.emplace_back(CASE("distribution/Distribution/testConstructor")
323  { testConstructor(); });
324  ts.emplace_back(CASE("distribution/Distribution/testDistributionConstructedManually")
326  ts.emplace_back(CASE("distribution/Distribution/testDistributionConstructedByObsSpace")
328  }
329 
330  void clear() const override {}
331 };
332 
333 // -----------------------------------------------------------------------------
334 
335 } // namespace test
336 } // namespace ioda
337 
338 #endif // TEST_DISTRIBUTION_DISTRIBUTION_H_
static std::unique_ptr< Distribution > create(const eckit::mpi::Comm &comm, const eckit::Configuration &config)
Create a Distribution object implementing a particular method of distributing observations across mul...
class for distributing obs across multiple process elements
virtual void allGatherv(std::vector< size_t > &x) const =0
Gather observation data from all processes and deliver the combined data to all processes.
virtual void patchObs(std::vector< bool > &isPatchObs) const =0
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
virtual size_t globalUniqueConsecutiveLocationIndex(size_t loc) const =0
Map the index of a location held on the calling process to the index of the corresponding element of ...
std::string testid() const override
CASE("Derived variable, unit conversion, and exception checking methods")
void testDistribution(const eckit::Configuration &config, const eckit::Configuration &MyRankConfig, const ioda::Distribution *TestDist, const std::vector< std::size_t > &Index, const std::vector< std::size_t > &Recnums)