8 #include "ioda/distribution/PairOfDistributions.h"
10 #include <boost/make_unique.hpp>
12 #include "oops/util/DateTime.h"
13 #include "oops/util/Logger.h"
15 #include "ioda/distribution/PairOfDistributionsAccumulator.h"
16 #include "eckit/exception/Exceptions.h"
26 const eckit::mpi::Comm & comm,
27 std::shared_ptr<const Distribution> first,
28 std::shared_ptr<const Distribution> second,
29 std::size_t firstNumLocs,
30 std::size_t secondRecordNumOffset)
32 first_(std::move(first)),
33 second_(std::move(second)),
34 firstNumLocs_(firstNumLocs),
35 secondRecordNumOffset_(secondRecordNumOffset)
44 oops::Log::trace() <<
"PairOfDistributions constructed" << std::endl;
49 oops::Log::trace() <<
"PairOfDistributions destructed" << std::endl;
55 const eckit::geometry::Point2 & ) {
56 throw eckit::NotImplemented(
"No new records should be assigned to PairOfDistributions "
57 "after its creation", Here());
63 return first_->isMyRecord(RecNum);
73 const std::size_t secondNumObs = patchObsVec.size() -
firstNumLocs_;
75 std::vector<bool> secondPatchObsVec(secondNumObs);
76 second_->patchObs(secondPatchObsVec);
79 first_->patchObs(patchObsVec);
81 patchObsVec.insert(patchObsVec.end(), secondPatchObsVec.begin(), secondPatchObsVec.end());
117 template <
typename T>
156 template <
typename T>
163 std::unique_ptr<Accumulator<int>>
165 return createScalarAccumulator<int>();
168 std::unique_ptr<Accumulator<std::size_t>>
170 return createScalarAccumulator<std::size_t>();
173 std::unique_ptr<Accumulator<float>>
175 return createScalarAccumulator<float>();
178 std::unique_ptr<Accumulator<double>>
180 return createScalarAccumulator<double>();
183 std::unique_ptr<Accumulator<std::vector<int>>>
185 return createVectorAccumulator<int>(init.size());
188 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
190 return createVectorAccumulator<std::size_t>(init.size());
193 std::unique_ptr<Accumulator<std::vector<float>>>
195 return createVectorAccumulator<float>(init.size());
198 std::unique_ptr<Accumulator<std::vector<double>>>
200 return createVectorAccumulator<double>(init.size());
203 template <
typename T>
204 std::unique_ptr<Accumulator<T>>
206 return boost::make_unique<PairOfDistributionsAccumulator<T>>(
207 first_->createAccumulator<T>(),
208 second_->createAccumulator<T>(),
212 template <
typename T>
213 std::unique_ptr<Accumulator<std::vector<T>>>
215 return boost::make_unique<PairOfDistributionsAccumulator<std::vector<T>>>(
216 first_->createAccumulator<T>(n),
217 second_->createAccumulator<T>(n),
250 first_->allGatherv(firstX);
253 x.insert(x.end(), secondX.begin(), secondX.end());
260 return first_->globalUniqueConsecutiveLocationIndex(loc);
Class for distributing observations across multiple process elements (PEs).
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRecord().
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
std::size_t secondGlobalUniqueConsecutiveLocationIndexOffset_
std::size_t firstNumLocs_
void allGathervImpl(std::vector< T > &x) const
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
std::shared_ptr< const Distribution > second_
~PairOfDistributions() override
std::size_t secondRecordNumOffset_
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
std::shared_ptr< const Distribution > first_
std::unique_ptr< Accumulator< T > > createScalarAccumulator() const
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.
std::unique_ptr< Accumulator< std::vector< T > > > createVectorAccumulator(std::size_t n) const
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
PairOfDistributions(const eckit::mpi::Comm &comm, std::shared_ptr< const Distribution > first, std::shared_ptr< const Distribution > second, std::size_t firstNumLocs, std::size_t secondRecordNumOffset)
Create a PairOfDistributions object.