IODA
PairOfDistributions.cc
Go to the documentation of this file.
1 /*
2  * (C) Crown copyright 2021, Met Office
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
8 #include "ioda/distribution/PairOfDistributions.h"
9 
10 #include <boost/make_unique.hpp>
11 
12 #include "oops/util/DateTime.h"
13 #include "oops/util/Logger.h"
14 
15 #include "ioda/distribution/PairOfDistributionsAccumulator.h"
16 #include "eckit/exception/Exceptions.h"
17 
18 namespace ioda {
19 
20 // -----------------------------------------------------------------------------
21 // Note: we don't declare an instance of DistributionMaker<PairOfDistributions>,
22 // since this distribution must be created programmatically (not from YAML).
23 
24 // -----------------------------------------------------------------------------
26  const eckit::mpi::Comm & comm,
27  std::shared_ptr<const Distribution> first,
28  std::shared_ptr<const Distribution> second,
29  std::size_t firstNumLocs,
30  std::size_t secondRecordNumOffset)
31  : Distribution(comm),
32  first_(std::move(first)),
33  second_(std::move(second)),
34  firstNumLocs_(firstNumLocs),
35  secondRecordNumOffset_(secondRecordNumOffset)
36 {
37  if (firstNumLocs_ > 0)
39  first_->globalUniqueConsecutiveLocationIndex(firstNumLocs_ - 1) + 1;
40  else
43 
44  oops::Log::trace() << "PairOfDistributions constructed" << std::endl;
45 }
46 
47 // -----------------------------------------------------------------------------
49  oops::Log::trace() << "PairOfDistributions destructed" << std::endl;
50 }
51 
52 // -----------------------------------------------------------------------------
53 void PairOfDistributions::assignRecord(const std::size_t /*RecNum*/,
54  const std::size_t /*LocNum*/,
55  const eckit::geometry::Point2 & /*point*/) {
56  throw eckit::NotImplemented("No new records should be assigned to PairOfDistributions "
57  "after its creation", Here());
58 }
59 
60 // -----------------------------------------------------------------------------
61 bool PairOfDistributions::isMyRecord(std::size_t RecNum) const {
62  if (RecNum < secondRecordNumOffset_)
63  return first_->isMyRecord(RecNum);
64  else
65  return second_->isMyRecord(RecNum - secondRecordNumOffset_);
66 }
67 
68 // -----------------------------------------------------------------------------
69 void PairOfDistributions::patchObs(std::vector<bool> & patchObsVec) const {
70  // Concatenate vectors produced by the first and second distributions.
71 
72  ASSERT(patchObsVec.size() >= firstNumLocs_);
73  const std::size_t secondNumObs = patchObsVec.size() - firstNumLocs_;
74 
75  std::vector<bool> secondPatchObsVec(secondNumObs);
76  second_->patchObs(secondPatchObsVec);
77 
78  patchObsVec.resize(firstNumLocs_);
79  first_->patchObs(patchObsVec);
80 
81  patchObsVec.insert(patchObsVec.end(), secondPatchObsVec.begin(), secondPatchObsVec.end());
82 }
83 
84 // -----------------------------------------------------------------------------
85 void PairOfDistributions::min(int & x) const {
86  minImpl(x);
87 }
88 
89 void PairOfDistributions::min(std::size_t & x) const {
90  minImpl(x);
91 }
92 
93 void PairOfDistributions::min(float & x) const {
94  minImpl(x);
95 }
96 
97 void PairOfDistributions::min(double & x) const {
98  minImpl(x);
99 }
100 
101 void PairOfDistributions::min(std::vector<int> & x) const {
102  minImpl(x);
103 }
104 
105 void PairOfDistributions::min(std::vector<std::size_t> & x) const {
106  minImpl(x);
107 }
108 
109 void PairOfDistributions::min(std::vector<float> & x) const {
110  minImpl(x);
111 }
112 
113 void PairOfDistributions::min(std::vector<double> & x) const {
114  minImpl(x);
115 }
116 
117 template <typename T>
118 void PairOfDistributions::minImpl(T & x) const {
119  first_->min(x);
120  second_->min(x);
121 }
122 
123 // -----------------------------------------------------------------------------
124 void PairOfDistributions::max(int & x) const {
125  maxImpl(x);
126 }
127 
128 void PairOfDistributions::max(std::size_t & x) const {
129  maxImpl(x);
130 }
131 
132 void PairOfDistributions::max(float & x) const {
133  maxImpl(x);
134 }
135 
136 void PairOfDistributions::max(double & x) const {
137  maxImpl(x);
138 }
139 
140 void PairOfDistributions::max(std::vector<int> & x) const {
141  maxImpl(x);
142 }
143 
144 void PairOfDistributions::max(std::vector<std::size_t> & x) const {
145  maxImpl(x);
146 }
147 
148 void PairOfDistributions::max(std::vector<float> & x) const {
149  maxImpl(x);
150 }
151 
152 void PairOfDistributions::max(std::vector<double> & x) const {
153  maxImpl(x);
154 }
155 
156 template <typename T>
157 void PairOfDistributions::maxImpl(T & x) const {
158  first_->max(x);
159  second_->max(x);
160 }
161 
162 // -----------------------------------------------------------------------------
163 std::unique_ptr<Accumulator<int>>
165  return createScalarAccumulator<int>();
166 }
167 
168 std::unique_ptr<Accumulator<std::size_t>>
170  return createScalarAccumulator<std::size_t>();
171 }
172 
173 std::unique_ptr<Accumulator<float>>
175  return createScalarAccumulator<float>();
176 }
177 
178 std::unique_ptr<Accumulator<double>>
180  return createScalarAccumulator<double>();
181 }
182 
183 std::unique_ptr<Accumulator<std::vector<int>>>
184 PairOfDistributions::createAccumulatorImpl(const std::vector<int> &init) const {
185  return createVectorAccumulator<int>(init.size());
186 }
187 
188 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
189 PairOfDistributions::createAccumulatorImpl(const std::vector<std::size_t> &init) const {
190  return createVectorAccumulator<std::size_t>(init.size());
191 }
192 
193 std::unique_ptr<Accumulator<std::vector<float>>>
194 PairOfDistributions::createAccumulatorImpl(const std::vector<float> &init) const {
195  return createVectorAccumulator<float>(init.size());
196 }
197 
198 std::unique_ptr<Accumulator<std::vector<double>>>
199 PairOfDistributions::createAccumulatorImpl(const std::vector<double> &init) const {
200  return createVectorAccumulator<double>(init.size());
201 }
202 
203 template <typename T>
204 std::unique_ptr<Accumulator<T>>
206  return boost::make_unique<PairOfDistributionsAccumulator<T>>(
207  first_->createAccumulator<T>(),
208  second_->createAccumulator<T>(),
209  firstNumLocs_);
210 }
211 
212 template <typename T>
213 std::unique_ptr<Accumulator<std::vector<T>>>
215  return boost::make_unique<PairOfDistributionsAccumulator<std::vector<T>>>(
216  first_->createAccumulator<T>(n),
217  second_->createAccumulator<T>(n),
218  firstNumLocs_);
219 }
220 
221 // -----------------------------------------------------------------------------
222 void PairOfDistributions::allGatherv(std::vector<size_t> &x) const {
223  allGathervImpl(x);
224 }
225 
226 void PairOfDistributions::allGatherv(std::vector<int> &x) const {
227  allGathervImpl(x);
228 }
229 
230 void PairOfDistributions::allGatherv(std::vector<float> &x) const {
231  allGathervImpl(x);
232 }
233 
234 void PairOfDistributions::allGatherv(std::vector<double> &x) const {
235  allGathervImpl(x);
236 }
237 
238 void PairOfDistributions::allGatherv(std::vector<util::DateTime> &x) const {
239  allGathervImpl(x);
240 }
241 
242 void PairOfDistributions::allGatherv(std::vector<std::string> &x) const {
243  allGathervImpl(x);
244 }
245 
246 template<typename T>
247 void PairOfDistributions::allGathervImpl(std::vector<T> &x) const {
248  std::vector<T> firstX(x.begin(), x.begin() + firstNumLocs_);
249  std::vector<T> secondX(x.begin() + firstNumLocs_, x.end());
250  first_->allGatherv(firstX);
251  second_->allGatherv(secondX);
252  x = firstX;
253  x.insert(x.end(), secondX.begin(), secondX.end());
254 }
255 
256 // -----------------------------------------------------------------------------
257 
259  if (loc < firstNumLocs_)
260  return first_->globalUniqueConsecutiveLocationIndex(loc);
261  else
263  second_->globalUniqueConsecutiveLocationIndex(loc - firstNumLocs_);
264 }
265 
266 // -----------------------------------------------------------------------------
267 
268 } // namespace ioda
class for distributing obs across multiple process elements
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRec...
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
std::size_t secondGlobalUniqueConsecutiveLocationIndexOffset_
void allGathervImpl(std::vector< T > &x) const
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
std::shared_ptr< const Distribution > second_
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
std::shared_ptr< const Distribution > first_
std::unique_ptr< Accumulator< T > > createScalarAccumulator() const
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.
std::unique_ptr< Accumulator< std::vector< T > > > createVectorAccumulator(std::size_t n) const
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
PairOfDistributions(const eckit::mpi::Comm &comm, std::shared_ptr< const Distribution > first, std::shared_ptr< const Distribution > second, std::size_t firstNumLocs, std::size_t secondRecordNumOffset)
Create a PairOfDistributions object.