IODA
Halo.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2017-2019 UCAR
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
8 #include "ioda/distribution/Halo.h"
9 
10 #include <algorithm>
11 #include <iostream>
12 #include <numeric>
13 #include <set>
14 #include <vector>
15 
16 #include <boost/make_unique.hpp>
17 
18 #include "oops/mpi/mpi.h"
19 #include "oops/util/DateTime.h"
20 #include "oops/util/Logger.h"
21 
22 #include "ioda/distribution/DistributionFactory.h"
23 #include "ioda/distribution/GeneralDistributionAccumulator.h"
24 #include "eckit/config/LocalConfiguration.h"
25 #include "eckit/exception/Exceptions.h"
26 
27 namespace ioda {
28 
29 // -----------------------------------------------------------------------------
31 
32 // -----------------------------------------------------------------------------
33 /*!
34  * \brief Halo selector
35  *
36  * \details This distribution puts observations within a halo on the same
37  * processor
38  *
39  *
40  */
41 // -----------------------------------------------------------------------------
42 Halo::Halo(const eckit::mpi::Comm & Comm,
43  const eckit::Configuration & config) :
44  Distribution(Comm) {
45  // extract center point from configuration
46  // if no patch center defined, distribute centers equi-distant along the equator
47  std::vector<double> centerd(2, 0.0);
48  if (config.has("center")) {
49  centerd = config.getDoubleVector("center", centerd);
50  } else {
51  centerd[0] = static_cast<double>(comm_.rank())*
52  (360.0/static_cast<double>(comm_.size()));
53  centerd[1] = 0.0;
54  }
55  eckit::geometry::Point2 center(centerd[0], centerd[1]);
56  center_ = center;
57 
58  // assign radius that is a sum of the patch and localization radii
59  // (1) this radius is the radius of the "patch"
60  // if not specified, use radius big enough to encompass all obs. on Earth
61  double radius = config.getDouble("radius", 50000000.0);
62 
63  // (2) add localization radius (i.e. the "halo" radius)
64  double locRadius = config.getDouble("obs localization.lengthscale", 0.0);
65  radius += locRadius;
66 
67  radius_ = radius;
68 
69  oops::Log::debug() << "Halo constructed: center: " << center_ << " radius: "
70  << radius_ << std::endl;
71 }
72 
73 // -----------------------------------------------------------------------------
75  oops::Log::trace() << "Halo destructed" << std::endl;
76 }
77 
78 // -----------------------------------------------------------------------------
79 void Halo::assignRecord(const std::size_t RecNum, const std::size_t LocNum,
80  const eckit::geometry::Point2 & point) {
81  if (recordsOutsideHalo_.find(RecNum) != recordsOutsideHalo_.end()) {
82  // We've already seen the first location in this record, and it was too far away from center_.
83  return;
84  }
85 
86  if (recordsInHalo_.find(RecNum) == recordsInHalo_.end()) {
87  // This is the first location from this record. Find out whether to assign it to this PE.
88 
89  const double dist = eckit::geometry::Sphere::distance(radius_earth_, center_, point);
90  oops::Log::debug() << "Point: " << point << " distance to center: " << center_
91  << " = " << dist << std::endl;
92  if (dist <= radius_) {
93  // Yes!
94  recordsInHalo_.insert(RecNum);
95  recordDistancesFromCenter_[RecNum] = dist;
96  } else {
97  // No, it's too far from center_.
98  recordsOutsideHalo_.insert(RecNum);
99  return;
100  }
101  }
102 
103  // Now we know this record has been assigned to this PE. Store information about location LocNum.
104  haloLocVector_.push_back(LocNum);
105  haloLocRecords_.push_back(RecNum);
106 }
107 
108 // -----------------------------------------------------------------------------
109 bool Halo::isMyRecord(std::size_t RecNum) const {
110  return (recordsInHalo_.count(RecNum) > 0);
111 }
112 
113 // -----------------------------------------------------------------------------
115  // define some constants for this PE
116  double inf = std::numeric_limits<double>::infinity();
117  size_t myRank = comm_.rank();
118 
119  // All records have now been assigned, so this container is no longer needed.
120  recordsOutsideHalo_.clear();
121 
122  // Find the maximum global location index (plus 1)
123  size_t nglocs = 0;
124  if (!haloLocVector_.empty())
125  nglocs = haloLocVector_.back() + 1;
126  comm_.allReduceInPlace(nglocs, eckit::mpi::max());
127 
128  if ( nglocs > 0 ) {
129  // make structures holding pairs of {distance,rank} for reduce operation later
130  // initialize the local array to dist == inf
131  std::vector<std::pair<double, int>> dist_and_lidx_loc(nglocs);
132  std::vector<std::pair<double, int>> dist_and_lidx_glb(nglocs);
133  for ( size_t jj = 0; jj < nglocs; ++jj ) {
134  dist_and_lidx_loc[jj] = std::make_pair(inf, myRank);
135  }
136 
137  // assign actual distances to local obs (stored in haloLocVector_)
138  for (size_t loc = 0; loc < haloLocVector_.size(); ++loc) {
139  const size_t gloc = haloLocVector_[loc];
140  const size_t recNum = haloLocRecords_[loc];
141  const double dist = recordDistancesFromCenter_.at(recNum);
142  dist_and_lidx_loc[gloc] = std::make_pair(dist, myRank);
143  }
144 
145  // use reduce operation to find PE rank with minimal distance
146  comm_.allReduce(dist_and_lidx_loc, dist_and_lidx_glb, eckit::mpi::minloc());
147 
148  // ids of patch observations owned by this PE
149  std::unordered_set<std::size_t> patchObsLoc;
150 
151  // if this PE has the minimum distance then this PE owns this ob. as patch
152  for (auto i : haloLocVector_) {
153  if ( dist_and_lidx_glb[i].second == myRank ) {
154  patchObsLoc.insert(i);
155  }
156  }
157 
158  // convert storage from unodered sets to a bool vector
159  for (size_t jj = 0; jj < haloLocVector_.size(); ++jj) {
160  if ( patchObsLoc.count(haloLocVector_[jj]) ) {
161  patchObsBool_.push_back(true);
162  } else {
163  patchObsBool_.push_back(false);
164  }
165  }
166 
167  // now that we have patchObsBool_ computed we can free memory occupied by some temp objects
169  haloLocRecords_.clear();
170  haloLocRecords_.shrink_to_fit();
171 
172  computeGlobalUniqueConsecutiveLocIndices(dist_and_lidx_glb);
173 
174  // and now the remaining temp object
175  haloLocVector_.clear();
176  haloLocVector_.shrink_to_fit();
177  }
178 }
179 
180 // -----------------------------------------------------------------------------
182  const std::vector<std::pair<double, int>> &dist_and_lidx_glb) {
183  const double inf = std::numeric_limits<double>::infinity();
184 
186 
187  // Step 0: enable quick checks of whether a location belongs to this rank's halo.
188  std::unordered_set<size_t> haloLocSet(haloLocVector_.begin(), haloLocVector_.end());
189 
190  // Step 1: index patch observations owned by each rank consecutively (starting from 0 on each
191  // rank). For each observation i held on this rank, set globalConsecutiveLocIndices_[i] to
192  // the index of the corresponding patch observation on the rank that owns it.
193  std::vector<size_t> patchObsCountOnRank(comm_.size(), 0);
194  for (size_t gloc = 0, nglocs = dist_and_lidx_glb.size(); gloc < nglocs; ++gloc) {
195  if (dist_and_lidx_glb[gloc].first < inf) {
196  // This obs is held on at least one rank (this won't be the case e.g. for observations
197  // outside the assimilation window)
198  const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
199  if (haloLocSet.find(gloc) != haloLocSet.end()) {
200  // This obs is held on the current PE (but not necessarily as a patch obs)
201  globalUniqueConsecutiveLocIndices_.push_back(patchObsCountOnRank[rankOwningPatchObs]);
202  }
203  ++patchObsCountOnRank[rankOwningPatchObs];
204  }
205  }
206 
207  // Step 2: make indices of patch observations globally unique by incrementing the index
208  // of each patch observation held by rank r by the total number of patch observations owned
209  // by ranks r' < r.
210 
211  // Perform an exclusive scan
212  std::vector<size_t> patchObsCountOnPreviousRanks(patchObsCountOnRank.size(), 0);
213  for (size_t rank = 1; rank < patchObsCountOnRank.size(); ++rank) {
214  patchObsCountOnPreviousRanks[rank] =
215  patchObsCountOnPreviousRanks[rank - 1] + patchObsCountOnRank[rank - 1];
216  }
217 
218  // Increment patch observation indices
219  for (size_t loc = 0; loc < globalUniqueConsecutiveLocIndices_.size(); ++loc) {
220  const size_t gloc = haloLocVector_[loc];
221  const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
222  globalUniqueConsecutiveLocIndices_[loc] += patchObsCountOnPreviousRanks[rankOwningPatchObs];
223  }
224 }
225 
226 // -----------------------------------------------------------------------------
227 void Halo::patchObs(std::vector<bool> & patchObsVec) const {
228  patchObsVec = patchObsBool_;
229 }
230 
231 // -----------------------------------------------------------------------------
232 void Halo::min(int & x) const {
233  minImpl(x);
234 }
235 
236 void Halo::min(std::size_t & x) const {
237  minImpl(x);
238 }
239 
240 void Halo::min(float & x) const {
241  minImpl(x);
242 }
243 
244 void Halo::min(double & x) const {
245  minImpl(x);
246 }
247 
248 void Halo::min(std::vector<int> & x) const {
249  minImpl(x);
250 }
251 
252 void Halo::min(std::vector<std::size_t> & x) const {
253  minImpl(x);
254 }
255 
256 void Halo::min(std::vector<float> & x) const {
257  minImpl(x);
258 }
259 
260 void Halo::min(std::vector<double> & x) const {
261  minImpl(x);
262 }
263 
264 template <typename T>
265 void Halo::minImpl(T & x) const {
266  reductionImpl(x, eckit::mpi::min());
267 }
268 
269 // -----------------------------------------------------------------------------
270 void Halo::max(int & x) const {
271  maxImpl(x);
272 }
273 
274 void Halo::max(std::size_t & x) const {
275  maxImpl(x);
276 }
277 
278 void Halo::max(float & x) const {
279  maxImpl(x);
280 }
281 
282 void Halo::max(double & x) const {
283  maxImpl(x);
284 }
285 
286 void Halo::max(std::vector<int> & x) const {
287  maxImpl(x);
288 }
289 
290 void Halo::max(std::vector<std::size_t> & x) const {
291  maxImpl(x);
292 }
293 
294 void Halo::max(std::vector<float> & x) const {
295  maxImpl(x);
296 }
297 
298 void Halo::max(std::vector<double> & x) const {
299  maxImpl(x);
300 }
301 
302 template <typename T>
303 void Halo::maxImpl(T & x) const {
304  reductionImpl(x, eckit::mpi::max());
305 }
306 
307 // -----------------------------------------------------------------------------
308 template <typename T>
309 void Halo::reductionImpl(T & x, eckit::mpi::Operation::Code op) const {
310  comm_.allReduceInPlace(x, op);
311 }
312 
313 template <typename T>
314 void Halo::reductionImpl(std::vector<T> & x, eckit::mpi::Operation::Code op) const {
315  comm_.allReduceInPlace(x.begin(), x.end(), op);
316 }
317 
318 // -----------------------------------------------------------------------------
319 std::unique_ptr<Accumulator<int>>
321  return createAccumulatorImplT(init);
322 }
323 
324 std::unique_ptr<Accumulator<std::size_t>>
325 Halo::createAccumulatorImpl(std::size_t init) const {
326  return createAccumulatorImplT(init);
327 }
328 
329 std::unique_ptr<Accumulator<float>>
330 Halo::createAccumulatorImpl(float init) const {
331  return createAccumulatorImplT(init);
332 }
333 
334 std::unique_ptr<Accumulator<double>>
335 Halo::createAccumulatorImpl(double init) const {
336  return createAccumulatorImplT(init);
337 }
338 
339 std::unique_ptr<Accumulator<std::vector<int>>>
340 Halo::createAccumulatorImpl(const std::vector<int> &init) const {
341  return createAccumulatorImplT(init);
342 }
343 
344 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
345 Halo::createAccumulatorImpl(const std::vector<std::size_t> &init) const {
346  return createAccumulatorImplT(init);
347 }
348 
349 std::unique_ptr<Accumulator<std::vector<float>>>
350 Halo::createAccumulatorImpl(const std::vector<float> &init) const {
351  return createAccumulatorImplT(init);
352 }
353 
354 std::unique_ptr<Accumulator<std::vector<double>>>
355 Halo::createAccumulatorImpl(const std::vector<double> &init) const {
356  return createAccumulatorImplT(init);
357 }
358 
359 template <typename T>
360 std::unique_ptr<Accumulator<T>>
361 Halo::createAccumulatorImplT(const T &init) const {
362  return boost::make_unique<GeneralDistributionAccumulator<T>>(init, comm_, patchObsBool_);
363 }
364 
365 // -----------------------------------------------------------------------------
366 void Halo::allGatherv(std::vector<size_t> &x) const {
367  allGathervImpl(x);
368 }
369 
370 void Halo::allGatherv(std::vector<int> &x) const {
371  allGathervImpl(x);
372 }
373 
374 void Halo::allGatherv(std::vector<float> &x) const {
375  allGathervImpl(x);
376 }
377 
378 void Halo::allGatherv(std::vector<double> &x) const {
379  allGathervImpl(x);
380 }
381 
382 /// overloading for util::DateTime
383 void Halo::allGatherv(std::vector<util::DateTime> &x) const {
384  allGathervImpl(x);
385 }
386 
387 /// overloading for std::string
388 void Halo::allGatherv(std::vector<std::string> &x) const {
389  allGathervImpl(x);
390 }
391 
392 template <typename T>
393 void Halo::allGathervImpl(std::vector<T> &x) const {
394  // operation is only well defined if size x is the size of local obs
395  ASSERT(x.size() == patchObsBool_.size());
396 
397  // make a temp vector that only holds patch obs.
398  std::vector<T> xtmp(std::count(patchObsBool_.begin(), patchObsBool_.end(), true));
399  size_t counter = 0;
400  for (size_t ii = 0; ii < x.size(); ++ii) {
401  if ( patchObsBool_[ii] ) {
402  xtmp[counter] = x[ii];
403  ++counter;
404  }
405  }
406  // gather all patch obs into a single vector
407  oops::mpi::allGatherv(comm_, xtmp);
408 
409  // return the result
410  x = xtmp;
411 }
412 
413 // -----------------------------------------------------------------------------
414 
417 }
418 
419 // -----------------------------------------------------------------------------
420 
421 } // namespace ioda
class for distributing obs across multiple process elements
const eckit::mpi::Comm & comm_
Local MPI communicator.
size_t rank() const
Accessor to MPI rank.
A class able to instantiate objects of type T, which should be a subclass of Distribution.
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
Definition: Halo.cc:320
void computeGlobalUniqueConsecutiveLocIndices(const std::vector< std::pair< double, int >> &dist_and_lidx_glb)
Definition: Halo.cc:181
std::vector< bool > patchObsBool_
Definition: Halo.h:119
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
Definition: Halo.cc:232
std::vector< size_t > haloLocRecords_
Definition: Halo.h:133
void computePatchLocs() override
If necessary, identifies locations of "patch obs", i.e.
Definition: Halo.cc:114
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRec...
Definition: Halo.cc:109
const double radius_earth_
Definition: Halo.h:138
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
Definition: Halo.cc:366
std::unordered_set< std::size_t > recordsInHalo_
Definition: Halo.h:117
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
Definition: Halo.cc:415
void reductionImpl(T &x, eckit::mpi::Operation::Code op) const
Definition: Halo.cc:309
std::unordered_set< std::size_t > recordsOutsideHalo_
Definition: Halo.h:128
void minImpl(T &x) const
Definition: Halo.cc:265
~Halo()
Definition: Halo.cc:74
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
Definition: Halo.cc:227
std::unique_ptr< Accumulator< T > > createAccumulatorImplT(const T &init) const
Definition: Halo.cc:361
std::unordered_map< std::size_t, double > recordDistancesFromCenter_
Definition: Halo.h:131
double radius_
Definition: Halo.h:114
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.
Definition: Halo.cc:270
void allGathervImpl(std::vector< T > &x) const
Definition: Halo.cc:393
std::vector< size_t > haloLocVector_
Definition: Halo.h:135
Halo(const eckit::mpi::Comm &Comm, const eckit::Configuration &config)
Halo selector.
Definition: Halo.cc:42
eckit::geometry::Point2 center_
Definition: Halo.h:115
void maxImpl(T &x) const
Definition: Halo.cc:303
std::vector< size_t > globalUniqueConsecutiveLocIndices_
Definition: Halo.h:122
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
If the record RecNum has not yet been assigned to a PE, assigns it to the appropriate PE.
Definition: Halo.cc:79
static DistributionMaker< Halo > maker("Halo")