Rheolef  7.1
an efficient C++ finite element environment
mpi_scatter_init.h
#ifndef _RHEO_MPI_SCATTER_INIT_H
#define _RHEO_MPI_SCATTER_INIT_H

#include "rheolef/compiler.h"
#include "rheolef/distributed.h"
#include "rheolef/scatter_message.h"

#include "rheolef/msg_sort_with_permutation.h"
#include "rheolef/msg_to_context.h"
#include "rheolef/msg_from_context_pattern.h"
#include "rheolef/msg_from_context_indices.h"
#include "rheolef/msg_local_context.h"
#include "rheolef/msg_local_optimize.h"

#include "rheolef/msg_util.h"
#include <boost/functional.hpp>
#include <boost/iterator/transform_iterator.hpp>

/*F:
NAME: mpi_scatter_init -- gather/scatter initialize (@PACKAGE@ @VERSION@)
DESCRIPTION:
 Initialize the communication
 for a distributed to sequential scatter context.
COMPLEXITY:
 Time and memory complexity is O(nidx+nproc).
 For finite-element problems in d dimensions

| nidx ~ N^((d-1)/d)

 where N is the number of degrees of freedom.

IMPLEMENTATION:
 Inspired by petsc-2.0/vpscat.c: VecScatterCreate_PtoS()
AUTHORS:
 LMC-IMAG, 38041 Grenoble cedex 9, France
 | Pierre.Saramito@imag.fr
DATE: 23 March 1999
END:
*/

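/* Usage sketch (illustrative only, not part of the original header):
   the caller supplies the sorted global indices idx it needs, the local
   positions idy where they go, and the contiguous ownership ranges of the
   distribution; the routine fills "to" (what this process must send) and
   "from" (what it will receive). comm is the distributor::communicator_type
   of the target distribution (not constructed here). The scatter_message
   container parameter, the tag value and the idy_maxval choice below are
   assumptions, not mandated by this header.

     std::vector<std::size_t> ownership = {0, 100, 200}; // process p owns [ownership[p], ownership[p+1][
     std::vector<std::size_t> idx = {5, 42, 150};        // sorted global indices needed locally
     std::vector<std::size_t> idy = {0, 1, 2};           // local positions receiving them
     std::size_t idy_maxval = idy.size();                // placeholder bound, see msg_from_context_indices
     scatter_message<std::vector<double> > from, to;     // assumed container parameter
     mpi_scatter_init (idx.size(), idx.begin(), idy.size(), idy.begin(),
                       idy_maxval, ownership.begin(), 0, comm, from, to);
*/
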
namespace rheolef {

//<mpi_scatter_init:
template <class Message, class Size, class SizeRandomIterator1,
          class SizeRandomIterator2, class SizeRandomIterator3, class Tag>
void
mpi_scatter_init (
  // input:
  Size nidx,
  SizeRandomIterator1 idx,
  Size nidy,
  SizeRandomIterator2 idy,
  Size idy_maxval,
  SizeRandomIterator3 ownership,
  Tag tag,
  const distributor::communicator_type& comm,
  // output:
  Message& from,
  Message& to)
{
  typedef Size size_type;
  size_type my_proc = comm.rank();
  size_type nproc   = comm.size();

  // -------------------------------------------------------
  // 1) first count number of contributors to each processor
  // -------------------------------------------------------
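  // note: idx is assumed sorted in increasing order (see the copy into send_data
  // in step 6), so the ownership scan below never restarts and the two nested
  // loops cost O(nidx+nproc) overall, as stated in the header comment.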
  std::vector<size_type> msg_size (nproc, 0);
  std::vector<size_type> msg_mark (nproc, 0);
  std::vector<size_type> owner    (nidx);
  size_type send_nproc = 0;
  {
    size_type iproc = 0;
    for (size_type i = 0; i < nidx; i++) {
      for (; iproc < nproc; iproc++) {
        if (idx[i] >= ownership[iproc] && idx[i] < ownership[iproc+1]) {
          owner[i] = iproc;
          msg_size [iproc]++;
          if (!msg_mark[iproc]) {
            msg_mark[iproc] = 1;
            send_nproc++;
          }
          break;
        }
      }
      check_macro (iproc != nproc, "bad stash data: idx["<<i<<"]="<<idx[i]<<" out of range [0:"<<ownership[nproc]<<"[");
    }
  } // end block
  // -------------------------------------------------------
  // 2) avoid sending a message to my_proc in the counting
  // -------------------------------------------------------
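  // the n_local indices owned by my_proc itself need no MPI message:
  // they are handled by the local copy path built in steps 16 and 17.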
  size_type n_local = msg_size[my_proc];
  if (n_local != 0) {
    msg_size [my_proc] = 0;
    msg_mark [my_proc] = 0;
    send_nproc--;
  }
  // ----------------------------------------------------------------
  // 3) compute the number of messages to be sent to my_proc
  // ----------------------------------------------------------------
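  // summing the 0/1 msg_mark flags over all processes leaves in work[my_proc]
  // the number of processes that will send a message to this one.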
  std::vector<size_type> work (nproc);
  mpi::all_reduce (
    comm,
    msg_mark.begin().operator->(),
    nproc,
    work.begin().operator->(),
    std::plus<size_type>());
  size_type receive_nproc = work [my_proc];
  // ----------------------------------------------------------------
  // 4) compute the maximum message size to be sent to my_proc
  // ----------------------------------------------------------------
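  // the element-wise maximum gives the largest message this process will receive,
  // so every posted receive can use a fixed-size slot of a single buffer.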
  mpi::all_reduce (
    comm,
    msg_size.begin().operator->(),
    nproc,
    work.begin().operator->(),
    mpi::maximum<size_type>());
  size_type receive_max_size = work [my_proc];
  // ----------------------------------------------------------------
  // 5) post receives: exchange the buffer addresses between processes
  // ----------------------------------------------------------------
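  // post one non-blocking receive per expected sender; the actual source and
  // message size are recovered from the returned status in step 8.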
  std::list<std::pair<size_type,mpi::request> > receive_waits;
  std::vector<size_type> receive_data (receive_nproc*receive_max_size);
  for (size_type i_receive = 0; i_receive < receive_nproc; i_receive++) {
    mpi::request i_req = comm.irecv (
      mpi::any_source,
      tag,
      receive_data.begin().operator->() + i_receive*receive_max_size,
      receive_max_size);
    receive_waits.push_back (std::make_pair(i_receive, i_req));
  }
  // ---------------------------------------------------------------------------
  // 6) compute the send indices
  // ---------------------------------------------------------------------------
  // since idx is sorted, we can copy idx directly into send_data,
  // which makes owner and send_data_ownership unnecessary
  std::vector<size_type> send_data (nidx);
  std::copy (idx, idx+nidx, send_data.begin());
  // ---------------------------------------------------------------------------
  // 7) do send
  // ---------------------------------------------------------------------------
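  // since idx is sorted and the ownership ranges are increasing, send_data is
  // already grouped by destination: each message is the contiguous slice
  // [i_start, i_start+i_msg_size) addressed to process iproc.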
  std::list<std::pair<size_type,mpi::request> > send_waits;
  {
    size_type i_send  = 0;
    size_type i_start = 0;
    for (size_type iproc = 0; iproc < nproc; iproc++) {
      size_type i_msg_size = msg_size[iproc];
      if (i_msg_size == 0) continue;
      mpi::request i_req = comm.isend (
        iproc,
        tag,
        send_data.begin().operator->() + i_start,
        i_msg_size);
      send_waits.push_back (std::make_pair(i_send, i_req));
      i_send++;
      i_start += i_msg_size;
    }
  } // end block
  // ---------------------------------------------------------------------------
  // 8) wait on receives
  // ---------------------------------------------------------------------------
  // note: for wait_any/wait_all, build an iterator adapter that scans the pair.second
  // in [index,request]; an iterator into the pair is then recovered with iter.base(),
  // which retrieves the corresponding index for computing the position in the
  // receive_data buffer
  typedef boost::transform_iterator<select2nd<size_type,mpi::request>, std::list<std::pair<size_type,mpi::request> >::iterator>
          request_iterator;
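  // i.e. *iter yields the mpi::request (pair.second) expected by wait_any/wait_all,
  // while iter.base()->first recovers the slot index used to locate the message
  // in receive_data.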
  std::vector<size_type> receive_size (receive_nproc);
  std::vector<size_type> receive_proc (receive_nproc);
  size_type receive_total_size = 0;
  while (receive_waits.size() != 0) {
    typedef size_type data_type; // exchanged data is of "size_type"
    request_iterator iter_r_waits (receive_waits.begin(), select2nd<size_type,mpi::request>()),
                     last_r_waits (receive_waits.end(),   select2nd<size_type,mpi::request>());
    // wait on any receive...
    std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
    // check status
    boost::optional<int> i_msg_size_opt = pair_status.first.count<data_type>();
    check_macro (i_msg_size_opt, "receive wait failed");
    int iproc = pair_status.first.source();
    check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
    // get the size of the receive and its slot in the data buffer
    size_type i_msg_size = (size_type)i_msg_size_opt.get();
    std::list<std::pair<size_type,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
    size_type i_receive = (*i_pair_ptr).first;
    receive_proc [i_receive] = iproc;
    receive_size [i_receive] = i_msg_size;
    receive_total_size += i_msg_size;
    receive_waits.erase (i_pair_ptr);
  }
  // ---------------------------------------------------------------------------
  // 9) allocate the entire send(to) scatter context
  // ---------------------------------------------------------------------------
  to.resize (receive_total_size, receive_nproc);

  // ---------------------------------------------------------------------------
  // 10) compute the permutation of values that gives the sorted source[] sequence
  // ---------------------------------------------------------------------------
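  // sort_with_permutation computes in perm the permutation that orders the
  // message sources by increasing process rank, so the "to" pattern can be
  // assembled rank by rank in step 11.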
  // init: perm[i] = i
  std::vector<size_type> perm (receive_nproc);
  copy (index_iterator<size_type>(), index_iterator<size_type>(receive_nproc), perm.begin());
  sort_with_permutation (
    receive_proc.begin().operator->(),
    perm.begin().operator->(),
    receive_nproc);
  // ---------------------------------------------------------------------------
  // 11) compute the compressed receive message pattern for send(to)
  // ---------------------------------------------------------------------------
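  // msg_to_context compresses the (process, size, data) triples gathered in step 8
  // into the procs/starts/indices arrays of the "to" message; istart, the first
  // index owned by my_proc, is presumably used to renumber the received global
  // indices into local ones.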
  size_type istart = ownership[my_proc]; // = ownership.first_index()
  msg_to_context (
    perm.begin(),
    perm.end(),
    receive_proc.begin(),
    receive_size.begin(),
    receive_data.begin(),
    receive_max_size,
    istart,
    to.procs().begin(),
    to.starts().begin(),
    to.indices().begin());
  // ---------------------------------------------------------------------------
  // 12) allocate the entire receive(from) scatter context
  // ---------------------------------------------------------------------------
  from.resize (nidy, send_nproc);
  // ---------------------------------------------------------------------------
  // 13) compute the compressed receive message pattern for receive(from)
  // ---------------------------------------------------------------------------
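  // msg_from_context_pattern builds the per-process pattern (procs, starts) of the
  // "from" message from the non-zero msg_size entries and records in proc2from_proc
  // the map from process rank to message slot.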
  std::vector<size_type> proc2from_proc (nproc);
  msg_from_context_pattern (
    msg_size.begin(),
    msg_size.end(),
    from.procs().begin(),
    from.starts().begin(),
    proc2from_proc.begin());
  // ---------------------------------------------------------------------------
  // 14) compute the compressed receive message indices for receive(from)
  // ---------------------------------------------------------------------------
  // assume that indices are sorted in increasing order
  std::vector<size_type> start (send_nproc+1);
  copy (from.starts().begin(), from.starts().end(), start.begin());
  msg_from_context_indices (
    owner.begin(),
    owner.end(),
    idy,
    proc2from_proc.begin(),
    my_proc,
    idy_maxval,
    start.begin(),
    from.indices().begin());
  // ---------------------------------------------------------------------------
  // 15) wait on sends
  // ---------------------------------------------------------------------------
  request_iterator iter_s_waits (send_waits.begin(), select2nd<size_type,mpi::request>()),
                   last_s_waits (send_waits.end(),   select2nd<size_type,mpi::request>());
  mpi::wait_all (iter_s_waits, last_s_waits);
  // ---------------------------------------------------------------------------
  // 16) compute the compressed local message pattern,
  //     i.e. the only part that does not require communication
  // ---------------------------------------------------------------------------
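  // the local slots describe the entries that stay on my_proc, i.e. the idx values
  // falling in [istart, ilast[; they are exchanged by direct copy, without MPI.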
  from.local_slots.resize (n_local);
  to.local_slots.resize (n_local);
  size_type ilast = ownership[my_proc+1]; // = ownership.last_index()
  msg_local_context (
    idx,
    idx+nidx,
    idy,
    idy_maxval,
    istart,
    ilast,
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());
  // ---------------------------------------------------------------------------
  // 17) optimize local exchanges during gather/scatter
  // ---------------------------------------------------------------------------
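  // when both local slot sequences turn out to be contiguous, the local exchange
  // degenerates into a single block copy described by (local_copy_start,
  // local_copy_length).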
  bool has_opt = msg_local_optimize (
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());

  if (has_opt && n_local != 0) {
    to.local_is_copy       = true;
    to.local_copy_start    = to.local_slots[0];
    to.local_copy_length   = n_local;
    from.local_is_copy     = true;
    from.local_copy_start  = from.local_slots[0];
    from.local_copy_length = n_local;
  }
}
//>mpi_scatter_init:
} // namespace rheolef
#endif // _RHEO_MPI_SCATTER_INIT_H