17 #include "eckit/exception/Exceptions.h"
18 #include "oops/util/DateTime.h"
/// \brief Join strings into a single character array before MPI transfer.
///
/// \param strings
///   Strings to be joined.
///
/// \returns A pair whose first element is the concatenation of the characters of all
///   strings (no separators or terminators are inserted) and whose second element
///   holds the length of each string, in the same order as in \p strings.
std::pair<std::vector<char>, std::vector<size_t>> encodeStrings(
    const std::vector<std::string> &strings) {
  std::pair<std::vector<char>, std::vector<size_t>> result;
  std::vector<char> &charArray = result.first;
  std::vector<size_t> &lengths = result.second;

  // First pass: record each length and the total, so that the character array
  // can be allocated once before it is filled.
  size_t totalLength = 0;
  lengths.reserve(strings.size());
  for (const std::string &s : strings) {
    lengths.push_back(s.size());
    totalLength += s.size();
  }

  // Second pass: append the characters of every string back to back.
  charArray.reserve(totalLength);
  for (const std::string &s : strings) {
    charArray.insert(charArray.end(), s.begin(), s.end());
  }

  return result;
}
/// \brief Split a character array into multiple strings.
///
/// \param charArray
///   Characters of all strings stored back to back.
/// \param lengths
///   Length of each string to extract, in order. The lengths are not checked
///   against the size of \p charArray; their sum must not exceed it.
///
/// \returns One string per entry of \p lengths; string i is made of the
///   lengths[i] characters following those consumed by strings 0..i-1.
std::vector<std::string> decodeStrings(const std::vector<char> &charArray,
                                       const std::vector<size_t> &lengths) {
  std::vector<std::string> strings;
  strings.reserve(lengths.size());

  // Walk through the character array, cutting off one string at a time.
  std::vector<char>::const_iterator nextStringBegin = charArray.begin();
  for (size_t length : lengths) {
    strings.emplace_back(nextStringBegin, nextStringBegin + length);
    nextStringBegin += length;
  }

  return strings;
}
84 const eckit::mpi::Comm &
world() {
85 return eckit::mpi::comm();
91 return eckit::mpi::self();
96 void gather(
const eckit::mpi::Comm & comm,
const std::vector<double> &
send,
97 std::vector<double> & recv,
const size_t root) {
98 size_t ntasks = comm.size();
100 int mysize =
send.size();
101 std::vector<int> sizes(ntasks);
102 comm.allGather(mysize, sizes.begin(), sizes.end());
103 std::vector<int> displs(ntasks);
104 size_t rcvsz = sizes[0];
106 for (
size_t jj = 1; jj < ntasks; ++jj) {
107 displs[jj] = displs[jj - 1] + sizes[jj - 1];
110 if (comm.rank() == root) recv.resize(rcvsz);
112 comm.gatherv(
send, recv, sizes, displs, root);
121 const Eigen::VectorXd & sendbuf, Eigen::MatrixXd & recvbuf) {
122 const int ntasks = comm.size();
123 int buf_size = sendbuf.size();
125 std::vector<double> vbuf(sendbuf.data(), sendbuf.data() + buf_size);
126 std::vector<double> vbuf_total(ntasks * buf_size);
128 std::vector<int> recvcounts(ntasks);
129 for (
int ii = 0; ii < ntasks; ++ii) recvcounts[ii] = buf_size;
131 std::vector<int> displs(ntasks);
132 for (
int ii = 0; ii < ntasks; ++ii) displs[ii] = ii * buf_size;
134 comm.allGatherv(vbuf.begin(), vbuf.end(),
135 vbuf_total.begin(), recvcounts.data(), displs.data());
137 for (
int ii = 0; ii < ntasks; ++ii) {
138 std::vector<double> vloc(vbuf_total.begin() + ii * buf_size,
139 vbuf_total.begin() + (ii + 1) * buf_size);
140 Eigen::VectorXd my_vect = Eigen::Map<Eigen::VectorXd, Eigen::Unaligned>(vloc.data(),
142 recvbuf.col(ii) = my_vect;
/// \brief Perform the MPI all gather operation on a vector of DateTime objects.
///
/// \param comm
///   Communicator.
/// \param[in,out] x
///   On input, the DateTime objects held by the calling task. On output, it is
///   replaced by a vector whose size is the sum-reduction of x.size() over \p comm.
148 void allGatherv(
const eckit::mpi::Comm & comm, std::vector<util::DateTime> &x) {
// Start from the local element count and sum-reduce it over the communicator.
149 size_t globalSize = x.size();
150 comm.allReduceInPlace(globalSize, eckit::mpi::sum());
// Destination for the gathered objects (default-constructed at this point).
151 std::vector<util::DateTime> globalX(globalSize);
// NOTE(review): no visible statement writes into globalX before the move below,
// and the embedded numbering skips from 151 to 153. A gather step (presumably a
// call to allGathervUsingSerialize) appears to be missing from this listing --
// confirm against the original file.
153 x = std::move(globalX);
/// \brief Perform the MPI all gather operation on a vector of strings.
///
/// \param comm
///   Communicator.
/// \param[in,out] x
///   On input, the strings held by the calling task.
///   NOTE(review): presumably overwritten with the strings gathered from all
///   tasks; the statement doing so is not visible in this listing -- confirm.
158 void allGatherv(
const eckit::mpi::Comm & comm, std::vector<std::string> &x) {
// Flatten the local strings into (characters, lengths) so that they can be
// transferred as plain arrays.
159 std::pair<std::vector<char>, std::vector<size_t>> encodedX =
encodeStrings(x);
// Gather all character arrays.
162 eckit::mpi::Buffer<char> charBuffer(comm.size());
163 comm.allGatherv(encodedX.first.begin(), encodedX.first.end(), charBuffer);
// Gather all string lengths.
166 eckit::mpi::Buffer<size_t> lengthBuffer(comm.size());
167 comm.allGatherv(encodedX.second.begin(), encodedX.second.end(), lengthBuffer);
// NOTE(review): the listing ends here. The step turning charBuffer and
// lengthBuffer back into strings (presumably via decodeStrings) and the end of
// the function are not shown -- confirm against the original file.
180 std::vector<size_t> xs(comm.size());
181 comm.allGather(x, xs.begin(), xs.end());
182 x = std::accumulate(xs.begin(), xs.begin() + comm.rank(), 0);
std::pair< std::vector< char >, std::vector< size_t > > encodeStrings(const std::vector< std::string > &strings)
Join strings into a single character array before MPI transfer.
std::vector< std::string > decodeStrings(const std::vector< char > &charArray, const std::vector< size_t > &lengths)
Split a character array into multiple strings.
void allGather(const eckit::mpi::Comm &comm, const Eigen::VectorXd &sendbuf, Eigen::MatrixXd &recvbuf)
const eckit::mpi::Comm & myself()
Default communicator with each MPI task by itself.
void allGathervUsingSerialize(const eckit::mpi::Comm &comm, CIter first, CIter last, Iter recvbuf)
A wrapper around the MPI all gather operation for serializable types.
void gather(const eckit::mpi::Comm &comm, const std::vector< double > &send, std::vector< double > &recv, const size_t root)
void exclusiveScan(const eckit::mpi::Comm &comm, size_t &x)
Perform the exclusive scan operation.
void send(const eckit::mpi::Comm &comm, const SERIALIZABLE &sendobj, const int dest, const int tag)
Extend eckit Comm for Serializable oops objects.
const eckit::mpi::Comm & world()
Default communicator with all MPI tasks (i.e. MPI_COMM_WORLD)
void allGatherv(const eckit::mpi::Comm &comm, std::vector< util::DateTime > &x)
Perform the MPI all gather operation on a vector of DateTime objects.
The namespace for the main oops code.