8 #include "ioda/distribution/ReplicaOfGeneralDistribution.h"
10 #include <boost/make_unique.hpp>
12 #include "oops/mpi/mpi.h"
13 #include "oops/util/DateTime.h"
14 #include "oops/util/Logger.h"
16 #include "ioda/distribution/GeneralDistributionAccumulator.h"
17 #include "eckit/exception/Exceptions.h"
27 const eckit::mpi::Comm & comm,
28 std::shared_ptr<const Distribution> masterDist,
29 const std::vector<std::size_t> &masterRecordNumbers)
31 masterDist_(std::move(masterDist)),
32 numMasterLocs_(masterRecordNumbers.size())
38 if (isMasterPatchObs[loc])
41 oops::Log::trace() <<
"ReplicaOfGeneralDistribution constructed" << std::endl;
46 oops::Log::trace() <<
"ReplicaOfGeneralDistribution destructed" << std::endl;
51 const eckit::geometry::Point2 & ) {
70 comm_.allReduceInPlace(nglocs, eckit::mpi::max());
73 std::vector<std::size_t> patchObsGlobalLocs;
78 oops::mpi::allGatherv(
comm_, patchObsGlobalLocs);
82 const std::size_t UNASSIGNED =
static_cast<size_t>(-1);
83 std::vector<std::size_t> consecutiveLocIndices(nglocs, UNASSIGNED);
84 for (std::size_t i = 0, n = patchObsGlobalLocs.size(); i < n; ++i)
85 consecutiveLocIndices.at(patchObsGlobalLocs[i]) = i;
89 for (std::size_t loc = 0; loc <
myGlobalLocs_.size(); ++loc) {
92 throw eckit::SeriousBug(
"A location does not belong to the patch of any process");
139 template <
typename T>
177 template <
typename T>
183 template <
typename T>
185 comm_.allReduceInPlace(x, op);
188 template <
typename T>
190 eckit::mpi::Operation::Code op)
const {
191 comm_.allReduceInPlace(x.begin(), x.end(), op);
195 std::unique_ptr<Accumulator<int>>
200 std::unique_ptr<Accumulator<std::size_t>>
205 std::unique_ptr<Accumulator<float>>
210 std::unique_ptr<Accumulator<double>>
215 std::unique_ptr<Accumulator<std::vector<int>>>
220 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
225 std::unique_ptr<Accumulator<std::vector<float>>>
230 std::unique_ptr<Accumulator<std::vector<double>>>
235 template <
typename T>
236 std::unique_ptr<Accumulator<T>>
238 return boost::make_unique<GeneralDistributionAccumulator<T>>(init,
comm_,
isMyPatchObs_);
271 std::vector<T> xAtPatchObs;
272 xAtPatchObs.reserve(x.size());
273 for (
size_t i = 0; i < x.size(); ++i)
275 xAtPatchObs.push_back(x[i]);
276 oops::mpi::allGatherv(
comm_, xAtPatchObs);
277 x = std::move(xAtPatchObs);
class for distributing obs across multiple process elements
const eckit::mpi::Comm & comm_
Local MPI communicator.
ReplicaOfGeneralDistribution(const eckit::mpi::Comm &comm, std::shared_ptr< const Distribution > masterDist, const std::vector< std::size_t > &masterRecordNumbers)
std::unordered_set< std::size_t > myRecords_
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
If the record RecNum has not yet been assigned to a PE, assigns it to the appropriate PE.
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRec...
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
std::vector< size_t > globalUniqueConsecutiveLocIndices_
void reductionImpl(T &x, eckit::mpi::Operation::Code op) const
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
void computePatchLocs() override
If necessary, identifies locations of "patch obs", i.e.
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
void allGathervImpl(std::vector< T > &x) const
std::size_t numMasterLocs_
std::unordered_set< std::size_t > masterPatchRecords_
std::vector< std::size_t > myGlobalLocs_
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
~ReplicaOfGeneralDistribution() override
std::vector< bool > isMyPatchObs_
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
std::unique_ptr< Accumulator< T > > createAccumulatorImplT(const T &init) const
std::shared_ptr< const Distribution > masterDist_
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.