Rheolef 7.1
an efficient C++ finite element environment
hack_array_mpi.icc
#include "rheolef/config.h"
#ifdef _RHEOLEF_HAVE_MPI

#include "rheolef/hack_array.h"

namespace rheolef {

// ===============================================================
// allocator
// ===============================================================
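// Besides the sequential base, the rep carries the communication state:
// _stash buffers writes to non-owned entries until assembly, _send and
// _receive hold the message buffers and pending requests, and _ext_x
// caches external (non-owned) entries for dis_at().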
template<class T, class A>
hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const A& alloc)
  : base(alloc),
    _stash(),
    _send(),
    _receive(),
    _receive_max_size(),
    _ext_x()
{
}
template<class T, class A>
hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const distributor& ownership, const parameter_type& param, const A& alloc)
  : base(ownership,param,alloc),
    _stash(),
    _send(),
    _receive(),
    _receive_max_size(),
    _ext_x()
{
}
template <class T, class A>
void
hack_array_mpi_rep<T,A>::resize (const distributor& ownership, const parameter_type& param)
{
  base::resize (ownership, param);
  _stash.clear();
  _send.waits.clear();
  _send.data.clear();
  _receive.waits.clear();
  _receive.data.clear();
  _receive_max_size = 0;
}
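// Note: resize() also drops any pending communication state (_stash and
// the send/receive buffers), so it should not be called between a
// dis_entry_assembly_begin() and its matching dis_entry_assembly_end().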
// ===============================================================
// assembly
// ===============================================================
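// Distributed assembly is deferred: a write to a non-owned index is
// buffered in _stash and only reaches its owner during the begin/end
// exchange below.
//
// Caller-side sketch (hedged: assumes the public hack_array wrapper
// forwards dis_entry() and dis_entry_assembly() to this rep, as the
// calls in permuted_put_values() and repartition() below suggest):
//
//   hack_array<T> x (ownership, param);
//   x.dis_entry (dis_i) = value;  // dis_i may belong to another process
//   x.dis_entry_assembly();       // begin + end exchange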
template <class T, class A>
void
hack_array_mpi_rep<T,A>::set_dis_entry (size_type dis_i, const generic_value_type& val)
{
  if (ownership().is_owned (dis_i)) {
    size_type i = dis_i - ownership().first_index();
    operator[](i) = val;
    return;
  }
  assert_macro (dis_i < ownership().dis_size(), "index "<<dis_i
    << " is out of range [0:" << ownership().dis_size() << "[");
  // loop on the raw data vector (fixed size, known at run time)
  size_type dis_iraw = base::_value_size*dis_i + 1;
  typename generic_value_type::const_iterator iter = val._data_begin();
  for (size_type loc_iraw = 0, loc_nraw = val._data_size(); loc_iraw < loc_nraw; loc_iraw++, iter++) {
    _stash.insert (std::pair<const size_type,raw_type>(dis_iraw + loc_iraw, *iter));
  }
}
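// Begin the exchange of stashed entries: each stashed raw value is sent
// to the process that owns its raw index; the returned bound on the
// incoming message size is kept in _receive_max_size for the matching
// dis_entry_assembly_end().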
template <class T, class A>
void
hack_array_mpi_rep<T,A>::dis_entry_assembly_begin ()
{
  _receive_max_size = mpi_assembly_begin (
    _stash,
    make_apply_iterator(_stash.begin(), first_op<typename stash_map_type::value_type>()),
    make_apply_iterator(_stash.end(),   first_op<typename stash_map_type::value_type>()),
    raw_base::ownership(),
    _receive,
    _send);

  _stash.clear();
}
template <class T, class A>
void
hack_array_mpi_rep<T,A>::dis_entry_assembly_end()
{
  mpi_assembly_end (
    _receive,
    _send,
    _receive_max_size,
    disarray_make_store (
      raw_base::begin() - raw_base::ownership().first_index(),
      set_op<raw_type,raw_type>(),
      size_type(0),
      std::false_type()));

  _send.waits.clear();
  _send.data.clear();
  _receive.waits.clear();
  _receive.data.clear();
  _receive_max_size = 0;
}
// ===============================================================
// scatter
// ===============================================================
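// Each value occupies _value_size raw slots, its actual data being the
// trailing _data_size of them, i.e. starting at offset
// (_value_size - _data_size) within the value. append_dis_entry()
// fetches the raw data of a set of external indexes from their owners
// and rebuilds full values in ext_idx_map on the requesting side.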
template <class T, class A>
template <class Set, class Map>
void
hack_array_mpi_rep<T,A>::append_dis_entry (const Set& ext_idx_set, Map& ext_idx_map) const
{
  // 0) declare the local context for raw data
  scatter_message<std::vector<raw_type> > raw_from;
  scatter_message<std::vector<raw_type> > raw_to;

  // 1) convert set to vector, for direct access & multiply by data_size
  std::vector<size_type> raw_ext_idx (base::_data_size*ext_idx_set.size());
  typename Set::const_iterator iter = ext_idx_set.begin();
  for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
    size_type idx = *iter;
    for (size_type l = 0; l < base::_data_size; l++) {
      size_type raw_i   = base::_data_size*i + l;
      size_type raw_idx = base::_value_size*idx + l + (base::_value_size - base::_data_size);
      raw_ext_idx [raw_i] = raw_idx;
    }
  }
  // 2) declare id[i]=i for scatter
  std::vector<size_type> raw_id (raw_ext_idx.size());
  for (size_type i = 0; i < raw_id.size(); i++) raw_id[i] = i;

  // 3) init scatter
  size_type raw_dis_size = base::_value_size*ownership().dis_size();
  size_type raw_size     = base::_value_size*ownership().size();
  distributor raw_ownership (raw_dis_size, ownership().comm(), raw_size);
  distributor::tag_type tag_init = distributor::get_new_tag();
  mpi_scatter_init (
    raw_ext_idx.size(),
    raw_ext_idx.begin().operator->(),
    raw_id.size(),
    raw_id.begin().operator->(),
    raw_ownership.dis_size(),
    raw_ownership.begin().operator->(),
    tag_init,
    raw_ownership.comm(),
    raw_from,
    raw_to);

  // 4) begin scatter: send local data to others and receive requests for missing data
  std::vector<raw_type> raw_buffer (raw_ext_idx.size());
  distributor::tag_type tag = distributor::get_new_tag();
  // access via an iterator to raw_data (it behaves as a pointer to raw_type)
  typedef typename base::base raw_base;
  typename raw_base::const_iterator raw_begin = raw_base::begin();
  mpi_scatter_begin (
    raw_begin,
    raw_buffer.begin().operator->(),
    raw_from,
    raw_to,
    set_op<raw_type,raw_type>(),
    tag,
    raw_ownership.comm());

  // 5) end scatter: receive missing data
  mpi_scatter_end (
    raw_begin,
    raw_buffer.begin().operator->(),
    raw_from,
    raw_to,
    set_op<raw_type,raw_type>(),
    tag,
    raw_ownership.comm());

  // 6) unpack raw data: build the associative container: pair (ext_idx ; data)
  iter = ext_idx_set.begin();
  for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
    size_type idx = *iter;
    automatic_value_type value (base::_parameter);
    typename automatic_value_type::iterator p = value._data_begin();
    for (size_type l = 0; l < base::_data_size; l++, p++) {
      size_type raw_i = base::_data_size*i + l;
      *p = raw_buffer[raw_i];
    }
    ext_idx_map.insert (std::make_pair (idx, value));
  }
}
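// Random read access by global index: owned entries are returned
// directly; external ones must already be cached in _ext_x by the
// scatter machinery above, otherwise check_macro aborts.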
template <class T, class A>
typename hack_array_mpi_rep<T,A>::const_reference
hack_array_mpi_rep<T,A>::dis_at (const size_type dis_i) const
{
  if (ownership().is_owned(dis_i)) {
    size_type i = dis_i - ownership().first_index();
    return operator[](i);
  }
  typename scatter_map_type::const_iterator iter = _ext_x.find (dis_i);
  check_macro (iter != _ext_x.end(), "unexpected external index="<<dis_i);
  return (*iter).second;
}
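// Refresh the _ext_x cache: collect every external index already
// present and fetch its current value again from the owning process.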
template <class T, class A>
void
hack_array_mpi_rep<T,A>::update_dis_entries() const
{
  std::set<size_type> ext_i;
  for (typename scatter_map_type::const_iterator
         iter = _ext_x.begin(),
         last = _ext_x.end(); iter != last; ++iter) {
    ext_i.insert ((*iter).first);
  }
  get_dis_entry (ext_i, _ext_x);
}
// ===============================================================
// put & get
// ===============================================================
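// Sequential-looking I/O on a distributed container: the io processor
// prints its own chunk first, then receives and prints each remote
// chunk in process order, so the stream comes out in ownership order.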
template <class T, class A>
template <class PutFunction>
odiststream&
hack_array_mpi_rep<T,A>::put_values (odiststream& ops, PutFunction put_element) const
{
  distributor::tag_type tag = distributor::get_new_tag();
  std::ostream& os = ops.os();

  // determine the maximum message size that may arrive
  size_type max_size;
  mpi::reduce(comm(), size(), max_size, mpi::maximum<size_type>(), 0);

  size_type io_proc = odiststream::io_proc();
  if (ownership().process() == io_proc) {
    base::put_values (ops, put_element);
    // then, receive and print messages
    std::vector<raw_type> raw_values (max_size*base::_data_size, std::numeric_limits<raw_type>::max());
    for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
      if (iproc == io_proc) continue;
      size_type loc_sz_i = ownership().size(iproc);
      if (loc_sz_i == 0) continue;
      mpi::status status = comm().recv(iproc, tag, raw_values.begin().operator->(), raw_values.size());
      boost::optional<int> n_data_opt = status.count<raw_type>();
      check_macro (n_data_opt, "receive failed");
      size_type n_data = n_data_opt.get();
      check_macro (n_data == loc_sz_i*base::_data_size, "unexpected receive message size");
      typename T::automatic_type tmp (base::_parameter);
      for (size_type i = 0; i < loc_sz_i; i++) {
        typename T::iterator p = tmp._data_begin();
        for (size_type iloc = 0; iloc < base::_data_size; iloc++, p++) {
          *p = raw_values [i*base::_data_size + iloc];
        }
        put_element (os, tmp);
        os << std::endl;
      }
    }
    os << std::flush;
  } else {
    if (size() != 0) {
      std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
      for (size_type i = 0, n = size(); i < n; i++) {
        for (size_type j = 0; j < base::_data_size; j++) {
          raw_values [i*base::_data_size + j] = raw_base::operator[] (i*base::_value_size + j + (base::_value_size - base::_data_size));
        }
      }
      comm().send(io_proc, tag, raw_values.begin().operator->(), raw_values.size());
    }
  }
  return ops;
}
template <class T, class A>
odiststream&
hack_array_mpi_rep<T,A>::put_values (odiststream& ops) const
{
  return put_values (ops, _disarray_put_element_type<generic_value_type>());
}
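// get_values mirrors put_values: the io processor reads each chunk from
// the input stream in ownership order, keeps its own and sends the
// others, as raw data, to their owning processes.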
template <class T, class A>
template <class GetFunction>
idiststream&
hack_array_mpi_rep<T,A>::get_values (idiststream& ips, GetFunction get_element)
{
  distributor::tag_type tag = distributor::get_new_tag();
  std::istream& is = ips.is();
  size_type my_proc = ownership().process();
  size_type io_proc = odiststream::io_proc();
  size_type size_max = 1;
  for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
    size_max = std::max (size_max, ownership().size(iproc));
  }
  distributor io_ownership (size_max, comm(), (my_proc == io_proc) ? size_max : 0);
  hack_array_seq_rep<T,A> data_proc_j (io_ownership, base::_parameter);
  if (my_proc == io_proc) {
    // load the first chunk, associated with the io processor
    check_macro (load_chunk (is, begin(), end(), get_element), "read failed on input stream.");
    if (ownership().n_process() > 1) {
      // read the other chunks and send them to the other processors;
      // raw_values is sized for the largest chunk owned by any of them
      std::vector<raw_type> raw_values (size_max*base::_data_size, std::numeric_limits<raw_type>::max());
      typename hack_array_seq_rep<T,A>::iterator start_j = data_proc_j.begin();
      // odd that all chunks are read into the same memory area, and that
      // we do not wait for the send to complete before the next chunk
      // overwrites it?
      for (size_type jproc = 0; jproc < ownership().n_process(); jproc++) {
        if (jproc == io_proc) continue;
        // load the chunk associated with proc j
        size_type loc_sz_j = ownership().size(jproc);
        if (loc_sz_j == 0) continue;
        typename hack_array_seq_rep<T,A>::iterator last_j = start_j + loc_sz_j;
        check_macro (load_chunk (is, start_j, last_j, get_element), "read failed on input stream.");
        for (size_type i = 0, n = loc_sz_j; i < n; i++) {
          for (size_type j = 0; j < base::_data_size; j++) {
            raw_values [i*base::_data_size + j] = data_proc_j.raw_base::operator[] (i*base::_value_size + j + (base::_value_size - base::_data_size));
          }
        }
        comm().send (jproc, tag, raw_values.begin().operator->(), loc_sz_j*base::_data_size);
      }
    }
  } else {
    if (size() != 0) {
      std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
      comm().recv (io_proc, tag, raw_values.begin().operator->(), size()*base::_data_size);
      for (size_type i = 0, n = size(); i < n; i++) {
        for (size_type j = 0; j < base::_data_size; j++) {
          raw_base::operator[] (i*base::_value_size + j + (base::_value_size - base::_data_size))
            = raw_values [i*base::_data_size + j];
        }
      }
    }
  }
  return ips;
}
template <class T, class A>
idiststream&
hack_array_mpi_rep<T,A>::get_values (idiststream& ips)
{
  return get_values (ips, _disarray_get_element_type<generic_value_type>());
}
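// permuted_put_values prints entries in a permuted global order: the
// permuted image is assembled into a temporary array whose entries are
// all owned by the io processor, then written sequentially from there.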
template <class T, class A>
template <class PutFunction, class Permutation>
odiststream&
hack_array_mpi_rep<T,A>::permuted_put_values (
  odiststream& ops,
  const Permutation& perm,
  PutFunction put_element) const
{
  // NOTE: could be merged with disarray::permuted_put_values: the code is exactly the same
  assert_macro (perm.size() == size(), "permutation size does not match");
  size_type io_proc = odiststream::io_proc();
  size_type my_proc = comm().rank();
  distributor io_ownership (dis_size(), comm(), (my_proc == io_proc) ? dis_size() : 0);
  hack_array_mpi_rep<T,A> perm_x (io_ownership, base::_parameter);
  for (size_type i = 0, n = size(); i < n; i++) {
    perm_x.dis_entry (perm[i]) = operator[](i);
  }
  perm_x.dis_entry_assembly();
  perm_x.hack_array_seq_rep<T,A>::put_values (ops, put_element);
  return ops;
}
// ===============================================================
// repartition
// ===============================================================
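// repartition() migrates *this to a new distribution: partition[ie]
// gives the target process of local element ie. Per-process counts are
// exchanged with all_to_all to size the new ownership, then the
// elements migrate through the dis_entry/dis_entry_assembly mechanism.
// old_numbering (on the new distribution) and new_numbering (on the
// old one) record the index mapping in both directions.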
template <class T, class A>
template <class A2>
void
hack_array_mpi_rep<T,A>::repartition (                          // old_numbering for *this
  const disarray_rep<size_type,distributed,A2>& partition,      // old_ownership
  hack_array_mpi_rep<T,A>&                      new_array,      // new_ownership
  disarray_rep<size_type,distributed,A2>&       old_numbering,  // new_ownership
  disarray_rep<size_type,distributed,A2>&       new_numbering) const // old_ownership
{
  using namespace std;
  communicator comm = ownership().comm();
  size_type nproc   = comm.size();
  size_type my_proc = comm.rank();
  vector<size_type> send_local_elt_size (nproc, 0);
  typename disarray_rep<size_type,distributed,A2>::const_iterator iter_part = partition.begin();
  for (size_type ie = 0; ie < partition.size(); ie++, iter_part++) {
    send_local_elt_size [*iter_part]++;
  }
  vector<size_type> recv_local_elt_size (nproc, 0);
  all_to_all (comm, send_local_elt_size, recv_local_elt_size);
  vector<size_type> recv_local_elt_start (nproc+1);
  recv_local_elt_start [0] = 0;
  for (size_type iproc = 0; iproc < nproc; iproc++) {
    recv_local_elt_start [iproc+1] = recv_local_elt_start [iproc] + recv_local_elt_size[iproc];
  }
  vector<size_type> send_local_elt_start (nproc);
  all_to_all (comm, recv_local_elt_start.begin().operator->(), send_local_elt_start.begin().operator->());
  size_type new_local_n_elt = recv_local_elt_start [nproc];
  size_type global_n_elt    = dis_size();

  // re-distribute data:
  distributor new_elt_ownership (global_n_elt, comm, new_local_n_elt);
  new_array.resize (new_elt_ownership, base::_parameter); // CHANGE FROM ARRAY here
  old_numbering.resize (new_elt_ownership, numeric_limits<size_type>::max());
  new_numbering.resize (ownership(), numeric_limits<size_type>::max());
  iter_part = partition.begin();
  const_iterator iter_elt = begin();
  typename disarray_rep<size_type,distributed,A2>::iterator iter_new_num_elt = new_numbering.begin();
  for (size_type ie = 0, ne = partition.size(); ie < ne; ie++, iter_part++, iter_elt++, iter_new_num_elt++) {
    size_type iproc = *iter_part;
    const generic_value_type& x = *iter_elt; // CHANGE FROM ARRAY here
    size_type new_global_ie = new_elt_ownership[iproc] + send_local_elt_start[iproc];
    new_array.dis_entry (new_global_ie) = x;
    *iter_new_num_elt = new_global_ie;
    size_type old_global_ie = ownership()[my_proc] + ie;
    old_numbering.dis_entry (new_global_ie) = old_global_ie;
    send_local_elt_start[iproc]++;
  }
  new_array.dis_entry_assembly();
  old_numbering.template dis_entry_assembly<typename default_set_op<size_type>::type>();
}
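// Caller-side sketch (hedged and illustrative: the wrapper-level names
// and signatures are assumed, only the rep-level call above is verbatim):
//
//   // partition[ie] = target process of local element ie
//   x.repartition (partition, new_x, old_numbering, new_numbering);
//   // new_x now holds the elements on their target processes;
//   // old_numbering/new_numbering map between old and new indexes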

} // namespace rheolef
#endif // _RHEOLEF_HAVE_MPI

Referenced helpers (declared elsewhere in Rheolef):
  apply_iterator<Iterator,Operator> make_apply_iterator (Iterator i, Operator op)
  Stash::size_type mpi_assembly_begin (const Stash& stash, InputIterator first_stash_idx, InputIterator last_stash_idx, const distributor& ownership, Message& receive, Message& send)
  Size mpi_assembly_end (Message& receive, Message& send, Size receive_max_size, Container x)
  disarray_store<OutputRandomIterator,SetOp,Size,IsContainer> disarray_make_store (OutputRandomIterator x, SetOp op, Size, IsContainer)
  void mpi_scatter_init (Size nidx, SizeRandomIterator1 idx, Size nidy, SizeRandomIterator2 idy, Size idy_maxval, SizeRandomIterator3 ownership, Tag tag, const distributor::communicator_type& comm, Message& from, Message& to)
  void mpi_scatter_begin (InputIterator x, OutputIterator y, Message& from, Message& to, SetOp op, Tag tag, Comm comm)
  void mpi_scatter_end (InputIterator x, OutputIterator y, Message& from, Message& to, SetOp op, Tag tag, Comm comm)
  bool load_chunk (std::istream& s, RandomIterator iter, RandomIterator last)