UFO
ParallelPoissonDiskThinning.h
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2020 Met Office UK
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
8 #ifndef TEST_UFO_PARALLELPOISSONDISKTHINNING_H_
9 #define TEST_UFO_PARALLELPOISSONDISKTHINNING_H_
10 
11 #include <algorithm>
12 #include <limits>
13 #include <memory>
14 #include <set>
15 #include <string>
16 #include <thread>
17 #include <vector>
18 
19 #define ECKIT_TESTING_SELF_REGISTER_CASES 0
20 
21 #include "eckit/config/LocalConfiguration.h"
22 #include "eckit/testing/Test.h"
23 #include "ioda/ObsSpace.h"
24 #include "ioda/ObsVector.h"
25 #include "oops/mpi/mpi.h"
26 #include "oops/runs/Test.h"
27 #include "oops/util/AssociativeContainers.h"
28 #include "oops/util/Expect.h"
29 #include "test/TestEnvironment.h"
31 #include "ufo/filters/Variables.h"
32 #include "ufo/utils/StringUtils.h" // for splitVarGroup
33 
34 namespace eckit
35 {
36  // Don't use the contracted output for these types: the current implementation works only
37  // with integer types.
38  template <> struct VectorPrintSelector<float> { typedef VectorPrintSimple selector; };
39 } // namespace eckit
40 
41 namespace ufo {
42 namespace test {
43 
44 void testPoissonDiskThinning(const eckit::LocalConfiguration &conf,
45  bool expectValidationError = false) {
46  util::DateTime bgn(conf.getString("window begin"));
47  util::DateTime end(conf.getString("window end"));
48 
49  const eckit::LocalConfiguration obsSpaceConf(conf, "obs space");
50  ioda::ObsTopLevelParameters obsParams;
51  obsParams.validateAndDeserialize(obsSpaceConf);
52  ioda::ObsSpace obsspace(obsParams, oops::mpi::world(), bgn, end, oops::mpi::myself());
53 
54  std::shared_ptr<ioda::ObsDataVector<float>> obserr(new ioda::ObsDataVector<float>(
55  obsspace, obsspace.obsvariables(), "ObsError"));
56  std::shared_ptr<ioda::ObsDataVector<int>> qcflags(new ioda::ObsDataVector<int>(
57  obsspace, obsspace.obsvariables()));
58 
59  eckit::LocalConfiguration filterConf(conf, "Poisson Disk Thinning");
60  ufo::PoissonDiskThinningParameters filterParameters;
61  filterParameters.validateAndDeserialize(filterConf);
62  ufo::PoissonDiskThinning filter(obsspace, filterParameters, qcflags, obserr);
63 
64  // Stagger the start of the thinning process on different ranks to detect problems related to
65  // initialisation of the random generator seed from system time.
66  std::this_thread::sleep_for(obsspace.comm().rank() * std::chrono::milliseconds(1100));
67 
68  filter.preProcess();
69 
70  std::vector<int> isObsRetained(qcflags->nlocs(), 0);
71  for (size_t i = 0; i < qcflags->nlocs(); ++i)
72  isObsRetained[i] = ((*qcflags)[0][i] == ufo::QCflags::pass);
73 
74  if (obsSpaceConf.getString("distribution", "RoundRobin") == "InefficientDistribution") {
75  // PART 1: Verify that all ranks have retained the same observations.
76  const size_t rootRank = 0;
77  size_t isObsRetainedSizeOnRoot = isObsRetained.size();
78  obsspace.comm().broadcast(isObsRetainedSizeOnRoot, rootRank);
79 
80  std::vector<int> isObsRetainedOnRoot(isObsRetainedSizeOnRoot);
81  if (obsspace.comm().rank() == rootRank) {
82  isObsRetainedOnRoot = isObsRetained;
83  }
84  obsspace.comm().broadcast(isObsRetainedOnRoot, rootRank);
85 
86  EXPECT_EQUAL(isObsRetained, isObsRetainedOnRoot);
87  }
88 
89  // PART 2: Verify that all retained observations are sufficiently far from each other
90  // and that all rejected observations so close to a retained observation that they can't
91  // themselves be retained.
92 
93  // Collect status of observations on all processes
94  obsspace.distribution()->allGatherv(isObsRetained);
95  std::set<size_t> retainedGlobalObsIndices;
96  for (size_t i = 0; i < isObsRetained.size(); ++i)
97  if (isObsRetained[i])
98  retainedGlobalObsIndices.insert(i);
99 
100  // Collect pressures from all processes
101  std::vector<float> pressures(obsspace.nlocs());
102  obsspace.get_db("MetaData", "air_pressure", pressures);
103  obsspace.distribution()->allGatherv(pressures);
104 
105  // Collect categories from all processes
106  std::vector<int> categories(obsspace.nlocs(), 0);
107  if (filterConf.has("category_variable"))
108  {
109  std::string name, group;
110  splitVarGroup(filterConf.getString("category_variable.name"), name, group);
111  obsspace.get_db(group, name, categories);
112  }
113  obsspace.distribution()->allGatherv(categories);
114 
115  // Check distances between observations
116  const float minAllowedDistance = filterConf.getFloat("min_vertical_spacing");
117  std::vector<float> minDistanceToRetainedObs(pressures.size(),
118  std::numeric_limits<float>::max());
119  for (size_t i : retainedGlobalObsIndices) {
120  for (size_t j = 0; j < pressures.size(); ++j) {
121  if (j == i || categories[i] != categories[j])
122  continue;
123  const float distance = std::abs(pressures[i] - pressures[j]);
124  if (oops::contains(retainedGlobalObsIndices, j)) {
125  // Retained observations should be far from other retained observation
126  EXPECT(distance >= minAllowedDistance);
127  } else {
128  minDistanceToRetainedObs[j] = std::min(minDistanceToRetainedObs[j], distance);
129  }
130  }
131  }
132 
133  for (size_t j = 0; j < pressures.size(); ++j) {
134  if (!oops::contains(retainedGlobalObsIndices, j)) {
135  // Rejected observations should be close to some retained observation
136  EXPECT(minDistanceToRetainedObs[j] < minAllowedDistance);
137  }
138  }
139 }
140 
141 class ParallelPoissonDiskThinning : public oops::Test {
142  private:
143  std::string testid() const override {return "ufo::test::ParallelPoissonDiskThinning";}
144 
145  void register_tests() const override {
146  std::vector<eckit::testing::Test>& ts = eckit::testing::specification();
147 
148  const eckit::LocalConfiguration conf(::test::TestEnvironment::config());
149  for (const std::string & testCaseName : conf.keys())
150  {
151  const eckit::LocalConfiguration testCaseConf(::test::TestEnvironment::config(), testCaseName);
152  ts.emplace_back(CASE("ufo/ParallelPoissonDiskThinning/" + testCaseName, testCaseConf)
153  {
154  testPoissonDiskThinning(testCaseConf);
155  });
156  }
157  }
158 
159  void clear() const override {}
160 };
161 
162 } // namespace test
163 } // namespace ufo
164 
165 #endif // TEST_UFO_PARALLELPOISSONDISKTHINNING_H_
void preProcess() override
Thins observations by iterating over them in random order and retaining each observation lying outsid...
Options controlling the operation of the PoissonDiskThinning filter.
Forward declarations.
Definition: ObsAodExt.h:21
constexpr int pass
Definition: QCflags.h:14
float distance(const Point &a, const Point &b)
Returns the distance between the two cartesian-mapped Point arguments
CASE("ufo/DataExtractor/bilinearinterp/float_linear")
void testPoissonDiskThinning(const eckit::LocalConfiguration &conf, bool expectValidationError=false)
bool contains(const std::set< T > &set, const T &element)
Definition: RunCRTM.h:27
void splitVarGroup(const std::string &vargrp, std::string &var, std::string &grp)
Definition: StringUtils.cc:27
util::Duration abs(const util::Duration &duration)