IODA
ReplicaOfGeneralDistribution.cc
Go to the documentation of this file.
1 /*
2  * (C) Crown copyright 2021, Met Office
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
8 #include "ioda/distribution/ReplicaOfGeneralDistribution.h"
9 
10 #include <boost/make_unique.hpp>
11 
12 #include "oops/mpi/mpi.h"
13 #include "oops/util/DateTime.h"
14 #include "oops/util/Logger.h"
15 
16 #include "ioda/distribution/GeneralDistributionAccumulator.h"
17 #include "eckit/exception/Exceptions.h"
18 
19 namespace ioda {
20 
21 // -----------------------------------------------------------------------------
22 // Note: we don't declare an instance of DistributionMaker<ReplicaOfGeneralDistribution>,
23 // since this distribution must be created programmatically (not from YAML).
24 
25 // -----------------------------------------------------------------------------
27  const eckit::mpi::Comm & comm,
28  std::shared_ptr<const Distribution> masterDist,
29  const std::vector<std::size_t> &masterRecordNumbers)
30  : Distribution(comm),
31  masterDist_(std::move(masterDist)),
32  numMasterLocs_(masterRecordNumbers.size())
33 {
34  // Identify records belonging to the master distribution's patch.
35  std::vector<bool> isMasterPatchObs(numMasterLocs_);
36  masterDist_->patchObs(isMasterPatchObs);
37  for (size_t loc = 0; loc < numMasterLocs_; ++loc)
38  if (isMasterPatchObs[loc])
39  masterPatchRecords_.insert(masterRecordNumbers[loc]);
40 
41  oops::Log::trace() << "ReplicaOfGeneralDistribution constructed" << std::endl;
42 }
43 
44 // -----------------------------------------------------------------------------
46  oops::Log::trace() << "ReplicaOfGeneralDistribution destructed" << std::endl;
47 }
48 
49 // -----------------------------------------------------------------------------
50 void ReplicaOfGeneralDistribution::assignRecord(const std::size_t RecNum, const std::size_t LocNum,
51  const eckit::geometry::Point2 & /*point*/) {
52  if (masterDist_->isMyRecord(RecNum)) {
53  myRecords_.insert(RecNum);
54  myGlobalLocs_.push_back(LocNum);
55  isMyPatchObs_.push_back(masterPatchRecords_.find(RecNum) != masterPatchRecords_.end());
56  }
57 }
58 
59 // -----------------------------------------------------------------------------
60 bool ReplicaOfGeneralDistribution::isMyRecord(std::size_t RecNum) const {
61  return myRecords_.find(RecNum) != myRecords_.end();
62 }
63 
64 // -----------------------------------------------------------------------------
66  // Find the maximum global location index (plus 1)
67  size_t nglocs = 0;
68  if (!myGlobalLocs_.empty())
69  nglocs = myGlobalLocs_.back() + 1;
70  comm_.allReduceInPlace(nglocs, eckit::mpi::max());
71 
72  // Collect the global location indices of all patch obs on the current process.
73  std::vector<std::size_t> patchObsGlobalLocs;
74  for (std::size_t i = 0, n = myGlobalLocs_.size(); i < n; ++i)
75  if (isMyPatchObs_[i])
76  patchObsGlobalLocs.push_back(myGlobalLocs_[i]);
77  // Merge with vectors collected on other processes (ordered by MPI rank).
78  oops::mpi::allGatherv(comm_, patchObsGlobalLocs);
79 
80  // Assign consecutive indices to patch obs ordered by MPI rank.
81  // (It is assumed that each location belongs to the patch of some process).
82  const std::size_t UNASSIGNED = static_cast<size_t>(-1);
83  std::vector<std::size_t> consecutiveLocIndices(nglocs, UNASSIGNED);
84  for (std::size_t i = 0, n = patchObsGlobalLocs.size(); i < n; ++i)
85  consecutiveLocIndices.at(patchObsGlobalLocs[i]) = i;
86 
87  // Find and save the indices of all obs held on the current process.
89  for (std::size_t loc = 0; loc < myGlobalLocs_.size(); ++loc) {
90  globalUniqueConsecutiveLocIndices_[loc] = consecutiveLocIndices[myGlobalLocs_[loc]];
91  if (globalUniqueConsecutiveLocIndices_[loc] == UNASSIGNED)
92  throw eckit::SeriousBug("A location does not belong to the patch of any process");
93  }
94 
95  // Free memory.
96  masterPatchRecords_.clear();
97  myGlobalLocs_.clear();
98  myGlobalLocs_.shrink_to_fit();
99 }
100 
101 // -----------------------------------------------------------------------------
102 void ReplicaOfGeneralDistribution::patchObs(std::vector<bool> & patchObsVec) const {
103  patchObsVec = isMyPatchObs_;
104 }
105 
106 // -----------------------------------------------------------------------------
108  minImpl(x);
109 }
110 
111 void ReplicaOfGeneralDistribution::min(std::size_t & x) const {
112  minImpl(x);
113 }
114 
115 void ReplicaOfGeneralDistribution::min(float & x) const {
116  minImpl(x);
117 }
118 
119 void ReplicaOfGeneralDistribution::min(double & x) const {
120  minImpl(x);
121 }
122 
123 void ReplicaOfGeneralDistribution::min(std::vector<int> & x) const {
124  minImpl(x);
125 }
126 
127 void ReplicaOfGeneralDistribution::min(std::vector<std::size_t> & x) const {
128  minImpl(x);
129 }
130 
131 void ReplicaOfGeneralDistribution::min(std::vector<float> & x) const {
132  minImpl(x);
133 }
134 
135 void ReplicaOfGeneralDistribution::min(std::vector<double> & x) const {
136  minImpl(x);
137 }
138 
139 template <typename T>
141  reductionImpl(x, eckit::mpi::min());
142 }
143 
144 // -----------------------------------------------------------------------------
146  maxImpl(x);
147 }
148 
149 void ReplicaOfGeneralDistribution::max(std::size_t & x) const {
150  maxImpl(x);
151 }
152 
153 void ReplicaOfGeneralDistribution::max(float & x) const {
154  maxImpl(x);
155 }
156 
157 void ReplicaOfGeneralDistribution::max(double & x) const {
158  maxImpl(x);
159 }
160 
161 void ReplicaOfGeneralDistribution::max(std::vector<int> & x) const {
162  maxImpl(x);
163 }
164 
165 void ReplicaOfGeneralDistribution::max(std::vector<std::size_t> & x) const {
166  maxImpl(x);
167 }
168 
169 void ReplicaOfGeneralDistribution::max(std::vector<float> & x) const {
170  maxImpl(x);
171 }
172 
173 void ReplicaOfGeneralDistribution::max(std::vector<double> & x) const {
174  maxImpl(x);
175 }
176 
177 template <typename T>
179  reductionImpl(x, eckit::mpi::max());
180 }
181 
182 // -----------------------------------------------------------------------------
183 template <typename T>
184 void ReplicaOfGeneralDistribution::reductionImpl(T & x, eckit::mpi::Operation::Code op) const {
185  comm_.allReduceInPlace(x, op);
186 }
187 
188 template <typename T>
190  eckit::mpi::Operation::Code op) const {
191  comm_.allReduceInPlace(x.begin(), x.end(), op);
192 }
193 
194 // -----------------------------------------------------------------------------
195 std::unique_ptr<Accumulator<int>>
197  return createAccumulatorImplT(init);
198 }
199 
200 std::unique_ptr<Accumulator<std::size_t>>
202  return createAccumulatorImplT(init);
203 }
204 
205 std::unique_ptr<Accumulator<float>>
207  return createAccumulatorImplT(init);
208 }
209 
210 std::unique_ptr<Accumulator<double>>
212  return createAccumulatorImplT(init);
213 }
214 
215 std::unique_ptr<Accumulator<std::vector<int>>>
216 ReplicaOfGeneralDistribution::createAccumulatorImpl(const std::vector<int> &init) const {
217  return createAccumulatorImplT(init);
218 }
219 
220 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
221 ReplicaOfGeneralDistribution::createAccumulatorImpl(const std::vector<std::size_t> &init) const {
222  return createAccumulatorImplT(init);
223 }
224 
225 std::unique_ptr<Accumulator<std::vector<float>>>
226 ReplicaOfGeneralDistribution::createAccumulatorImpl(const std::vector<float> &init) const {
227  return createAccumulatorImplT(init);
228 }
229 
230 std::unique_ptr<Accumulator<std::vector<double>>>
231 ReplicaOfGeneralDistribution::createAccumulatorImpl(const std::vector<double> &init) const {
232  return createAccumulatorImplT(init);
233 }
234 
235 template <typename T>
236 std::unique_ptr<Accumulator<T>>
238  return boost::make_unique<GeneralDistributionAccumulator<T>>(init, comm_, isMyPatchObs_);
239 }
240 
241 // -----------------------------------------------------------------------------
242 void ReplicaOfGeneralDistribution::allGatherv(std::vector<size_t> &x) const {
243  allGathervImpl(x);
244 }
245 
246 void ReplicaOfGeneralDistribution::allGatherv(std::vector<int> &x) const {
247  allGathervImpl(x);
248 }
249 
250 void ReplicaOfGeneralDistribution::allGatherv(std::vector<float> &x) const {
251  allGathervImpl(x);
252 }
253 
254 void ReplicaOfGeneralDistribution::allGatherv(std::vector<double> &x) const {
255  allGathervImpl(x);
256 }
257 
258 void ReplicaOfGeneralDistribution::allGatherv(std::vector<util::DateTime> &x) const {
259  allGathervImpl(x);
260 }
261 
262 void ReplicaOfGeneralDistribution::allGatherv(std::vector<std::string> &x) const {
263  allGathervImpl(x);
264 }
265 
266 template<typename T>
267 void ReplicaOfGeneralDistribution::allGathervImpl(std::vector<T> &x) const {
268  // As Halo
269  ASSERT(x.size() == isMyPatchObs_.size());
270 
271  std::vector<T> xAtPatchObs;
272  xAtPatchObs.reserve(x.size());
273  for (size_t i = 0; i < x.size(); ++i)
274  if (isMyPatchObs_[i])
275  xAtPatchObs.push_back(x[i]);
276  oops::mpi::allGatherv(comm_, xAtPatchObs);
277  x = std::move(xAtPatchObs);
278 }
279 
280 // -----------------------------------------------------------------------------
281 
284 }
285 
286 // -----------------------------------------------------------------------------
287 
288 } // namespace ioda
class for distributing obs across multiple process elements
const eckit::mpi::Comm & comm_
Local MPI communicator.
ReplicaOfGeneralDistribution(const eckit::mpi::Comm &comm, std::shared_ptr< const Distribution > masterDist, const std::vector< std::size_t > &masterRecordNumbers)
std::unordered_set< std::size_t > myRecords_
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
If the record RecNum has not yet been assigned to a PE, assigns it to the appropriate PE.
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRecord().
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
void reductionImpl(T &x, eckit::mpi::Operation::Code op) const
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
void computePatchLocs() override
If necessary, identifies locations of "patch obs", i.e.
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
void allGathervImpl(std::vector< T > &x) const
std::unordered_set< std::size_t > masterPatchRecords_
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
std::unique_ptr< Accumulator< T > > createAccumulatorImplT(const T &init) const
std::shared_ptr< const Distribution > masterDist_
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.