My Project
p2pcommunicator.hh
1/*
2 Copyright 2015 IRIS AS
3
4 This file is part of the Open Porous Media project (OPM).
5
6 OPM is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 OPM is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with OPM. If not, see <http://www.gnu.org/licenses/>.
18*/
19#ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20#define DUNE_COMMUNICATOR_HEADER_INCLUDED
21
22#include <cassert>
23#include <algorithm>
24#include <vector>
25#include <set>
26#include <map>
27
28#include <dune/common/version.hh>
29
30#include <dune/common/parallel/mpihelper.hh>
31#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
32#include <dune/common/parallel/communication.hh>
33#else
34#include <dune/common/parallel/collectivecommunication.hh>
35#endif
36
37// the following implementation is only available in case MPI is available
38#if HAVE_MPI
39#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
40#include <dune/common/parallel/mpicommunication.hh>
41#else
42#include <dune/common/parallel/mpicollectivecommunication.hh>
43#endif
44#endif
45
46
47namespace Dune
48{
50 {
51 typedef std::vector< char > BufferType;
52
53 mutable BufferType buffer_;
54 const double factor_;
55 mutable size_t pos_;
56public:
59 SimpleMessageBuffer( const double factor = 1.1 )
60 : buffer_(), factor_( factor )
61 {
63 }
64
66 void clear() { buffer_.clear(); resetReadPosition(); }
68 void resetReadPosition() { pos_ = 0 ; }
70 size_t size() const { return buffer_.size(); }
71
73 void reserve( const size_t size )
74 {
75 buffer_.reserve( size );
76 }
77
79 void resize( const size_t size )
80 {
81 buffer_.resize( size );
82 }
83
85 template <class T>
86 void write( const T& value )
87 {
88 // union to access bytes in value
89 const size_t tsize = sizeof( T );
90 size_t pos = buffer_.size();
91 const size_t sizeNeeded = pos + tsize ;
92 // reserve with some 10% overestimation
93 if( buffer_.capacity() < sizeNeeded )
94 {
95 reserve( size_t(factor_ * sizeNeeded) ) ;
96 }
97 // resize to size need to store value
98 buffer_.resize( sizeNeeded );
99 // copy value to buffer
100 std::copy_n( reinterpret_cast<const char *> (&value), tsize, buffer_.data()+pos );
101 }
102
103 void write( const std::string& str)
104 {
105 int size = str.size();
106 write(size);
107 for (int k = 0; k < size; ++k) {
108 write(str[k]);
109 }
110 }
111
113 template <class T>
114 void read( T& value ) const
115 {
116 // read bytes from stream and store in value
117 const size_t tsize = sizeof( T );
118 assert( pos_ + tsize <= buffer_.size() );
119 std::copy_n( buffer_.data()+pos_, tsize, reinterpret_cast<char *> (&value) );
120 pos_ += tsize;
121 }
122
123 void read( std::string& str) const
124 {
125 int size = 0;
126 read(size);
127 str.resize(size);
128 for (int k = 0; k < size; ++k) {
129 read(str[k]);
130 }
131 }
132
134 std::pair< char* , int > buffer() const
135 {
136 return std::make_pair( buffer_.data(), int(buffer_.size()) );
137 }
138 };
139
141 template < class MsgBuffer >
142 class Point2PointCommunicator : public CollectiveCommunication< MPIHelper::MPICommunicator >
143 {
144 public:
146 typedef MPIHelper::MPICommunicator MPICommunicator ;
147
149 typedef MsgBuffer MessageBufferType ;
150
151 protected:
// Alias for the communication base class; it is used by the constructors
// and by the rank/size using-declarations of this class.
// NOTE(review): this switch tests the DUNE_GRID version, while the header
// selection at the top of the file tests DUNE_COMMON -- confirm that both
// are meant to follow the same release.
152#if DUNE_VERSION_NEWER(DUNE_GRID, 2, 7)
153 using BaseType = Dune::Communication<MPICommunicator>;
154#else
155 using BaseType = CollectiveCommunication< MPICommunicator>;
156#endif
158
159 // starting message tag
160 static const int messagetag = 234;
161
162 typedef std::map< int, int > linkage_t;
163 typedef std::vector< int > vector_t;
164
165 linkage_t sendLinkage_ ;
166 linkage_t recvLinkage_ ;
167
168 vector_t sendDest_ ;
169 vector_t recvSource_ ;
170
171 mutable vector_t _recvBufferSizes;
172 mutable bool _recvBufferSizesComputed;
173
174 public :
175 using BaseType :: rank;
176 using BaseType :: size;
177
178 /* \brief data handle interface that needs to be implemented for use with some of
179 * the exchange methods */
181 {
182 protected:
184 public:
// virtual destructor, the interface is meant to be implemented by derived classes
185 virtual ~DataHandleInterface () {}
// called with a link number and a message buffer; presumably fills the
// buffer with the data to be sent over that link -- confirm in the
// implementation file
186 virtual void pack( const int link, MessageBufferType& os ) = 0 ;
// called with a link number and a message buffer; presumably extracts the
// data received over that link from the buffer -- confirm in the
// implementation file
187 virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
188 // should contain work that could be done between send and receive
// optional hook, the default implementation does nothing
189 virtual void localComputation () {}
190 };
191
192 public:
194 Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
195 : BaseType( mpiComm ) { removeLinkage(); }
196
198 Point2PointCommunicator( const BaseType& comm ) : BaseType( comm ) { removeLinkage(); }
199
200
// insert communication request with a set of ranks to send to and a set of
// ranks to receive from; the definition lives in the inline implementation
// file included at the end of this header
202 inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
203
205 inline int sendLinks () const { return sendLinkage_.size(); }
206
208 inline int recvLinks () const { return recvLinkage_.size(); }
209
211 const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
212
214 inline int sendLink (const int rank) const
215 {
216 assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
217 return (* sendLinkage_.find (rank)).second ;
218 }
219
221 inline int recvLink (const int rank) const
222 {
223 assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
224 return (* recvLinkage_.find (rank)).second ;
225 }
226
228 const std::vector< int > &sendDest () const { return sendDest_; }
230 const std::vector< int > &recvSource () const { return recvSource_; }
231
// remove stored linkage; called by both constructors, defined in the inline
// implementation file included at the end of this header
233 inline void removeLinkage () ;
234
// exchange message buffers with peers defined by inserted linkage;
// takes one vector of message buffers and returns another
236 virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
237
// exchange variant driven by a data handle; presumably the handle's
// pack/unpack methods provide and consume the data -- confirm in the
// implementation file
239 virtual void exchange ( DataHandleInterface& ) const;
240
// exchange data with peers, handle defines pack and unpack of data; variant
// for the case that the receive buffers are already known
244 virtual void exchangeCached ( DataHandleInterface& ) const;
245
246 protected:
// helper defined in the inline implementation file; takes a linkage map as
// input and a vector as output -- presumably fills dest with the ranks
// stored in linkage, confirm there
247 inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
248
249 // return new tag number for the exchange messages
250 static int getMessageTag( const unsigned int increment )
251 {
252 static int tag = messagetag + 2 ;
253 // increase tag counter
254 const int retTag = tag;
255 tag += increment ;
256 // the MPI standard guaratees only up to 2^15-1
257 if( tag >= 32767 )
258 {
259 // reset tag to initial value
260 tag = messagetag + 2 ;
261 }
262 return retTag;
263 }
264
265 // return new tag number for the exchange messages
266 static int getMessageTag()
267 {
268 return getMessageTag( 1 );
269 }
270 };
271
272} // namespace Dune
273
274// include inline implementation
275#include "p2pcommunicator_impl.hh"
276
277#endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
Definition: p2pcommunicator.hh:181
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:143
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:149
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:208
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:221
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:228
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:230
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:146
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:214
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:619
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:205
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:211
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:599
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:194
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:198
Definition: p2pcommunicator.hh:50
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:68
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:70
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:134
void clear()
clear the buffer
Definition: p2pcommunicator.hh:66
void reserve(const size_t size)
reserve memory for 'size' entries
Definition: p2pcommunicator.hh:73
void resize(const size_t size)
resize buffer to 'size' entries
Definition: p2pcommunicator.hh:79
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e. about 10% overestimation)
Definition: p2pcommunicator.hh:59
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:114
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:86
Copyright 2019 Equinor AS.
Definition: CartesianIndexMapper.hpp:10