22 # include "rheolef/config.h"
23 # ifdef _RHEOLEF_HAVE_MPI
25 # include "rheolef/disarray.h"
27 # include "rheolef/mpi_assembly_begin.h"
28 # include "rheolef/mpi_assembly_end.h"
29 # include "rheolef/mpi_scatter_init.h"
30 # include "rheolef/mpi_scatter_begin.h"
31 # include "rheolef/mpi_scatter_end.h"
32 # include "rheolef/load_chunk.h"
33 # include "rheolef/disarray_store.h"
34 # include "rheolef/rheostream.h"
40 template <
class T,
class A>
41 disarray_rep<T,distributed,A>::disarray_rep (
const disarray_rep<T,distributed,A>& x)
49 assert_macro(x._stash.size() == 0 &&
50 x._send.data.size() == 0 &&
51 x._receive.data.size() == 0,
52 "copy during assembly phase: should not be done");
54 template <
class T,
class A>
55 disarray_rep<T,distributed,A>::disarray_rep (
56 const distributor& ownership,
59 : base(ownership, init_val,alloc),
67 template <
class T,
class A>
69 disarray_rep<T,distributed,A>::resize (
70 const distributor& ownership,
73 base::resize(ownership, init_val);
77 _receive.waits.clear();
78 _receive.data.clear();
79 _receive_max_size = 0;
87 typename Map::mapped_type&
88 _stash_create_new_entry (Map& stash,
typename Map::size_type dis_i, std::false_type)
91 typedef typename Map::mapped_type
T;
92 std::pair<typename Map::iterator,bool>
status
93 = stash.insert (std::pair<const size_type,T>(dis_i,
T()));
94 return (*(
status.first)).second;
96 template <
class MultiMap>
98 typename MultiMap::mapped_type&
99 _stash_create_new_entry (MultiMap& stash,
typename MultiMap::size_type dis_i, std::true_type)
102 typedef typename MultiMap::mapped_type U;
103 std::pair<typename MultiMap::iterator,bool>
status
104 = stash.insert (std::pair<const size_type,U>(dis_i,U()));
105 return (*(
status.first)).second;
107 template <
class T,
class A>
108 typename disarray_rep<T,distributed,A>::reference
109 disarray_rep<T,distributed,A>::new_dis_entry (
size_type dis_i)
111 size_type first_dis_i = ownership().first_index();
112 size_type last_dis_i = ownership().last_index();
113 if (dis_i >= first_dis_i && dis_i < last_dis_i) {
114 return disarray_rep<T,distributed,A>::operator[](dis_i - first_dis_i);
116 assert_macro (dis_i < ownership().dis_size(),
"index "<<dis_i
117 <<
" is out of range [0:" << ownership().dis_size() <<
"[");
118 return _stash_create_new_entry (_stash, dis_i, is_container());
125 _stash_set (Map& stash,
typename Map::size_type dis_i,
const typename Map::mapped_type& val, std::false_type)
128 typedef typename Map::mapped_type
T;
129 std::pair<typename Map::iterator,bool>
status
130 = stash.insert (std::pair<const size_type,T>(dis_i,
T()));
131 (*(
status.first)).second = val;
136 _stash_set_add (Map& stash,
typename Map::size_type dis_i,
const typename Map::mapped_type& val, std::false_type)
139 typedef typename Map::mapped_type
T;
140 std::pair<typename Map::iterator,bool>
status
141 = stash.insert (std::pair<const size_type,T>(dis_i,
T()));
142 (*(
status.first)).second += val;
144 template <
class MultiMap,
class T>
150 typedef typename MultiMap::iterator iterator;
152 typedef typename MultiMap::mapped_type U;
153 std::pair<iterator, iterator> range_dis_i = stash.equal_range (dis_i);
154 stash.erase (range_dis_i.first, range_dis_i.second);
155 for (
typename T::const_iterator iter = val.begin(), last = val.end(); iter != last; iter++) {
156 stash.insert (std::pair<const size_type,U>(dis_i,*iter));
160 template <
class MultiMap,
class U>
163 _stash_set_add_multi (MultiMap& stash,
typename MultiMap::size_type dis_i,
const U& val, std::false_type)
166 typedef typename MultiMap::mapped_type W;
167 stash.insert (std::pair<const size_type,W>(dis_i,val));
170 template <
class MultiMap,
class U>
173 _stash_set_add_multi (MultiMap& stash,
typename MultiMap::size_type dis_i,
const U& val, std::true_type)
176 typedef typename MultiMap::mapped_type W;
177 for (
typename U::const_iterator iter = val.begin(), last = val.end(); iter != last; iter++) {
178 stash.insert (std::pair<const size_type,W>(dis_i,*iter));
182 template <
class MultiMap,
class U>
185 _stash_set_add (MultiMap& stash,
typename MultiMap::size_type dis_i,
const U& val, std::true_type)
187 _stash_set_add_multi (stash, dis_i, val,
typename is_container<U>::type());
189 template <
class T,
class A>
191 disarray_rep<T,distributed,A>::set_dis_entry (
size_type dis_i,
const T& value)
193 size_type start = ownership().first_index();
194 size_type last = ownership().last_index();
195 if (dis_i >= start && dis_i < last) {
196 disarray_rep<T,distributed,A>::operator[](dis_i - start) = value;
198 assert_macro (dis_i < ownership().dis_size(),
"index "<<dis_i
199 <<
" is out of range [0:" << ownership().dis_size() <<
"[");
200 _stash_set (_stash, dis_i, value, is_container());
203 template <
class T,
class A>
206 disarray_rep<T,distributed,A>::set_add_dis_entry (
size_type dis_i,
const U& val)
208 size_type start = ownership().first_index();
209 size_type last = ownership().last_index();
210 if (dis_i >= start && dis_i < last) {
211 base::operator[](dis_i - start) += val;
213 assert_macro (dis_i < ownership().dis_size(),
214 "index "<<dis_i<<
" is out of range [0:"<<ownership().dis_size());
215 _stash_set_add (_stash, dis_i, val, is_container());
218 template <
class T,
class A>
219 template <
class SetOp>
221 disarray_rep<T,distributed,A>::dis_entry_assembly_begin (SetOp my_set_op)
233 template <
class T,
class A>
234 template <
class SetOp>
236 disarray_rep<T,distributed,A>::dis_entry_assembly_end(SetOp my_set_op)
243 begin() - ownership().first_index(),
250 _receive.waits.clear();
251 _receive.data.clear();
252 _receive_max_size = 0;
257 template <
class T,
class A>
260 disarray_rep<T,distributed,A>::repartition (
261 const disarray_rep<size_type,distributed,A2>& partition,
262 disarray_rep<T,distributed,A>& new_disarray,
263 disarray_rep<size_type,distributed,A2>& old_numbering,
264 disarray_rep<size_type,distributed,A2>& new_numbering)
const
267 communicator_type comm = ownership().comm();
270 vector<size_type> send_local_elt_size (nproc, 0);
271 typename disarray_rep<size_type,distributed,A2>::const_iterator iter_part = partition.begin();
272 for (
size_type ie = 0; ie < partition.size(); ie++, iter_part++) {
273 send_local_elt_size [*iter_part]++;
275 vector<size_type> recv_local_elt_size (nproc, 0);
276 all_to_all (comm, send_local_elt_size, recv_local_elt_size);
277 vector<size_type> recv_local_elt_start (nproc+1);
278 recv_local_elt_start [0] = 0;
279 for (
size_type iproc = 0; iproc < nproc; iproc++) {
280 recv_local_elt_start [iproc+1] = recv_local_elt_start [iproc] + recv_local_elt_size[iproc];
282 vector<size_type> send_local_elt_start (nproc);
283 all_to_all (comm, recv_local_elt_start.begin().operator->(), send_local_elt_start.begin().operator->());
284 size_type new_local_n_elt = recv_local_elt_start [nproc];
288 distributor new_elt_ownership (global_n_elt, comm, new_local_n_elt);
289 new_disarray.resize (new_elt_ownership);
290 old_numbering.resize (new_elt_ownership, numeric_limits<size_type>::max());
291 new_numbering.resize (ownership(), numeric_limits<size_type>::max());
292 iter_part = partition.begin();
293 const_iterator iter_elt = begin();
294 typename disarray_rep<size_type,distributed,A2>::iterator iter_new_num_elt = new_numbering.begin();
295 for (
size_type ie = 0, ne = partition.size(); ie < ne; ie++, iter_part++, iter_elt++, iter_new_num_elt++) {
297 const T& x = *iter_elt;
298 size_type new_global_ie = new_elt_ownership[iproc] + send_local_elt_start[iproc];
299 new_disarray.dis_entry (new_global_ie) = x;
300 *iter_new_num_elt = new_global_ie;
301 size_type old_global_ie = ownership()[my_proc] + ie;
302 old_numbering.dis_entry (new_global_ie) = old_global_ie;
303 send_local_elt_start[iproc]++;
305 new_disarray.template dis_entry_assembly<typename default_set_op<T>::type>();
306 old_numbering.template dis_entry_assembly<typename default_set_op<size_type>::type>();
308 template <
class T,
class A>
311 disarray_rep<T,distributed,A>::reverse_permutation (
312 disarray_rep<size_type,distributed,A2>& inew2dis_iold)
const
314 check_macro (inew2dis_iold.dis_size() == dis_size(),
"reverse permutation[0:"<<inew2dis_iold.dis_size()
315 <<
"[ has incompatible dis_range with oriinal permutation[0:"<<dis_size()<<
"[");
316 size_type first_dis_iold = ownership().first_index();
317 for (
size_type iold = 0; iold < size(); iold++) {
318 size_type dis_iold = first_dis_iold + iold;
319 size_type dis_inew = base::operator[] (iold);
320 inew2dis_iold.dis_entry (dis_inew) = dis_iold;
322 inew2dis_iold.template dis_entry_assembly<typename default_set_op<T>::type>();
324 template <
class T,
class A>
327 disarray_rep<T,distributed,A>::permutation_apply (
328 const disarray_rep<size_type,distributed,A2>& new_numbering,
329 disarray_rep<T,distributed,A>& new_disarray)
const
332 "permutation_apply: incompatible disarray("<<size()<<
") and permutation("<<new_numbering.size()<<
") sizes");
333 check_macro (dis_size() == new_disarray.dis_size(),
334 "permutation_apply: incompatible disarray("<<dis_size()<<
") and permutation("<<new_disarray.dis_size()<<
") dis_sizes");
335 typename disarray_rep<size_type,distributed,A2>::const_iterator iter_dis_new_ie = new_numbering.begin();
336 for (const_iterator iter = begin(), last = end(); iter != last; iter++, iter_dis_new_ie++) {
338 new_disarray.dis_entry (dis_new_ie) = *iter;
340 new_disarray.template dis_entry_assembly<typename default_set_op<T>::type>();
343 template <
class T,
class A>
344 template <
class Set,
class Map>
346 disarray_rep<T,distributed,A>::append_dis_entry (
const Set& ext_idx_set, Map& ext_idx_map, std::false_type)
const
349 scatter_message<std::vector<T,A> > from;
350 scatter_message<std::vector<T,A> > to;
353 std::vector<size_type> ext_idx (ext_idx_set.size());
354 std::copy (ext_idx_set.begin(), ext_idx_set.end(), ext_idx.begin());
357 std::vector<size_type> id (ext_idx.size());
358 for (
size_type i = 0; i <
id.size(); i++)
id[i] = i;
361 distributor::tag_type tag_init = distributor::get_new_tag();
364 ext_idx.begin().operator->(),
366 id.begin().operator->(),
367 ownership().dis_size(),
368 ownership().begin().operator->(),
375 std::vector<T,A> buffer (ext_idx.size());
376 distributor::tag_type tag = distributor::get_new_tag();
378 begin().operator->(),
379 buffer.begin().operator->(),
388 begin().operator->(),
397 for (
size_type i = 0; i < buffer.size(); i++) {
398 ext_idx_map.insert (std::make_pair (ext_idx[i], buffer[i]));
401 template <
class T,
class A>
403 disarray_rep<T,distributed,A>::reset_dis_indexes()
const
405 std::set<size_type> ext_idx_set;
406 for (
typename scatter_map_type::const_iterator iter = _ext_x.begin(), last = _ext_x.end(); iter != last; iter++) {
407 ext_idx_set.insert ((*iter).first);
409 set_dis_indexes (ext_idx_set);
412 template <
class T,
class A>
413 template <
class Set,
class Map>
415 disarray_rep<T,distributed,A>::append_dis_entry (
const Set& ext_idx_set, Map& ext_idx_map, std::true_type)
const
417 typedef typename T::value_type S;
420 typedef scatter_message<std::vector<T>,
true> message_type;
426 std::vector<size_type> ext_idx (ext_idx_set.size());
427 std::copy (ext_idx_set.begin(), ext_idx_set.end(), ext_idx.begin());
430 std::vector<size_type> id (ext_idx.size());
431 for (
size_type i = 0; i <
id.size(); i++)
id[i] = i;
434 distributor::tag_type tag_init = distributor::get_new_tag();
437 ext_idx.begin().operator->(),
439 id.begin().operator->(),
440 ownership().dis_size(),
441 ownership().begin().operator->(),
448 std::vector<size_type> data_sizes (size());
450 data_sizes[i] = base::operator[](i).size();
453 std::vector<size_type> buffer (ext_idx.size());
454 distributor::tag_type tag = distributor::get_new_tag();
456 data_sizes.begin().operator->(),
457 buffer.begin().operator->(),
460 set_op<size_type,size_type>(),
466 data_sizes.begin().operator->(),
470 set_op<size_type,size_type>(),
479 std::vector<T> multi_buffer (ext_idx.size());
480 distributor::tag_type multi_tag = distributor::get_new_tag();
482 begin().operator->(),
483 multi_buffer.begin().operator->(),
492 begin().operator->(),
493 multi_buffer.begin(),
501 for (
size_type i = 0; i < multi_buffer.size(); i++) {
502 ext_idx_map.insert (std::make_pair (ext_idx[i], multi_buffer[i]));
506 template <
class T,
class A>
507 template <
class Set,
class Map>
510 disarray_rep<T,distributed,A>::append_dis_entry (
const Set& ext_idx_set, Map& ext_idx_map)
const
512 append_dis_entry (ext_idx_set, ext_idx_map, is_container());
514 template <
class T,
class A>
515 typename disarray_rep<T,distributed,A>::const_reference
516 disarray_rep<T,distributed,A>::dis_at (
const size_type dis_i)
const
518 if (dis_i >= ownership().first_index() && dis_i < ownership().last_index()) {
519 size_type i = dis_i - ownership().first_index();
520 return base::operator[](i);
522 typename scatter_map_type::const_iterator iter = _ext_x.find (dis_i);
523 check_macro (iter != _ext_x.end(),
"unexpected external index="<<dis_i);
524 return (*iter).second;
526 template <
class T,
class A>
528 disarray_rep<T,distributed,A>::get_dis_indexes (std::set<size_type>& ext_idx_set)
const
531 for (
auto x: _ext_x) {
532 ext_idx_set.insert (x.first);
538 template <
class T,
class A>
539 template <
class PutFunction>
541 disarray_rep<T,distributed,A>::put_values (odiststream& ops, PutFunction put_element)
const
543 distributor::tag_type tag = distributor::get_new_tag();
544 std::ostream& s = ops.os();
548 mpi::reduce(comm(), size(), max_size, mpi::maximum<size_type>(), 0);
550 size_type io_proc = odiststream::io_proc();
551 if (ownership().process() == io_proc) {
553 put_element (s, base::operator[](i));
557 std::vector<T,A> values (max_size);
558 for (
size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
559 if (iproc == io_proc)
continue;
560 size_type loc_sz_i = ownership().size(iproc);
561 if (loc_sz_i == 0)
continue;
562 mpi::status status = comm().recv(iproc, tag, values.begin().operator->(), max_size);
563 boost::optional<int> n_data_opt =
status.count<
T>();
567 put_element (s, values[i]);
574 comm().send(io_proc, tag, begin().operator->(), size());
579 template <
class T,
class A>
581 disarray_rep<T,distributed,A>::put_values (odiststream& ops)
const
583 return put_values (ops, _disarray_put_element_type<T>());
585 template <
class T,
class A>
587 disarray_rep<T,distributed,A>::put_matlab (odiststream& ops)
const
590 put_values (ops, _disarray_put_matlab_type<T>());
593 template <
class T,
class A>
594 template <
class PutFunction,
class A2>
596 disarray_rep<T,distributed,A>::permuted_put_values (
598 const disarray_rep<size_type,distributed,A2>& perm,
599 PutFunction put_element)
const
601 assert_macro (perm.size() == size(),
"permutation size does not match");
602 size_type io_proc = odiststream::io_proc();
604 distributor io_ownership (dis_size(), comm(), (my_proc == io_proc) ? dis_size() : 0);
605 disarray_rep<T,distributed,A> perm_x (io_ownership);
607 perm_x.dis_entry (perm[i]) = base::operator[](i);
609 perm_x.template dis_entry_assembly_begin<typename default_set_op<T>::type>();
610 perm_x.template dis_entry_assembly_end <typename default_set_op<T>::type>();
611 return perm_x.base::put_values (ops, put_element);
613 template <
class T,
class A>
614 template <
class GetFunction>
616 disarray_rep<T,distributed,A>::get_values (idiststream& ps, GetFunction get_element) {
617 distributor::tag_type tag = distributor::get_new_tag();
618 std::istream& s = ps.is();
619 size_type io_proc = odiststream::io_proc();
620 if (ownership().process() == io_proc) {
622 if (!
load_chunk (s, begin(), end(), get_element))
623 error_macro(
"read failed on input stream.");
625 if (ownership().n_process() > 1) {
629 for (
size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
630 size_max = std::max (size_max, ownership().size(iproc));
632 std::vector<T,A> data_proc_j (size_max);
633 T *start_j = data_proc_j.begin().operator->();
636 for (
size_type jproc = 0; jproc < ownership().n_process(); jproc++) {
637 if (jproc == io_proc)
continue;
639 size_type loc_sz_j = ownership().size(jproc);
640 if (loc_sz_j == 0)
continue;
641 T *last_j = start_j + loc_sz_j;
642 if (!
load_chunk (s, start_j, last_j, get_element))
643 error_macro(
"read failed on input stream.");
644 comm().send (jproc, tag, start_j, loc_sz_j);
649 comm().recv(io_proc, tag, begin().operator->(), size());
654 template <
class T,
class A>
656 disarray_rep<T,distributed,A>::get_values (idiststream& ips)
658 return get_values (ips, _disarray_get_element_type<T>());
660 template <
class T,
class A>
field::size_type size_type
check_macro(expr1.have_homogeneous_space(Xh1), "dual(expr1,expr2); expr1 should have homogeneous space. HINT: use dual(interpolate(Xh, expr1),expr2)")
verbose clean transpose logscale grid shrink ball stereo iso volume skipvtk deformation fastfieldload lattice reader_on_stdin color format format format format format format format format format format format format format format format format format format dump
This file is part of Rheolef.
apply_iterator< Iterator, Operator > make_apply_iterator(Iterator i, Operator op)
Size mpi_assembly_end(Message &receive, Message &send, Size receive_max_size, Container x)
Stash::size_type mpi_assembly_begin(const Stash &stash, InputIterator first_stash_idx, InputIterator last_stash_idx, const distributor &ownership, Message &receive, Message &send)
void mpi_scatter_init(Size nidx, SizeRandomIterator1 idx, Size nidy, SizeRandomIterator2 idy, Size idy_maxval, SizeRandomIterator3 ownership, Tag tag, const distributor::communicator_type &comm, Message &from, Message &to)
disarray_store< OutputRandomIterator, SetOp, Size, IsContainer > disarray_make_store(OutputRandomIterator x, SetOp op, Size, IsContainer)
void mpi_scatter_end(InputIterator x, OutputIterator y, Message &from, Message &to, SetOp op, Tag tag, Comm comm)
std::string itos(std::string::size_type i)
itos: see the rheostream page for the full documentation
bool load_chunk(std::istream &s, RandomIterator iter, RandomIterator last)
void mpi_scatter_begin(InputIterator x, OutputIterator y, Message &from, Message &to, SetOp op, Tag tag, Comm comm)