OOPS
oops/mpi/mpi.h
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2013 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation
8  * nor does it submit to any jurisdiction.
9  */
10 
11 #pragma once
12 
13 #include <Eigen/Dense>
14 
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "eckit/exception/Exceptions.h"
20 #include "eckit/mpi/Comm.h"
21 
22 #include "oops/util/Timer.h"
23 
24 namespace util {
25 class DateTime;
26 } // namespace util
27 
28 namespace oops {
29 namespace mpi {
30 
31 // ------------------------------------------------------------------------------------------------
32 
33 /// Default communicator with all MPI tasks (ie MPI_COMM_WORLD)
34 const eckit::mpi::Comm & world();
35 
36 /// Default communicator with each MPI task by itself
37 const eckit::mpi::Comm & myself();
38 
39 // ------------------------------------------------------------------------------------------------
40 
41 /// Extend eckit Comm for Serializable oops objects
42 
43 template <typename SERIALIZABLE>
44 void send(const eckit::mpi::Comm & comm, const SERIALIZABLE & sendobj,
45  const int dest, const int tag) {
46  util::Timer timer("oops::mpi", "send");
47  std::vector<double> sendbuf;
48  sendobj.serialize(sendbuf);
49  comm.send(sendbuf.data(), sendbuf.size(), dest, tag);
50 }
51 
52 // ------------------------------------------------------------------------------------------------
53 
54 template <typename SERIALIZABLE>
55 void receive(const eckit::mpi::Comm & comm, SERIALIZABLE & recvobj,
56  const int source, const int tag) {
57  util::Timer timer("oops::mpi", "receive");
58  size_t sz = recvobj.serialSize();
59  std::vector<double> recvbuf(sz);
60  eckit::mpi::Status status = comm.receive(recvbuf.data(), sz, source, tag);
61  size_t ii = 0;
62  recvobj.deserialize(recvbuf, ii);
63  ASSERT(ii == sz);
64 }
65 
66 // ------------------------------------------------------------------------------------------------
67 
68 void gather(const eckit::mpi::Comm & comm, const std::vector<double> & send,
69  std::vector<double> & recv, const size_t root);
70 
71 // ------------------------------------------------------------------------------------------------
72 
73 template <typename SERIALIZABLE>
74 void gather(const eckit::mpi::Comm & comm, const std::vector<SERIALIZABLE> & send,
75  std::vector<SERIALIZABLE> & recv, const size_t root) {
76  if (comm.size() > 1) {
77  std::vector<double> sendbuf;
78  std::vector<double> recvbuf;
79 
80  for (const SERIALIZABLE & jsend : send) jsend.serialize(sendbuf);
81 
82  gather(comm, sendbuf, recvbuf, root);
83 
84  if (comm.rank() == root) {
85  size_t indx = 0;
86  for (SERIALIZABLE & jrecv : recv) jrecv.deserialize(recvbuf, indx);
87  }
88  } else {
89  recv = send;
90  }
91 }
92 
93 // ------------------------------------------------------------------------------------------------
94 // allGather for eigen vectors
95 // ------------------------------------------------------------------------------------------------
96 void allGather(const eckit::mpi::Comm & comm,
97  const Eigen::VectorXd &, Eigen::MatrixXd &);
98 
99 // ------------------------------------------------------------------------------------------------
100 
101 /// \brief A wrapper around the MPI *all gather* operation for serializable types.
102 ///
103 /// The *all gather* operation gathers data from all tasks and delivers the combined data to all
104 /// tasks. This wrapper performs that operation for collections of non-primitive types that
105 /// nevertheless support the OOPS serialization interface, i.e. provide the functions
106 ///
107 /// void serialize(std::vector<double> &vect) const;
108 /// void deserialize(const std::vector<double> &vect, size_t &current);
109 ///
110 /// An example of such a type is util::DateTime.
111 ///
112 /// \param comm
113 /// Communicator.
114 /// \param first, last
115 /// Range of values to be delivered from this task to all other tasks.
116 /// \param recvbuf
117 /// Output iterator to the beginning of the range to receive the combined data from all tasks.
118 template <typename CIter, typename Iter>
119 void allGathervUsingSerialize(const eckit::mpi::Comm &comm, CIter first, CIter last,
120  Iter recvbuf) {
121  std::vector<double> serializedLocalData;
122  for (CIter it = first; it != last; ++it)
123  it->serialize(serializedLocalData);
124 
125  eckit::mpi::Buffer<double> buffer(comm.size());
126  comm.allGatherv(serializedLocalData.begin(), serializedLocalData.end(), buffer);
127 
128  size_t numDeserializedDoubles = 0;
129  for (Iter it = recvbuf; numDeserializedDoubles != buffer.buffer.size(); ++it)
130  it->deserialize(buffer.buffer, numDeserializedDoubles);
131 }
132 
133 // ------------------------------------------------------------------------------------------------
134 
135 // The following functions simplify the allGatherv operation on vectors (reducing it to a single
136 // function call).
137 
138 // ------------------------------------------------------------------------------------------------
139 
140 /// \brief Perform the MPI *all gather* operation on a vector of "plain old data".
141 ///
142 /// This operation gathers data from all tasks and delivers the combined data to all tasks.
143 ///
144 /// \tparam T must be a type for which there exists a specialization of eckit::mpi::Data::Type.
145 ///
146 /// \param[in] comm
147 /// Communicator.
148 /// \param[inout] x
149 /// On input, data owned by this task that need to be delivered to all other tasks. On output,
150 /// combined data received from all tasks (concatenated in the order of increasing task ranks).
151 template <typename T>
152 void allGatherv(const eckit::mpi::Comm & comm, std::vector<T> &x) {
153  eckit::mpi::Buffer<T> buffer(comm.size());
154  comm.allGatherv(x.begin(), x.end(), buffer);
155  x = std::move(buffer.buffer);
156 }
157 
158 // ------------------------------------------------------------------------------------------------
159 
160 /// \brief Perform the MPI *all gather* operation on a vector of DateTime objects.
161 ///
162 /// This operation gathers data from all tasks and delivers the combined data to all tasks.
163 ///
164 /// \param[in] comm
165 /// Communicator.
166 /// \param[inout] x
167 /// On input, data owned by this task that need to be delivered to all other tasks. On output,
168 /// combined data received from all tasks (concatenated in the order of increasing task ranks).
169 void allGatherv(const eckit::mpi::Comm & comm, std::vector<util::DateTime> &x);
170 
171 // ------------------------------------------------------------------------------------------------
172 
173 /// \brief Perform the MPI *all gather* operation on a vector of DateTime objects.
174 ///
175 /// This operation gathers data from all tasks and delivers the combined data to all tasks.
176 ///
177 /// \param[in] comm
178 /// Communicator.
179 /// \param[inout] x
180 /// On input, data owned by this task that need to be delivered to all other tasks. On output,
181 /// combined data received from all tasks (concatenated in the order of increasing task ranks).
182 void allGatherv(const eckit::mpi::Comm & comm, std::vector<std::string> &x);
183 
184 // ------------------------------------------------------------------------------------------------
185 
186 /// \brief Perform the exclusive scan operation.
187 ///
188 /// On output, `x` is set to the sum of the values of `x` passed to this function
189 /// on all ranks lower than the calling rank (and to 0 on rank 0).
190 void exclusiveScan(const eckit::mpi::Comm &comm, size_t &x);
191 
192 } // namespace mpi
193 } // namespace oops
void allGather(const eckit::mpi::Comm &comm, const Eigen::VectorXd &sendbuf, Eigen::MatrixXd &recvbuf)
const eckit::mpi::Comm & myself()
Default communicator with each MPI task by itself.
Definition: oops/mpi/mpi.cc:90
void allGathervUsingSerialize(const eckit::mpi::Comm &comm, CIter first, CIter last, Iter recvbuf)
A wrapper around the MPI all gather operation for serializable types.
Definition: oops/mpi/mpi.h:119
void gather(const eckit::mpi::Comm &comm, const std::vector< double > &send, std::vector< double > &recv, const size_t root)
Definition: oops/mpi/mpi.cc:96
void exclusiveScan(const eckit::mpi::Comm &comm, size_t &x)
Perform the exclusive scan operation.
void send(const eckit::mpi::Comm &comm, const SERIALIZABLE &sendobj, const int dest, const int tag)
Extend eckit Comm for Serializable oops objects.
Definition: oops/mpi/mpi.h:44
const eckit::mpi::Comm & world()
Default communicator with all MPI tasks (ie MPI_COMM_WORLD)
Definition: oops/mpi/mpi.cc:84
void allGatherv(const eckit::mpi::Comm &comm, std::vector< util::DateTime > &x)
Perform the MPI all gather operation on a vector of DateTime objects.
void receive(const eckit::mpi::Comm &comm, SERIALIZABLE &recvobj, const int source, const int tag)
Definition: oops/mpi/mpi.h:55
The namespace for the main oops code.
Definition: TLML95.h:34