IODA
Halo.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2017-2019 UCAR
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
8 #include "ioda/distribution/Halo.h"
9 
10 #include <algorithm>
11 #include <iostream>
12 #include <numeric>
13 #include <set>
14 #include <vector>
15 
16 #include <boost/make_unique.hpp>
17 
18 #include "oops/mpi/mpi.h"
19 #include "oops/util/DateTime.h"
20 #include "oops/util/Logger.h"
21 
22 #include "ioda/distribution/DistributionFactory.h"
23 #include "ioda/distribution/GeneralDistributionAccumulator.h"
24 #include "eckit/config/LocalConfiguration.h"
25 #include "eckit/exception/Exceptions.h"
26 
27 namespace ioda {
28 
29 // -----------------------------------------------------------------------------
31 
32 // -----------------------------------------------------------------------------
33 /*!
34  * \brief Halo selector
35  *
36  * \details This distribution puts observations within a halo on the same
37  * processor
38  *
39  *
40  */
41 // -----------------------------------------------------------------------------
42 Halo::Halo(const eckit::mpi::Comm & Comm,
43  const eckit::Configuration & config) :
44  Distribution(Comm) {
45  // extract center point from configuration
46  // if no patch center defined, distribute centers equi-distant along the equator
47  std::vector<double> centerd(2, 0.0);
48  if (config.has("center")) {
49  centerd = config.getDoubleVector("center", centerd);
50  } else {
51  centerd[0] = static_cast<double>(comm_.rank())*
52  (360.0/static_cast<double>(comm_.size()));
53  centerd[1] = 0.0;
54  }
55  eckit::geometry::Point2 center(centerd[0], centerd[1]);
56  center_ = center;
57 
58  // assign radius that is a sum of the patch and localization radii
59  // (1) this radius is the radius of the "patch"
60  // if not specified, use radius big enough to encompass all obs. on Earth
61  double radius = config.getDouble("radius", 50000000.0);
62 
63  // (2) add localization radius (i.e. the "halo" radius)
64  double locRadius = config.getDouble("obs localization.lengthscale", 0.0);
65  radius += locRadius;
66 
67  radius_ = radius;
68 
69  oops::Log::debug() << "Halo constructed: center: " << center_ << " radius: "
70  << radius_ << std::endl;
71 }
72 
73 // -----------------------------------------------------------------------------
75  oops::Log::trace() << "Halo destructed" << std::endl;
76 }
77 
78 // -----------------------------------------------------------------------------
79 void Halo::assignRecord(const std::size_t RecNum, const std::size_t LocNum,
80  const eckit::geometry::Point2 & point) {
81  if (recordsOutsideHalo_.find(RecNum) != recordsOutsideHalo_.end()) {
82  // We've already seen the first location in this record, and it was too far away from center_.
83  return;
84  }
85 
86  if (recordsInHalo_.find(RecNum) == recordsInHalo_.end()) {
87  // This is the first location from this record. Find out whether to assign it to this PE.
88 
89  const double dist = eckit::geometry::Sphere::distance(radius_earth_, center_, point);
90  oops::Log::debug() << "Point: " << point << " distance to center: " << center_
91  << " = " << dist << std::endl;
92  if (dist <= radius_) {
93  // Yes!
94  recordsInHalo_.insert(RecNum);
95  recordDistancesFromCenter_[RecNum] = dist;
96  } else {
97  // No, it's too far from center_.
98  recordsOutsideHalo_.insert(RecNum);
99  return;
100  }
101  }
102 
103  // Now we know this record has been assigned to this PE. Store information about location LocNum.
104  haloLocVector_.push_back(LocNum);
105  haloLocRecords_.push_back(RecNum);
106 }
107 
108 // -----------------------------------------------------------------------------
109 bool Halo::isMyRecord(std::size_t RecNum) const {
110  return (recordsInHalo_.count(RecNum) > 0);
111 }
112 
113 // -----------------------------------------------------------------------------
115  // define some constants for this PE
116  double inf = std::numeric_limits<double>::infinity();
117  size_t myRank = comm_.rank();
118 
119  // All records have now been assigned, so this container is no longer needed.
120  recordsOutsideHalo_.clear();
121 
122  // Find the maximum global location index (plus 1)
123  size_t nglocs = 0;
124  if (!haloLocVector_.empty())
125  nglocs = haloLocVector_.back() + 1;
126  comm_.allReduceInPlace(nglocs, eckit::mpi::max());
127 
128  if ( nglocs > 0 ) {
129  // make structures holding pairs of {distance,rank} for reduce operation later
130  // initialize the local array to dist == inf
131  std::vector<std::pair<double, int>> dist_and_lidx_loc(nglocs);
132  std::vector<std::pair<double, int>> dist_and_lidx_glb(nglocs);
133  for ( size_t jj = 0; jj < nglocs; ++jj ) {
134  dist_and_lidx_loc[jj] = std::make_pair(inf, myRank);
135  }
136 
137  // assign actual distances to local obs (stored in haloLocVector_)
138  for (size_t loc = 0; loc < haloLocVector_.size(); ++loc) {
139  const size_t gloc = haloLocVector_[loc];
140  const size_t recNum = haloLocRecords_[loc];
141  const double dist = recordDistancesFromCenter_.at(recNum);
142  dist_and_lidx_loc[gloc] = std::make_pair(dist, myRank);
143  }
144 
145  // use reduce operation to find PE rank with minimal distance
146  comm_.allReduce(dist_and_lidx_loc, dist_and_lidx_glb, eckit::mpi::minloc());
147 
148  // ids of patch observations owned by this PE
149  std::unordered_set<std::size_t> patchObsLoc;
150 
151  // if this PE has the minimum distance then this PE owns this ob. as patch
152  for (auto i : haloLocVector_) {
153  if ( dist_and_lidx_glb[i].second == myRank ) {
154  patchObsLoc.insert(i);
155  }
156  }
157 
158  // convert storage from unodered sets to a bool vector
159  for (size_t jj = 0; jj < haloLocVector_.size(); ++jj) {
160  if ( patchObsLoc.count(haloLocVector_[jj]) ) {
161  patchObsBool_.push_back(true);
162  } else {
163  patchObsBool_.push_back(false);
164  }
165  }
166 
167  size_t npatchobs = std::count(patchObsBool_.begin(), patchObsBool_.end(), true);
168  oops::Log::debug() << "npatchobs: " << npatchobs << std::endl;
169  oops::Log::debug() << "patchObsBool_.size(): " << patchObsBool_.size() << std::endl;
170 
171  // now that we have patchObsBool_ computed we can free memory occupied by some temp objects
173  haloLocRecords_.clear();
174  haloLocRecords_.shrink_to_fit();
175 
176  computeGlobalUniqueConsecutiveLocIndices(dist_and_lidx_glb);
177 
178  // and now the remaining temp object
179  haloLocVector_.clear();
180  haloLocVector_.shrink_to_fit();
181  }
182 }
183 
184 // -----------------------------------------------------------------------------
186  const std::vector<std::pair<double, int>> &dist_and_lidx_glb) {
187  const double inf = std::numeric_limits<double>::infinity();
188 
190 
191  // Step 0: enable quick checks of whether a location belongs to this rank's halo.
192  std::unordered_set<size_t> haloLocSet(haloLocVector_.begin(), haloLocVector_.end());
193 
194  // Step 1: index patch observations owned by each rank consecutively (starting from 0 on each
195  // rank). For each observation i held on this rank, set globalConsecutiveLocIndices_[i] to
196  // the index of the corresponding patch observation on the rank that owns it.
197  std::vector<size_t> patchObsCountOnRank(comm_.size(), 0);
198  for (size_t gloc = 0, nglocs = dist_and_lidx_glb.size(); gloc < nglocs; ++gloc) {
199  if (dist_and_lidx_glb[gloc].first < inf) {
200  // This obs is held on at least one rank (this won't be the case e.g. for observations
201  // outside the assimilation window)
202  const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
203  if (haloLocSet.find(gloc) != haloLocSet.end()) {
204  // This obs is held on the current PE (but not necessarily as a patch obs)
205  globalUniqueConsecutiveLocIndices_.push_back(patchObsCountOnRank[rankOwningPatchObs]);
206  }
207  ++patchObsCountOnRank[rankOwningPatchObs];
208  }
209  }
210 
211  // Step 2: make indices of patch observations globally unique by incrementing the index
212  // of each patch observation held by rank r by the total number of patch observations owned
213  // by ranks r' < r.
214 
215  // Perform an exclusive scan
216  std::vector<size_t> patchObsCountOnPreviousRanks(patchObsCountOnRank.size(), 0);
217  for (size_t rank = 1; rank < patchObsCountOnRank.size(); ++rank) {
218  patchObsCountOnPreviousRanks[rank] =
219  patchObsCountOnPreviousRanks[rank - 1] + patchObsCountOnRank[rank - 1];
220  }
221 
222  // Increment patch observation indices
223  for (size_t loc = 0; loc < globalUniqueConsecutiveLocIndices_.size(); ++loc) {
224  const size_t gloc = haloLocVector_[loc];
225  const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
226  globalUniqueConsecutiveLocIndices_[loc] += patchObsCountOnPreviousRanks[rankOwningPatchObs];
227  }
228 }
229 
230 // -----------------------------------------------------------------------------
231 void Halo::patchObs(std::vector<bool> & patchObsVec) const {
232  patchObsVec = patchObsBool_;
233 }
234 
235 // -----------------------------------------------------------------------------
236 void Halo::min(int & x) const {
237  minImpl(x);
238 }
239 
240 void Halo::min(std::size_t & x) const {
241  minImpl(x);
242 }
243 
244 void Halo::min(float & x) const {
245  minImpl(x);
246 }
247 
248 void Halo::min(double & x) const {
249  minImpl(x);
250 }
251 
252 void Halo::min(std::vector<int> & x) const {
253  minImpl(x);
254 }
255 
256 void Halo::min(std::vector<std::size_t> & x) const {
257  minImpl(x);
258 }
259 
260 void Halo::min(std::vector<float> & x) const {
261  minImpl(x);
262 }
263 
264 void Halo::min(std::vector<double> & x) const {
265  minImpl(x);
266 }
267 
268 template <typename T>
269 void Halo::minImpl(T & x) const {
270  reductionImpl(x, eckit::mpi::min());
271 }
272 
273 // -----------------------------------------------------------------------------
274 void Halo::max(int & x) const {
275  maxImpl(x);
276 }
277 
278 void Halo::max(std::size_t & x) const {
279  maxImpl(x);
280 }
281 
282 void Halo::max(float & x) const {
283  maxImpl(x);
284 }
285 
286 void Halo::max(double & x) const {
287  maxImpl(x);
288 }
289 
290 void Halo::max(std::vector<int> & x) const {
291  maxImpl(x);
292 }
293 
294 void Halo::max(std::vector<std::size_t> & x) const {
295  maxImpl(x);
296 }
297 
298 void Halo::max(std::vector<float> & x) const {
299  maxImpl(x);
300 }
301 
302 void Halo::max(std::vector<double> & x) const {
303  maxImpl(x);
304 }
305 
306 template <typename T>
307 void Halo::maxImpl(T & x) const {
308  reductionImpl(x, eckit::mpi::max());
309 }
310 
311 // -----------------------------------------------------------------------------
312 template <typename T>
313 void Halo::reductionImpl(T & x, eckit::mpi::Operation::Code op) const {
314  comm_.allReduceInPlace(x, op);
315 }
316 
317 template <typename T>
318 void Halo::reductionImpl(std::vector<T> & x, eckit::mpi::Operation::Code op) const {
319  comm_.allReduceInPlace(x.begin(), x.end(), op);
320 }
321 
322 // -----------------------------------------------------------------------------
323 std::unique_ptr<Accumulator<int>>
325  return createAccumulatorImplT(init);
326 }
327 
328 std::unique_ptr<Accumulator<std::size_t>>
329 Halo::createAccumulatorImpl(std::size_t init) const {
330  return createAccumulatorImplT(init);
331 }
332 
333 std::unique_ptr<Accumulator<float>>
334 Halo::createAccumulatorImpl(float init) const {
335  return createAccumulatorImplT(init);
336 }
337 
338 std::unique_ptr<Accumulator<double>>
339 Halo::createAccumulatorImpl(double init) const {
340  return createAccumulatorImplT(init);
341 }
342 
343 std::unique_ptr<Accumulator<std::vector<int>>>
344 Halo::createAccumulatorImpl(const std::vector<int> &init) const {
345  return createAccumulatorImplT(init);
346 }
347 
348 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
349 Halo::createAccumulatorImpl(const std::vector<std::size_t> &init) const {
350  return createAccumulatorImplT(init);
351 }
352 
353 std::unique_ptr<Accumulator<std::vector<float>>>
354 Halo::createAccumulatorImpl(const std::vector<float> &init) const {
355  return createAccumulatorImplT(init);
356 }
357 
358 std::unique_ptr<Accumulator<std::vector<double>>>
359 Halo::createAccumulatorImpl(const std::vector<double> &init) const {
360  return createAccumulatorImplT(init);
361 }
362 
363 template <typename T>
364 std::unique_ptr<Accumulator<T>>
365 Halo::createAccumulatorImplT(const T &init) const {
366  return boost::make_unique<GeneralDistributionAccumulator<T>>(init, comm_, patchObsBool_);
367 }
368 
369 // -----------------------------------------------------------------------------
370 void Halo::allGatherv(std::vector<size_t> &x) const {
371  allGathervImpl(x);
372 }
373 
374 void Halo::allGatherv(std::vector<int> &x) const {
375  allGathervImpl(x);
376 }
377 
378 void Halo::allGatherv(std::vector<float> &x) const {
379  allGathervImpl(x);
380 }
381 
382 void Halo::allGatherv(std::vector<double> &x) const {
383  allGathervImpl(x);
384 }
385 
386 /// overloading for util::DateTime
387 void Halo::allGatherv(std::vector<util::DateTime> &x) const {
388  allGathervImpl(x);
389 }
390 
391 /// overloading for std::string
392 void Halo::allGatherv(std::vector<std::string> &x) const {
393  allGathervImpl(x);
394 }
395 
396 template <typename T>
397 void Halo::allGathervImpl(std::vector<T> &x) const {
398  // operation is only well defined if size x is the size of local obs
399  ASSERT(x.size() == patchObsBool_.size());
400 
401  // make a temp vector that only holds patch obs.
402  std::vector<T> xtmp(std::count(patchObsBool_.begin(), patchObsBool_.end(), true));
403  size_t counter = 0;
404  for (size_t ii = 0; ii < x.size(); ++ii) {
405  if ( patchObsBool_[ii] ) {
406  xtmp[counter] = x[ii];
407  ++counter;
408  }
409  }
410  // gather all patch obs into a single vector
411  oops::mpi::allGatherv(comm_, xtmp);
412 
413  // return the result
414  x = xtmp;
415 }
416 
417 // -----------------------------------------------------------------------------
418 
421 }
422 
423 // -----------------------------------------------------------------------------
424 
425 } // namespace ioda
class for distributing obs across multiple process elements
const eckit::mpi::Comm & comm_
Local MPI communicator.
size_t rank() const
Accessor to MPI rank.
A class able to instantiate objects of type T, which should be a subclass of Distribution.
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
Definition: Halo.cc:324
void computeGlobalUniqueConsecutiveLocIndices(const std::vector< std::pair< double, int >> &dist_and_lidx_glb)
Definition: Halo.cc:185
std::vector< bool > patchObsBool_
Definition: Halo.h:119
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
Definition: Halo.cc:236
std::vector< size_t > haloLocRecords_
Definition: Halo.h:133
void computePatchLocs() override
If necessary, identifies locations of "patch obs", i.e.
Definition: Halo.cc:114
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRec...
Definition: Halo.cc:109
const double radius_earth_
Definition: Halo.h:138
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
Definition: Halo.cc:370
std::unordered_set< std::size_t > recordsInHalo_
Definition: Halo.h:117
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
Definition: Halo.cc:419
void reductionImpl(T &x, eckit::mpi::Operation::Code op) const
Definition: Halo.cc:313
std::unordered_set< std::size_t > recordsOutsideHalo_
Definition: Halo.h:128
void minImpl(T &x) const
Definition: Halo.cc:269
~Halo()
Definition: Halo.cc:74
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
Definition: Halo.cc:231
std::unique_ptr< Accumulator< T > > createAccumulatorImplT(const T &init) const
Definition: Halo.cc:365
std::unordered_map< std::size_t, double > recordDistancesFromCenter_
Definition: Halo.h:131
double radius_
Definition: Halo.h:114
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.
Definition: Halo.cc:274
void allGathervImpl(std::vector< T > &x) const
Definition: Halo.cc:397
std::vector< size_t > haloLocVector_
Definition: Halo.h:135
Halo(const eckit::mpi::Comm &Comm, const eckit::Configuration &config)
Halo selector.
Definition: Halo.cc:42
eckit::geometry::Point2 center_
Definition: Halo.h:115
void maxImpl(T &x) const
Definition: Halo.cc:307
std::vector< size_t > globalUniqueConsecutiveLocIndices_
Definition: Halo.h:122
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
If the record RecNum has not yet been assigned to a PE, assigns it to the appropriate PE.
Definition: Halo.cc:79
static DistributionMaker< AtlasDistribution > maker(DIST_NAME)