dune-common  2.6-git
communicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMUNICATOR
4 #define DUNE_COMMUNICATOR
5 
6 #if HAVE_MPI
7 
8 #include <cassert>
9 #include <cstddef>
10 #include <iostream>
11 #include <map>
12 #include <type_traits>
13 #include <utility>
14 
15 #include <mpi.h>
16 
21 #include <dune/common/unused.hh>
22 
23 namespace Dune
24 {
108  struct SizeOne
109  {};
110 
117  {};
118 
119 
125  template<class V>
126  struct CommPolicy
127  {
139  typedef V Type;
140 
146  typedef typename V::value_type IndexedType;
147 
153 
162  static const void* getAddress(const V& v, int index);
163 
169  static int getSize(const V&, int index);
170  };
171 
172  template<class K, int n> class FieldVector;
173 
174  template<class B, class A> class VariableBlockVector;
175 
176  template<class K, class A, int n>
178  {
180 
181  typedef typename Type::B IndexedType;
182 
184 
185  static const void* getAddress(const Type& v, int i);
186 
187  static int getSize(const Type& v, int i);
188  };
189 
194  {};
195 
199  template<class T>
201  {
203 
204  static const IndexedType& gather(const T& vec, std::size_t i);
205 
206  static void scatter(T& vec, const IndexedType& v, std::size_t i);
207 
208  };
209 
221  template<typename T>
222  class DatatypeCommunicator : public InterfaceBuilder
223  {
224  public:
225 
229  typedef T ParallelIndexSet;
230 
235 
239  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
240 
244  typedef typename RemoteIndices::Attribute Attribute;
245 
249  typedef typename RemoteIndices::LocalIndex LocalIndex;
250 
254  DatatypeCommunicator();
255 
259  ~DatatypeCommunicator();
260 
287  template<class T1, class T2, class V>
288  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
289 
293  void forward();
294 
298  void backward();
299 
303  void free();
304  private:
305  enum {
309  commTag_ = 234
310  };
311 
315  const RemoteIndices* remoteIndices_;
316 
317  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
318  MessageTypeMap;
319 
323  MessageTypeMap messageTypes;
324 
328  void* data_;
329 
330  MPI_Request* requests_[2];
331 
335  bool created_;
336 
340  template<class V, bool FORWARD>
341  void createRequests(V& sendData, V& receiveData);
342 
346  template<class T1, class T2, class V, bool send>
347  void createDataTypes(const T1& source, const T2& destination, V& data);
348 
352  void sendRecv(MPI_Request* req);
353 
357  struct IndexedTypeInformation
358  {
364  void build(int i)
365  {
366  length = new int[i];
367  displ = new MPI_Aint[i];
368  size = i;
369  }
370 
374  void free()
375  {
376  delete[] length;
377  delete[] displ;
378  }
380  int* length;
382  MPI_Aint* displ;
388  int elements;
392  int size;
393  };
394 
400  template<class V>
401  struct MPIDatatypeInformation
402  {
407  MPIDatatypeInformation(const V& data) : data_(data)
408  {}
409 
415  void reserve(int proc, int size)
416  {
417  information_[proc].build(size);
418  }
425  void add(int proc, int local)
426  {
427  IndexedTypeInformation& info=information_[proc];
428  assert((info.elements)<info.size);
429  MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
430  info.displ+info.elements);
431  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
432  info.elements++;
433  }
434 
439  std::map<int,IndexedTypeInformation> information_;
443  const V& data_;
444 
445  };
446 
447  };
448 
459  {
460 
461  public:
466 
473  template<class Data, class Interface>
474  typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
475  build(const Interface& interface);
476 
484  template<class Data, class Interface>
485  void build(const Data& source, const Data& target, const Interface& interface);
486 
515  template<class GatherScatter, class Data>
516  void forward(const Data& source, Data& dest);
517 
546  template<class GatherScatter, class Data>
547  void backward(Data& source, const Data& dest);
548 
574  template<class GatherScatter, class Data>
575  void forward(Data& data);
576 
602  template<class GatherScatter, class Data>
603  void backward(Data& data);
604 
608  void free();
609 
614 
615  private:
616 
620  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
621  InterfaceMap;
622 
623 
627  template<class Data, typename IndexedTypeFlag>
628  struct MessageSizeCalculator
629  {};
630 
635  template<class Data>
636  struct MessageSizeCalculator<Data,SizeOne>
637  {
644  inline int operator()(const InterfaceInformation& info) const;
653  inline int operator()(const Data& data, const InterfaceInformation& info) const;
654  };
655 
660  template<class Data>
661  struct MessageSizeCalculator<Data,VariableSize>
662  {
671  inline int operator()(const Data& data, const InterfaceInformation& info) const;
672  };
673 
677  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
678  struct MessageGatherer
679  {};
680 
685  template<class Data, class GatherScatter, bool send>
686  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
687  {
689  typedef typename CommPolicy<Data>::IndexedType Type;
690 
695  typedef GatherScatter Gatherer;
696 
697  enum {
703  forward=send
704  };
705 
713  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
714  };
715 
720  template<class Data, class GatherScatter, bool send>
721  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
722  {
724  typedef typename CommPolicy<Data>::IndexedType Type;
725 
730  typedef GatherScatter Gatherer;
731 
732  enum {
738  forward=send
739  };
740 
748  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
749  };
750 
754  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
755  struct MessageScatterer
756  {};
757 
762  template<class Data, class GatherScatter, bool send>
763  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
764  {
766  typedef typename CommPolicy<Data>::IndexedType Type;
767 
772  typedef GatherScatter Scatterer;
773 
774  enum {
780  forward=send
781  };
782 
790  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
791  };
796  template<class Data, class GatherScatter, bool send>
797  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
798  {
800  typedef typename CommPolicy<Data>::IndexedType Type;
801 
806  typedef GatherScatter Scatterer;
807 
808  enum {
814  forward=send
815  };
816 
824  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
825  };
826 
830  struct MessageInformation
831  {
833  MessageInformation()
834  : start_(0), size_(0)
835  {}
836 
844  MessageInformation(size_t start, size_t size)
845  : start_(start), size_(size)
846  {}
850  size_t start_;
854  size_t size_;
855  };
856 
863  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
864  InformationMap;
868  InformationMap messageInformation_;
872  char* buffers_[2];
876  size_t bufferSize_[2];
877 
878  enum {
882  commTag_
883  };
884 
888  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
889 
890  MPI_Comm communicator_;
891 
895  template<class GatherScatter, bool FORWARD, class Data>
896  void sendRecv(const Data& source, Data& target);
897 
898  };
899 
900 #ifndef DOXYGEN
901 
902  template<class V>
903  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
904  {
905  return &(v[index]);
906  }
907 
908  template<class V>
909  inline int CommPolicy<V>::getSize(const V& v, int index)
910  {
912  DUNE_UNUSED_PARAMETER(index);
913  return 1;
914  }
915 
916  template<class K, class A, int n>
917  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
918  {
919  return &(v[index][0]);
920  }
921 
922  template<class K, class A, int n>
923  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
924  {
925  return v[index].getsize();
926  }
927 
928  template<class T>
929  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
930  {
931  return vec[i];
932  }
933 
934  template<class T>
935  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
936  {
937  vec[i]=v;
938  }
939 
940  template<typename T>
941  DatatypeCommunicator<T>::DatatypeCommunicator()
942  : remoteIndices_(0), created_(false)
943  {
944  requests_[0]=0;
945  requests_[1]=0;
946  }
947 
948 
949 
950  template<typename T>
951  DatatypeCommunicator<T>::~DatatypeCommunicator()
952  {
953  free();
954  }
955 
956  template<typename T>
957  template<class T1, class T2, class V>
958  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
959  const T1& source, V& sendData,
960  const T2& destination, V& receiveData)
961  {
962  remoteIndices_ = &remoteIndices;
963  free();
964  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
965  createDataTypes<T1,T2,V,true>(source,destination, sendData);
966  createRequests<V,true>(sendData, receiveData);
967  createRequests<V,false>(receiveData, sendData);
968  created_=true;
969  }
970 
971  template<typename T>
972  void DatatypeCommunicator<T>::free()
973  {
974  if(created_) {
975  delete[] requests_[0];
976  delete[] requests_[1];
977  typedef MessageTypeMap::iterator iterator;
978  typedef MessageTypeMap::const_iterator const_iterator;
979 
980  const const_iterator end=messageTypes.end();
981 
982  for(iterator process = messageTypes.begin(); process != end; ++process) {
983  MPI_Datatype *type = &(process->second.first);
984  int finalized=0;
985  MPI_Finalized(&finalized);
986  if(*type!=MPI_DATATYPE_NULL && !finalized)
987  MPI_Type_free(type);
988  type = &(process->second.second);
989  if(*type!=MPI_DATATYPE_NULL && !finalized)
990  MPI_Type_free(type);
991  }
992  messageTypes.clear();
993  created_=false;
994  }
995 
996  }
997 
998  template<typename T>
999  template<class T1, class T2, class V, bool send>
1000  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
1001  {
1002 
1003  MPIDatatypeInformation<V> dataInfo(data);
1004  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1005 
1006  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1007  const const_iterator end=this->remoteIndices_->end();
1008 
1009  // Allocate MPI_Datatypes and deallocate memory for the type construction.
1010  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1011  IndexedTypeInformation& info=dataInfo.information_[process->first];
1012  // Shift the displacement
1013  MPI_Aint base;
1014  MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1015 
1016  for(int i=0; i< info.elements; i++) {
1017  info.displ[i]-=base;
1018  }
1019 
1020  // Create data type
1021  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1022  MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1023  MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1024  MPI_Type_commit(type);
1025  // Deallocate memory
1026  info.free();
1027  }
1028  }
1029 
1030  template<typename T>
1031  template<class V, bool createForward>
1032  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1033  {
1034  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1035  int rank;
1036  static int index = createForward ? 1 : 0;
1037  int noMessages = messageTypes.size();
1038  // allocate request handles
1039  requests_[index] = new MPI_Request[2*noMessages];
1040  const MapIterator end = messageTypes.end();
1041  int request=0;
1042  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1043 
1044  // Set up the requests for receiving first
1045  for(MapIterator process = messageTypes.begin(); process != end;
1046  ++process, ++request) {
1047  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1048  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1049  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1050  }
1051 
1052  // And now the send requests
1053 
1054  for(MapIterator process = messageTypes.begin(); process != end;
1055  ++process, ++request) {
1056  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1057  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1058  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1059  }
1060  }
1061 
1062  template<typename T>
1063  void DatatypeCommunicator<T>::forward()
1064  {
1065  sendRecv(requests_[1]);
1066  }
1067 
1068  template<typename T>
1069  void DatatypeCommunicator<T>::backward()
1070  {
1071  sendRecv(requests_[0]);
1072  }
1073 
1074  template<typename T>
1075  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1076  {
1077  int noMessages = messageTypes.size();
1078  // Start the receive calls first
1079  MPI_Startall(noMessages, requests);
1080  // Now the send calls
1081  MPI_Startall(noMessages, requests+noMessages);
1082 
1083  // Wait for completion of the communication send first then receive
1084  MPI_Status* status=new MPI_Status[2*noMessages];
1085  for(int i=0; i<2*noMessages; i++)
1086  status[i].MPI_ERROR=MPI_SUCCESS;
1087 
1088  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1089  int receive = MPI_Waitall(noMessages, requests, status);
1090 
1091  // Error checks
1092  int success=1, globalSuccess=0;
1093  if(send==MPI_ERR_IN_STATUS) {
1094  int rank;
1095  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1096  std::cerr<<rank<<": Error in sending :"<<std::endl;
1097  // Search for the error
1098  for(int i=noMessages; i< 2*noMessages; i++)
1099  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1100  char message[300];
1101  int messageLength;
1102  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1103  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1104  for(int j = 0; j < messageLength; j++)
1105  std::cout << message[j];
1106  }
1107  std::cerr<<std::endl;
1108  success=0;
1109  }
1110 
1111  if(receive==MPI_ERR_IN_STATUS) {
1112  int rank;
1113  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1114  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1115  // Search for the error
1116  for(int i=0; i< noMessages; i++)
1117  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1118  char message[300];
1119  int messageLength;
1120  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1121  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1122  for(int j = 0; j < messageLength; j++)
1123  std::cerr << message[j];
1124  }
1125  std::cerr<<std::endl;
1126  success=0;
1127  }
1128 
1129  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1130 
1131  delete[] status;
1132 
1133  if(!globalSuccess)
1134  DUNE_THROW(CommunicationError, "A communication error occurred!");
1135 
1136  }
1137 
1139  {
1140  buffers_[0]=0;
1141  buffers_[1]=0;
1142  bufferSize_[0]=0;
1143  bufferSize_[1]=0;
1144  }
1145 
1146  template<class Data, class Interface>
1147  typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1148  BufferedCommunicator::build(const Interface& interface)
1149  {
1150  interfaces_=interface.interfaces();
1151  communicator_=interface.communicator();
1152  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1153  ::const_iterator const_iterator;
1154  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1155  const const_iterator end = interfaces_.end();
1156  int lrank;
1157  MPI_Comm_rank(communicator_, &lrank);
1158 
1159  bufferSize_[0]=0;
1160  bufferSize_[1]=0;
1161 
1162  for(const_iterator interfacePair = interfaces_.begin();
1163  interfacePair != end; ++interfacePair) {
1164  int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1165  int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1166  if (noSend + noRecv > 0)
1167  messageInformation_.insert(std::make_pair(interfacePair->first,
1168  std::make_pair(MessageInformation(bufferSize_[0],
1169  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1170  MessageInformation(bufferSize_[1],
1171  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1172  bufferSize_[0] += noSend;
1173  bufferSize_[1] += noRecv;
1174  }
1175 
1176  // allocate the buffers
1177  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1178  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1179 
1180  buffers_[0] = new char[bufferSize_[0]];
1181  buffers_[1] = new char[bufferSize_[1]];
1182  }
1183 
1184  template<class Data, class Interface>
1185  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1186  {
1187 
1188  interfaces_=interface.interfaces();
1189  communicator_=interface.communicator();
1190  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1191  ::const_iterator const_iterator;
1192  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1193  const const_iterator end = interfaces_.end();
1194 
1195  bufferSize_[0]=0;
1196  bufferSize_[1]=0;
1197 
1198  for(const_iterator interfacePair = interfaces_.begin();
1199  interfacePair != end; ++interfacePair) {
1200  int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1201  int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1202  if (noSend + noRecv > 0)
1203  messageInformation_.insert(std::make_pair(interfacePair->first,
1204  std::make_pair(MessageInformation(bufferSize_[0],
1205  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1206  MessageInformation(bufferSize_[1],
1207  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1208  bufferSize_[0] += noSend;
1209  bufferSize_[1] += noRecv;
1210  }
1211 
1212  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1213  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1214  // allocate the buffers
1215  buffers_[0] = new char[bufferSize_[0]];
1216  buffers_[1] = new char[bufferSize_[1]];
1217  }
1218 
1219  inline void BufferedCommunicator::free()
1220  {
1221  messageInformation_.clear();
1222  if(buffers_[0])
1223  delete[] buffers_[0];
1224 
1225  if(buffers_[1])
1226  delete[] buffers_[1];
1227  buffers_[0]=buffers_[1]=0;
1228  }
1229 
1231  {
1232  free();
1233  }
1234 
1235  template<class Data>
1236  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1237  (const InterfaceInformation& info) const
1238  {
1239  return info.size();
1240  }
1241 
1242 
1243  template<class Data>
1244  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1245  (const Data&, const InterfaceInformation& info) const
1246  {
1247  return operator()(info);
1248  }
1249 
1250 
1251  template<class Data>
1252  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1253  (const Data& data, const InterfaceInformation& info) const
1254  {
1255  int entries=0;
1256 
1257  for(size_t i=0; i < info.size(); i++)
1258  entries += CommPolicy<Data>::getSize(data,info[i]);
1259 
1260  return entries;
1261  }
1262 
1263 
1264  template<class Data, class GatherScatter, bool FORWARD>
1265  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1266  {
1267  DUNE_UNUSED_PARAMETER(bufferSize);
1268  typedef typename InterfaceMap::const_iterator
1269  const_iterator;
1270 
1271  int rank;
1272  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1273  const const_iterator end = interfaces.end();
1274  size_t index=0;
1275 
1276  for(const_iterator interfacePair = interfaces.begin();
1277  interfacePair != end; ++interfacePair) {
1278  int size = forward ? interfacePair->second.first.size() :
1279  interfacePair->second.second.size();
1280 
1281  for(int i=0; i < size; i++) {
1282  int local = forward ? interfacePair->second.first[i] :
1283  interfacePair->second.second[i];
1284  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1285 
1286 #ifdef DUNE_ISTL_WITH_CHECKING
1287  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1288 #endif
1289  buffer[index]=GatherScatter::gather(data, local, j);
1290  }
1291 
1292  }
1293  }
1294 
1295  }
1296 
1297 
1298  template<class Data, class GatherScatter, bool FORWARD>
1299  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1300  {
1301  DUNE_UNUSED_PARAMETER(bufferSize);
1302  typedef typename InterfaceMap::const_iterator
1303  const_iterator;
1304  const const_iterator end = interfaces.end();
1305  size_t index = 0;
1306 
1307  int rank;
1308  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1309 
1310  for(const_iterator interfacePair = interfaces.begin();
1311  interfacePair != end; ++interfacePair) {
1312  size_t size = FORWARD ? interfacePair->second.first.size() :
1313  interfacePair->second.second.size();
1314 
1315  for(size_t i=0; i < size; i++) {
1316 
1317 #ifdef DUNE_ISTL_WITH_CHECKING
1318  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1319 #endif
1320 
1321  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1322  interfacePair->second.second[i]);
1323  }
1324  }
1325 
1326  }
1327 
1328 
1329  template<class Data, class GatherScatter, bool FORWARD>
1330  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1331  {
1332  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1333  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1334 
1335  assert(infoPair!=interfaces.end());
1336 
1337  const Information& info = FORWARD ? infoPair->second.second :
1338  infoPair->second.first;
1339 
1340  for(size_t i=0, index=0; i < info.size(); i++) {
1341  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1342  GatherScatter::scatter(data, buffer[index++], info[i], j);
1343  }
1344  }
1345 
1346 
1347  template<class Data, class GatherScatter, bool FORWARD>
1348  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1349  {
1350  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1351  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1352 
1353  assert(infoPair!=interfaces.end());
1354 
1355  const Information& info = FORWARD ? infoPair->second.second :
1356  infoPair->second.first;
1357 
1358  for(size_t i=0; i < info.size(); i++) {
1359  GatherScatter::scatter(data, buffer[i], info[i]);
1360  }
1361  }
1362 
1363 
1364  template<class GatherScatter,class Data>
1365  void BufferedCommunicator::forward(Data& data)
1366  {
1367  this->template sendRecv<GatherScatter,true>(data, data);
1368  }
1369 
1370 
1371  template<class GatherScatter, class Data>
1372  void BufferedCommunicator::backward(Data& data)
1373  {
1374  this->template sendRecv<GatherScatter,false>(data, data);
1375  }
1376 
1377 
1378  template<class GatherScatter, class Data>
1379  void BufferedCommunicator::forward(const Data& source, Data& dest)
1380  {
1381  this->template sendRecv<GatherScatter,true>(source, dest);
1382  }
1383 
1384 
1385  template<class GatherScatter, class Data>
1386  void BufferedCommunicator::backward(Data& source, const Data& dest)
1387  {
1388  this->template sendRecv<GatherScatter,false>(dest, source);
1389  }
1390 
1391 
1392  template<class GatherScatter, bool FORWARD, class Data>
1393  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1394  {
1395  int rank, lrank;
1396 
1397  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1398  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1399 
1400  typedef typename CommPolicy<Data>::IndexedType Type;
1401  Type *sendBuffer, *recvBuffer;
1402  size_t sendBufferSize;
1403 #ifndef NDEBUG
1404  size_t recvBufferSize;
1405 #endif
1406 
1407  if(FORWARD) {
1408  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1409  sendBufferSize = bufferSize_[0];
1410  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1411 #ifndef NDEBUG
1412  recvBufferSize = bufferSize_[1];
1413 #endif
1414  }else{
1415  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1416  sendBufferSize = bufferSize_[1];
1417  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1418 #ifndef NDEBUG
1419  recvBufferSize = bufferSize_[0];
1420 #endif
1421  }
1422  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1423 
1424  MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1425 
1426  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1427  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1428  /* Number of recvRequests that are not MPI_REQUEST_NULL */
1429  size_t numberOfRealRecvRequests = 0;
1430 
1431  // Setup receive first
1432  typedef typename InformationMap::const_iterator const_iterator;
1433 
1434  const const_iterator end = messageInformation_.end();
1435  size_t i=0;
1436  int* processMap = new int[messageInformation_.size()];
1437 
1438  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1439  processMap[i]=info->first;
1440  if(FORWARD) {
1441  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1442  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1443  if(info->second.second.size_) {
1444  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1445  MPI_BYTE, info->first, commTag_, communicator_,
1446  recvRequests+i);
1447  numberOfRealRecvRequests += 1;
1448  } else {
1449  // Nothing to receive -> set request to inactive
1450  recvRequests[i]=MPI_REQUEST_NULL;
1451  }
1452  }else{
1453  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1454  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1455  if(info->second.first.size_) {
1456  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1457  MPI_BYTE, info->first, commTag_, communicator_,
1458  recvRequests+i);
1459  numberOfRealRecvRequests += 1;
1460  } else {
1461  // Nothing to receive -> set request to inactive
1462  recvRequests[i]=MPI_REQUEST_NULL;
1463  }
1464  }
1465  }
1466 
1467  // now the send requests
1468  i=0;
1469  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1470  if(FORWARD) {
1471  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1472  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1473  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1474  if(info->second.first.size_)
1475  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1476  MPI_BYTE, info->first, commTag_, communicator_,
1477  sendRequests+i);
1478  else
1479  // Nothing to send -> set request to inactive
1480  sendRequests[i]=MPI_REQUEST_NULL;
1481  }else{
1482  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1483  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1484  if(info->second.second.size_)
1485  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1486  MPI_BYTE, info->first, commTag_, communicator_,
1487  sendRequests+i);
1488  else
1489  // Nothing to send -> set request to inactive
1490  sendRequests[i]=MPI_REQUEST_NULL;
1491  }
1492 
1493  // Wait for completion of receive and immediately start scatter
1494  i=0;
1495  //int success = 1;
1496  int finished = MPI_UNDEFINED;
1497  MPI_Status status; //[messageInformation_.size()];
1498  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1499 
1500  for(i=0; i< numberOfRealRecvRequests; i++) {
1501  status.MPI_ERROR=MPI_SUCCESS;
1502  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1503  assert(finished != MPI_UNDEFINED);
1504 
1505  if(status.MPI_ERROR==MPI_SUCCESS) {
1506  int& proc = processMap[finished];
1507  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1508  assert(infoIter != messageInformation_.end());
1509 
1510  MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1511  assert(info.start_+info.size_ <= recvBufferSize);
1512 
1513  MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1514  }else{
1515  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1516  //success=0;
1517  }
1518  }
1519 
1520  MPI_Status recvStatus;
1521 
1522  // Wait for completion of sends
1523  for(i=0; i< messageInformation_.size(); i++)
1524  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1525  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1526  //success=0;
1527  }
1528  /*
1529  int globalSuccess;
1530  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1531 
1532  if(!globalSuccess)
1533  DUNE_THROW(CommunicationError, "A communication error occurred!");
1534  */
1535  delete[] processMap;
1536  delete[] sendRequests;
1537  delete[] recvRequests;
1538 
1539  }
1540 
1541 #endif // DOXYGEN
1542 
1544 }
1545 
1546 #endif // HAVE_MPI
1547 
1548 #endif
Classes describing a distributed indexset.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:213
Provides classes for building the communication interface between remote indices. ...
V Type
The type the policy is for.
Definition: communicator.hh:139
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
VariableBlockVector< FieldVector< K, n >, A > Type
Definition: communicator.hh:179
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:216
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
Default policy used for communicating an indexed type.
Definition: communicator.hh:126
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:146
static int getSize(const V &, int index)
Get the number of primitve elements at that index.
~BufferedCommunicator()
Destructor.
Information describing an interface.
Definition: interface.hh:98
GatherScatter default implementation that just copies data.
Definition: communicator.hh:200
void forward(const Data &source, Data &dest)
Send from source to target.
Definition: communicator.hh:174
A few common exception classes.
MPI_Comm communicator() const
Get the mpi communicator used.
Definition: remoteindices.hh:1709
Dune namespace.
Definition: alignedallocator.hh:9
CommPolicy< T >::IndexedType IndexedType
Definition: communicator.hh:202
The indices present on remote processes.
Definition: remoteindices.hh:49
Standard Dune debug streams.
void free()
Free the allocated memory (i.e. buffers and message information.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:152
Default exception class for I/O errors.
Definition: exceptions.hh:229
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:116
const InformationMap & interfaces() const
Get information about the interfaces.
Definition: interface.hh:422
A traits class describing the mapping of types onto MPI_Datatypes.
Definition: bigunsignedint.hh:25
Error thrown if there was a problem with the communication.
Definition: communicator.hh:193
Definition: communicator.hh:172
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:219
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:108
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:224
Communication interface between remote and local indices.
Definition: interface.hh:206
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition: communicator.hh:458
Base class of all classes representing a communication interface.
Definition: interface.hh:32
#define DUNE_THROW(E, m)
Definition: exceptions.hh:216
BufferedCommunicator()
Constructor.
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
An index present on the local process.
Definition: localindex.hh:32