8 #include "ioda/distribution/Halo.h"
16 #include <boost/make_unique.hpp>
18 #include "oops/mpi/mpi.h"
19 #include "oops/util/DateTime.h"
20 #include "oops/util/Logger.h"
22 #include "ioda/distribution/DistributionFactory.h"
23 #include "ioda/distribution/GeneralDistributionAccumulator.h"
24 #include "eckit/config/LocalConfiguration.h"
25 #include "eckit/exception/Exceptions.h"
43 const eckit::Configuration & config) :
47 std::vector<double> centerd(2, 0.0);
48 if (config.has(
"center")) {
49 centerd = config.getDoubleVector(
"center", centerd);
51 centerd[0] =
static_cast<double>(
comm_.rank())*
52 (360.0/
static_cast<double>(
comm_.size()));
55 eckit::geometry::Point2 center(centerd[0], centerd[1]);
61 double radius = config.getDouble(
"radius", 50000000.0);
64 double locRadius = config.getDouble(
"obs localization.lengthscale", 0.0);
69 oops::Log::debug() <<
"Halo constructed: center: " <<
center_ <<
" radius: "
75 oops::Log::trace() <<
"Halo destructed" << std::endl;
80 const eckit::geometry::Point2 & point) {
90 oops::Log::debug() <<
"Point: " << point <<
" distance to center: " <<
center_
91 <<
" = " << dist << std::endl;
116 double inf = std::numeric_limits<double>::infinity();
117 size_t myRank =
comm_.rank();
126 comm_.allReduceInPlace(nglocs, eckit::mpi::max());
131 std::vector<std::pair<double, int>> dist_and_lidx_loc(nglocs);
132 std::vector<std::pair<double, int>> dist_and_lidx_glb(nglocs);
133 for (
size_t jj = 0; jj < nglocs; ++jj ) {
134 dist_and_lidx_loc[jj] = std::make_pair(inf, myRank);
142 dist_and_lidx_loc[gloc] = std::make_pair(dist, myRank);
146 comm_.allReduce(dist_and_lidx_loc, dist_and_lidx_glb, eckit::mpi::minloc());
149 std::unordered_set<std::size_t> patchObsLoc;
153 if ( dist_and_lidx_glb[i].second == myRank ) {
154 patchObsLoc.insert(i);
168 oops::Log::debug() <<
"npatchobs: " << npatchobs << std::endl;
169 oops::Log::debug() <<
"patchObsBool_.size(): " <<
patchObsBool_.size() << std::endl;
186 const std::vector<std::pair<double, int>> &dist_and_lidx_glb) {
187 const double inf = std::numeric_limits<double>::infinity();
197 std::vector<size_t> patchObsCountOnRank(
comm_.size(), 0);
198 for (
size_t gloc = 0, nglocs = dist_and_lidx_glb.size(); gloc < nglocs; ++gloc) {
199 if (dist_and_lidx_glb[gloc].first < inf) {
202 const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
203 if (haloLocSet.find(gloc) != haloLocSet.end()) {
207 ++patchObsCountOnRank[rankOwningPatchObs];
216 std::vector<size_t> patchObsCountOnPreviousRanks(patchObsCountOnRank.size(), 0);
217 for (
size_t rank = 1;
rank < patchObsCountOnRank.size(); ++
rank) {
218 patchObsCountOnPreviousRanks[
rank] =
219 patchObsCountOnPreviousRanks[
rank - 1] + patchObsCountOnRank[
rank - 1];
225 const size_t rankOwningPatchObs = dist_and_lidx_glb[gloc].second;
268 template <
typename T>
306 template <
typename T>
312 template <
typename T>
314 comm_.allReduceInPlace(x, op);
317 template <
typename T>
319 comm_.allReduceInPlace(x.begin(), x.end(), op);
323 std::unique_ptr<Accumulator<int>>
328 std::unique_ptr<Accumulator<std::size_t>>
333 std::unique_ptr<Accumulator<float>>
338 std::unique_ptr<Accumulator<double>>
343 std::unique_ptr<Accumulator<std::vector<int>>>
348 std::unique_ptr<Accumulator<std::vector<std::size_t>>>
353 std::unique_ptr<Accumulator<std::vector<float>>>
358 std::unique_ptr<Accumulator<std::vector<double>>>
363 template <
typename T>
364 std::unique_ptr<Accumulator<T>>
366 return boost::make_unique<GeneralDistributionAccumulator<T>>(init,
comm_,
patchObsBool_);
396 template <
typename T>
404 for (
size_t ii = 0; ii < x.size(); ++ii) {
406 xtmp[counter] = x[ii];
411 oops::mpi::allGatherv(
comm_, xtmp);
class for distributing obs across multiple process elements
const eckit::mpi::Comm & comm_
Local MPI communicator.
size_t rank() const
Accessor to MPI rank.
A class able to instantiate objects of type T, which should be a subclass of Distribution.
std::unique_ptr< Accumulator< int > > createAccumulatorImpl(int init) const override
Create an object that can be used to calculate the sum of a location-dependent quantity over location...
void computeGlobalUniqueConsecutiveLocIndices(const std::vector< std::pair< double, int >> &dist_and_lidx_glb)
std::vector< bool > patchObsBool_
void min(int &x) const override
Calculates the global minimum (over all locations on all PEs) of a location-dependent quantity.
std::vector< size_t > haloLocRecords_
void computePatchLocs() override
If necessary, identifies locations of "patch obs", i.e.
bool isMyRecord(std::size_t RecNum) const override
Returns true if record RecNum has been assigned to the calling PE during a previous call to assignRec...
const double radius_earth_
void allGatherv(std::vector< size_t > &x) const override
Gather observation data from all processes and deliver the combined data to all processes.
std::unordered_set< std::size_t > recordsInHalo_
size_t globalUniqueConsecutiveLocationIndex(size_t loc) const override
Map the index of a location held on the calling process to the index of the corresponding element of ...
void reductionImpl(T &x, eckit::mpi::Operation::Code op) const
std::unordered_set< std::size_t > recordsOutsideHalo_
void patchObs(std::vector< bool > &) const override
Sets each element of the provided vector to true if the corresponding location is a "patch obs",...
std::unique_ptr< Accumulator< T > > createAccumulatorImplT(const T &init) const
std::unordered_map< std::size_t, double > recordDistancesFromCenter_
void max(int &x) const override
Calculates the global maximum (over all locations on all PEs) of a location-dependent quantity.
void allGathervImpl(std::vector< T > &x) const
std::vector< size_t > haloLocVector_
Halo(const eckit::mpi::Comm &Comm, const eckit::Configuration &config)
Halo selector.
eckit::geometry::Point2 center_
std::vector< size_t > globalUniqueConsecutiveLocIndices_
void assignRecord(const std::size_t RecNum, const std::size_t LocNum, const eckit::geometry::Point2 &point) override
If the record RecNum has not yet been assigned to a PE, assigns it to the appropriate PE.
static DistributionMaker< AtlasDistribution > maker(DIST_NAME)