Reference documentation for deal.II version 8.1.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
work_stream.h
1 // ---------------------------------------------------------------------
2 // @f$Id: work_stream.h 31932 2013-12-08 02:15:54Z heister @f$
3 //
4 // Copyright (C) 2008 - 2013 by the deal.II authors
5 //
6 // This file is part of the deal.II library.
7 //
8 // The deal.II library is free software; you can use it, redistribute
9 // it, and/or modify it under the terms of the GNU Lesser General
10 // Public License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 // The full text of the license can be found in the file LICENSE at
13 // the top level of the deal.II distribution.
14 //
15 // ---------------------------------------------------------------------
16 
17 #ifndef __deal2__work_stream_h
18 #define __deal2__work_stream_h
19 
20 
21 #include <deal.II/base/config.h>
22 #include <deal.II/base/graph_coloring.h>
23 #include <deal.II/base/multithread_info.h>
24 #include <deal.II/base/thread_management.h>
25 #include <deal.II/base/template_constraints.h>
26 #include <deal.II/base/std_cxx1x/function.h>
27 #include <deal.II/base/std_cxx1x/bind.h>
28 #include <deal.II/base/thread_local_storage.h>
29 
30 #ifdef DEAL_II_WITH_THREADS
31 # include <deal.II/base/thread_management.h>
32 # include <tbb/pipeline.h>
33 #endif
34 
35 #include <vector>
36 #include <utility>
37 #include <memory>
38 
39 
40 DEAL_II_NAMESPACE_OPEN
41 
42 
43 
135 namespace WorkStream
136 {
137 
138 #ifdef DEAL_II_WITH_THREADS
139 
140  namespace internal
141  {
142 
143 //TODO: The following classes all use std_cxx1x::shared_ptr, but the
144 // correct pointer class would actually be std::unique_ptr. Make this
145 // replacement as soon as we have a class that provides these semantics
146 // and that is also available as a fall-back, e.g. via boost or similar
147 
159  namespace Implementation2
160  {
164  template <typename Iterator,
165  typename ScratchData,
166  typename CopyData>
167  class IteratorRangeToItemStream : public tbb::filter
168  {
169  public:
176  struct ItemType
177  {
183  {
184  std_cxx1x::shared_ptr<ScratchData> scratch_data;
185  bool currently_in_use;
186 
191  :
192  currently_in_use (false)
193  {}
194 
195  ScratchDataObject (ScratchData *p,
196  const bool in_use)
197  :
198  scratch_data (p),
199  currently_in_use (in_use)
200  {}
201 
202 //TODO: when we push back an object to the list of scratch objects, in
203 // Worker::operator(), we first create an object and then copy
204 // it to the end of this list. this involves having two objects
205 // of the current type having pointers to it, each with their own
206 // currently_in_use flag. there is probably little harm in this because
207 // the original one goes out of scope right away again, but it's
208 // certainly awkward. one way to avoid this would be to use unique_ptr
209 // but we'd need to figure out a way to use it in non-C++11 mode
211  :
212  scratch_data (o.scratch_data),
213  currently_in_use (o.currently_in_use)
214  {}
215  };
216 
217 
222  typedef std::list<ScratchDataObject> ScratchDataList;
223 
228  std::vector<Iterator> work_items;
229 
235  std::vector<CopyData> copy_datas;
236 
242  unsigned int n_items;
243 
276 
281  const ScratchData *sample_scratch_data;
282 
288 
289 
296  :
297  n_items (0),
298  scratch_data (0),
300  currently_in_use (false)
301  {}
302  };
303 
304 
// Constructor of the first pipeline stage, which is registered with TBB as
// a serial filter.
//
// begin, end          - the range of work items; stored in
//                       remaining_iterator_range and consumed by operator()
// buffer_size         - number of slots allocated in item_buffer
// chunk_size          - maximal number of work items held by one slot
// sample_scratch_data - every slot stores the address of this object, so the
//                       caller's object has to outlive the use of the slots
// sample_copy_data    - every slot receives chunk_size copies of this object
310  IteratorRangeToItemStream (const Iterator &begin,
311  const Iterator &end,
312  const unsigned int buffer_size,
313  const unsigned int chunk_size,
314  const ScratchData &sample_scratch_data,
315  const CopyData &sample_copy_data)
316  :
317  tbb::filter (/*is_serial=*/true),
318  remaining_iterator_range (begin, end),
319  item_buffer (buffer_size),
320  sample_scratch_data (sample_scratch_data),
321  chunk_size (chunk_size)
322  {
323  // initialize the elements of the ring buffer
324  for (unsigned int element=0; element<item_buffer.size(); ++element)
325  {
326  Assert (item_buffer[element].n_items == 0,
327  ExcInternalError());
328 
// size the slot for a full chunk; the end iterator only serves as the
// fill value for entries that operator() has not assigned yet
329  item_buffer[element].work_items.resize (chunk_size,
330  remaining_iterator_range.second);
// all slots point to the same thread-local storage object and to the
// same sample scratch object
331  item_buffer[element].scratch_data = &thread_local_scratch;
332  item_buffer[element].sample_scratch_data = &sample_scratch_data;
333  item_buffer[element].copy_datas.resize (chunk_size,
334  sample_copy_data);
335  item_buffer[element].currently_in_use = false;
336  }
337  }
338 
339 
// Create the next chunk of work: claim a free slot of item_buffer, copy up
// to chunk_size iterators from the remaining range into it and return a
// pointer to the slot. Returns 0 if the range is exhausted. The argument
// is ignored.
344  virtual void *operator () (void *)
345  {
346  // find the first unused item. we know that there must be one
347  // because we have set the maximal number of tokens in flight
348  // and have set the ring buffer to have exactly this size. so
349  // if this function is called, we know that fewer than the
350  // maximal number of items are currently in flight
351  ItemType *current_item = 0;
352  for (unsigned int i=0; i<item_buffer.size(); ++i)
353  if (item_buffer[i].currently_in_use == false)
354  {
355  item_buffer[i].currently_in_use = true;
356  current_item = &item_buffer[i];
357  break;
358  }
359  Assert (current_item != 0, ExcMessage ("This can't be. There must be a free item!"));
360 
361  // initialize the next item. it may
362  // consist of at most chunk_size
363  // elements
364  current_item->n_items = 0;
365  while ((remaining_iterator_range.first !=
367  &&
368  (current_item->n_items < chunk_size))
369  {
// store the iterator itself (not what it points to) as the work item
370  current_item->work_items[current_item->n_items]
371  = remaining_iterator_range.first;
372 
373  ++remaining_iterator_range.first;
374  ++current_item->n_items;
375  }
376 
// NOTE(review): when 0 is returned below, the slot claimed above keeps
// currently_in_use == true; presumably harmless because the pipeline
// ends at this point -- confirm
377  if (current_item->n_items == 0)
378  // there were no items
379  // left. terminate the pipeline
380  return 0;
381  else
382  return current_item;
383  }
384 
385  private:
391  std::pair<Iterator,Iterator> remaining_iterator_range;
392 
396  std::vector<ItemType> item_buffer;
397 
430 
436  const ScratchData &sample_scratch_data;
437 
450  const unsigned int chunk_size;
451 
// Set up slot number 'element' of item_buffer: size its work item and copy
// data arrays for chunk_size entries and set its scratch pointers.
//
// NOTE(review): nothing in the visible code calls this function; the
// constructor performs the same steps inline. Two continuation lines of the
// assignments below are not visible in this listing.
457  void init_buffer_elements (const unsigned int element,
458  const CopyData &sample_copy_data)
459  {
460  Assert (item_buffer[element].n_items == 0,
461  ExcInternalError());
462 
463  item_buffer[element].work_items
464  .resize (chunk_size, remaining_iterator_range.second);
465  item_buffer[element].scratch_data
467  item_buffer[element].sample_scratch_data
469  item_buffer[element].copy_datas
470  .resize (chunk_size, sample_copy_data);
471  }
472  };
473 
474 
475 
483  template <typename Iterator,
484  typename ScratchData,
485  typename CopyData>
486  class Worker : public tbb::filter
487  {
488  public:
// Constructor of the worker stage, which is registered with TBB as a
// parallel (non-serial) filter.
//
// worker       - function object called for every work item; a copy of it
//                is stored in this object
// copier_exist - operator() compares this flag against false to decide
//                whether it has to release the ring buffer slot itself
//                because no copier stage follows. Defaults to true.
497  Worker (const std_cxx1x::function<void (const Iterator &,
498  ScratchData &,
499  CopyData &)> &worker,
500  bool copier_exist=true)
501  :
502  tbb::filter (/* is_serial= */ false),
503  worker (worker),
505  {}
506 
507 
// Process one chunk: obtain a scratch object from the list held in the
// item's thread-local storage (creating one if all are in use), call the
// worker function on each of the n_items work items of the chunk, release
// the scratch object again and return 'item' unchanged as a pointer so that
// the next stage can pick it up.
511  void *operator () (void *item)
512  {
513  // first unpack the current item
514  typedef
516  ItemType;
517 
518  ItemType *current_item = static_cast<ItemType *> (item);
519 
520  // we need to find an unused scratch data object in the list that
521  // corresponds to the current thread and then mark it as used. if
522  // we can't find one, create one
523  //
524  // as explained in the documentation of the
525  // IteratorRangeToItemStream::scratch_data variable, there is no
526  // need to synchronize access to this variable using a mutex
527  // as long as we have no yield-point in between. this means that
528  // we can't take an iterator into the list now and expect it to
529  // still be valid after calling the worker, but we at least do
530  // not have to lock the following section
531  ScratchData *scratch_data = 0;
532  {
533  typename ItemType::ScratchDataList &
534  scratch_data_list = current_item->scratch_data->get();
535 
536  // see if there is an unused object. if so, grab it and mark
537  // it as used
538  for (typename ItemType::ScratchDataList::iterator
539  p = scratch_data_list.begin();
540  p != scratch_data_list.end(); ++p)
541  if (p->currently_in_use == false)
542  {
543  scratch_data = p->scratch_data.get();
544  p->currently_in_use = true;
545  break;
546  }
547 
548  // if no object was found, create one and mark it as used
549  if (scratch_data == 0)
550  {
// the new object is a copy of the sample scratch object; the list
// entry created below wraps the raw pointer in a shared pointer
551  scratch_data = new ScratchData(*current_item->sample_scratch_data);
552 
553  typename ItemType::ScratchDataList::value_type
554  new_scratch_object (scratch_data, true);
555  scratch_data_list.push_back (new_scratch_object);
556  }
557  }
558 
559  // then call the worker function on each element of the chunk we were
560  // given. since these worker functions are called on separate threads,
561  // nothing good can happen if they throw an exception and we are best
562  // off catching it and showing an error message
563  for (unsigned int i=0; i<current_item->n_items; ++i)
564  {
565  try
566  {
// an empty function object means that there is nothing to do
567  if (worker)
568  worker (current_item->work_items[i],
569  *scratch_data,
570  current_item->copy_datas[i]);
571  }
572  catch (const std::exception &exc)
573  {
574  Threads::internal::handle_std_exception (exc);
575  }
576  catch (...)
577  {
578  Threads::internal::handle_unknown_exception ();
579  }
580  }
581 
582  // finally mark the scratch object as unused again. as above, there
583  // is no need to lock anything here since the object we work on
584  // is thread-local
585  {
586  typename ItemType::ScratchDataList &
587  scratch_data_list = current_item->scratch_data->get();
588 
// look the entry up again by comparing raw pointers, since no
// iterator into the list was kept across the worker calls
589  for (typename ItemType::ScratchDataList::iterator p =
590  scratch_data_list.begin(); p != scratch_data_list.end();
591  ++p)
592  if (p->scratch_data.get() == scratch_data)
593  {
594  Assert(p->currently_in_use == true, ExcInternalError());
595  p->currently_in_use = false;
596  }
597  }
598 
599  // if there is no copier, mark current item as usable again
600  if (copier_exist==false)
601  current_item->currently_in_use = false;
602 
603 
604  // then return the original pointer
605  // to the now modified object
606  return item;
607  }
608 
609 
610  private:
616  const std_cxx1x::function<void (const Iterator &,
617  ScratchData &,
618  CopyData &)> worker;
619 
625  };
626 
627 
628 
637  template <typename Iterator,
638  typename ScratchData,
639  typename CopyData>
640  class Copier : public tbb::filter
641  {
642  public:
// Constructor of the copier stage, which is registered with TBB as a serial
// filter. A copy of the 'copier' function object is stored in this object.
654  Copier (const std_cxx1x::function<void (const CopyData &)> &copier)
655  :
656  tbb::filter (/*is_serial=*/true),
657  copier (copier)
658  {}
659 
660 
// Call the copier function on the copy data object of each of the n_items
// work items of the chunk pointed to by 'item', then mark the chunk's ring
// buffer slot as free. Always returns 0.
664  void *operator () (void *item)
665  {
666  // first unpack the current item
667  typedef
669  ItemType;
670 
671  ItemType *current_item = static_cast<ItemType *> (item);
672 
673  // initiate copying data. for the same reasons as in the worker class
674  // above, catch exceptions rather than letting them propagate into
675  // unknown territories
676  for (unsigned int i=0; i<current_item->n_items; ++i)
677  {
678  try
679  {
// an empty function object means that there is nothing to do
680  if (copier)
681  copier (current_item->copy_datas[i]);
682  }
683  catch (const std::exception &exc)
684  {
685  Threads::internal::handle_std_exception (exc);
686  }
687  catch (...)
688  {
689  Threads::internal::handle_unknown_exception ();
690  }
691  }
692 
693  // mark current item as usable again
694  current_item->currently_in_use = false;
695 
696 
697  // return an invalid item since we are at the end of the
698  // pipeline
699  return 0;
700  }
701 
702 
703  private:
707  const std_cxx1x::function<void (const CopyData &)> copier;
708  };
709 
710  }
711 
712 
719  namespace Implementation3
720  {
724  template <typename Iterator,
725  typename ScratchData,
726  typename CopyData>
727  class IteratorRangeToItemStream : public tbb::filter
728  {
729  public:
// One slot of the ring buffer that is handed from stage to stage of the
// pipeline. NOTE(review): in this listing several declaration lines of this
// struct (the nested struct header, some constructor headers and some member
// declarations) are not visible; the comments below only describe what is.
736  struct ItemType
737  {
743  {
// a scratch object and a copy object that are handed out together, plus
// a flag recording whether a worker is currently using them
744  std_cxx1x::shared_ptr<ScratchData> scratch_data;
745  std_cxx1x::shared_ptr<CopyData> copy_data;
746  bool currently_in_use;
747 
// initializer list of a constructor whose header is not visible here; it
// marks the pair as not in use
752  :
753  currently_in_use (false)
754  {}
755 
// wrap the raw pointers p and q in the shared pointers and record
// whether the pair starts out in use
756  ScratchAndCopyDataObjects (ScratchData *p,
757  CopyData *q,
758  const bool in_use)
759  :
760  scratch_data (p),
761  copy_data (q),
762  currently_in_use (in_use)
763  {}
764 
765 //TODO: when we push back an object to the list of scratch objects, in
766 // Worker::operator(), we first create an object and then copy
767 // it to the end of this list. this involves having two objects
768 // of the current type having pointers to it, each with their own
769 // currently_in_use flag. there is probably little harm in this because
770 // the original one goes out of scope right away again, but it's
771 // certainly awkward. one way to avoid this would be to use unique_ptr
772 // but we'd need to figure out a way to use it in non-C++11 mode
// initializer list copying all members from another object 'o'
// (presumably the copy constructor; its header is not visible here)
774  :
775  scratch_data (o.scratch_data),
776  copy_data (o.copy_data),
777  currently_in_use (o.currently_in_use)
778  {}
779  };
780 
781 
// list type used to hold the scratch/copy object pairs
786  typedef std::list<ScratchAndCopyDataObjects> ScratchAndCopyDataList;
787 
// the work items of this chunk; only the first n_items entries are valid
792  std::vector<Iterator> work_items;
793 
// number of valid entries in work_items
799  unsigned int n_items;
800 
809 
// object from which WorkerAndCopier::operator() copy-constructs new
// scratch objects
814  const ScratchData *sample_scratch_data;
815 
// object from which WorkerAndCopier::operator() copy-constructs new
// copy objects
819  const CopyData *sample_copy_data;
820 
826 
// initializer list of a constructor whose header is not visible here: no
// items, null sample_copy_data pointer, slot not in use
833  :
834  n_items (0),
836  sample_copy_data (0),
837  currently_in_use (false)
838  {}
839  };
840 
841 
// Constructor of the first pipeline stage, which is registered with TBB as
// a serial filter. Here the range [begin,end) runs over the elements of a
// std::vector<Iterator>, and it must not be empty.
//
// begin, end          - the range of work items; stored in
//                       remaining_iterator_range and consumed by operator()
// buffer_size         - number of slots allocated in item_buffer
// chunk_size          - maximal number of work items held by one slot
// sample_scratch_data - every slot stores the address of this object, so the
//                       caller's object has to outlive the use of the slots
// sample_copy_data    - likewise stored by address in every slot
848  IteratorRangeToItemStream (const typename std::vector<Iterator>::const_iterator &begin,
849  const typename std::vector<Iterator>::const_iterator &end,
850  const unsigned int buffer_size,
851  const unsigned int chunk_size,
852  const ScratchData &sample_scratch_data,
853  const CopyData &sample_copy_data)
854  :
855  tbb::filter (/*is_serial=*/true),
856  remaining_iterator_range (begin, end),
857  item_buffer (buffer_size),
858  sample_scratch_data (sample_scratch_data),
859  sample_copy_data (sample_copy_data),
860  chunk_size (chunk_size)
861  {
// *begin is dereferenced below, so an empty range is not allowed
862  Assert (begin != end, ExcMessage ("This class is not prepared to deal with empty ranges!"));
863  // initialize the elements of the item_buffer
864  for (unsigned int element=0; element<item_buffer.size(); ++element)
865  {
866  Assert (item_buffer[element].n_items == 0,
867  ExcInternalError());
868 
869  // resize the item_buffer. we have to initialize the new
870  // elements with something and because we can't rely on there
871  // being a default constructor for 'Iterator', we take the first
872  // element in the range [begin,end) pointed to.
873  item_buffer[element].work_items.resize (chunk_size,*begin);
// all slots point to the same thread-local storage object and to the
// same sample scratch and copy objects
874  item_buffer[element].scratch_and_copy_data = &thread_local_scratch_and_copy;
875  item_buffer[element].sample_scratch_data = &sample_scratch_data;
876  item_buffer[element].sample_copy_data = &sample_copy_data;
877  item_buffer[element].currently_in_use = false;
878  }
879  }
880 
881 
// Create the next chunk of work: claim a free slot of item_buffer, copy up
// to chunk_size elements of the remaining range into it and return a
// pointer to the slot. Returns 0 if the range is exhausted. The argument
// is ignored.
886  virtual void *operator () (void *)
887  {
888  // find the first unused item. we know that there must be one
889  // because we have set the maximal number of tokens in flight
890  // and have set the ring buffer to have exactly this size. so
891  // if this function is called, we know that fewer than the
892  // maximal number of items are currently in flight
893  ItemType *current_item = 0;
894  for (unsigned int i=0; i<item_buffer.size(); ++i)
895  if (item_buffer[i].currently_in_use == false)
896  {
897  item_buffer[i].currently_in_use = true;
898  current_item = &item_buffer[i];
899  break;
900  }
901  Assert (current_item != 0, ExcMessage ("This can't be. There must be a free item!"));
902 
903 
904  // initialize the next item. it may
905  // consist of at most chunk_size
906  // elements
907  current_item->n_items = 0;
908  while ((remaining_iterator_range.first !=
910  &&
911  (current_item->n_items < chunk_size))
912  {
913  // initialize the iterators to work on with the elements
914  // of the vector that remaining_iterator_range
915  // points into
916  current_item->work_items[current_item->n_items]
917  = *remaining_iterator_range.first;
918 
919  ++remaining_iterator_range.first;
920  ++current_item->n_items;
921  }
922 
// NOTE(review): when 0 is returned below, the slot claimed above keeps
// currently_in_use == true; presumably harmless because the pipeline
// ends at this point -- confirm
923  if (current_item->n_items == 0)
924  // there were no items
925  // left. terminate the pipeline
926  return 0;
927  else
928  return current_item;
929  }
930 
931  private:
937  std::pair<typename std::vector<Iterator>::const_iterator,typename std::vector<Iterator>::const_iterator> remaining_iterator_range;
938 
942  std::vector<ItemType> item_buffer;
943 
952 
958  const ScratchData &sample_scratch_data;
959 
965  const CopyData &sample_copy_data;
966 
979  const unsigned int chunk_size;
980 
// Set up slot number 'element' of item_buffer: size its work item array for
// chunk_size entries and set its scratch and copy data pointers.
//
// NOTE(review): nothing in the visible code calls this function; the
// constructor performs the same steps inline. Also, the fill value passed
// to resize() below is remaining_iterator_range.second, which is a
// std::vector<Iterator>::const_iterator, while work_items holds objects of
// type Iterator (the constructor uses *begin instead) -- verify before
// using this function. Two continuation lines of the assignments below are
// not visible in this listing.
986  void init_buffer_elements (const unsigned int element)
987  {
988  Assert (item_buffer[element].n_items == 0,
989  ExcInternalError());
990 
991  item_buffer[element].work_items
992  .resize (chunk_size, remaining_iterator_range.second);
993  item_buffer[element].scratch_and_copy_data
995  item_buffer[element].sample_scratch_data
997  item_buffer[element].sample_copy_data
998  = &sample_copy_data;
999  }
1000  };
1001 
1002 
1003 
1011  template <typename Iterator,
1012  typename ScratchData,
1013  typename CopyData>
1014  class WorkerAndCopier : public tbb::filter
1015  {
1016  public:
// Constructor of the combined worker-and-copier stage, which is registered
// with TBB as a parallel (non-serial) filter. Copies of both function
// objects are stored in this object.
//
// worker - function object called for every work item
// copier - function object called right after 'worker' with the copy data
//          object that 'worker' was given
1025  WorkerAndCopier (const std_cxx1x::function<void (const Iterator &,
1026  ScratchData &,
1027  CopyData &)> &worker,
1028  const std_cxx1x::function<void (const CopyData &)> &copier)
1029  :
1030  tbb::filter (/* is_serial= */ false),
1031  worker (worker),
1032  copier (copier)
1033  {}
1034 
1035 
// Process one chunk: obtain a scratch/copy object pair from the list held
// in the item's thread-local storage (creating one if all are in use), call
// the worker and then the copier on each of the n_items work items of the
// chunk, release the pair and the ring buffer slot again. Always returns 0.
1039  void *operator () (void *item)
1040  {
1041  // first unpack the current item
1042  typedef
1044  ItemType;
1045 
1046  ItemType *current_item = static_cast<ItemType *> (item);
1047 
1048  // we need to find an unused scratch and corresponding copy
1049  // data object in the list that
1050  // corresponds to the current thread and then mark it as used. if
1051  // we can't find one, create one
1052  //
1053  // as explained in the documentation of the
1054  // IteratorRangeToItemStream::scratch_data variable, there is no
1055  // need to synchronize access to this variable using a mutex
1056  // as long as we have no yield-point in between. this means that
1057  // we can't take an iterator into the list now and expect it to
1058  // still be valid after calling the worker, but we at least do
1059  // not have to lock the following section
1060  ScratchData *scratch_data = 0;
1061  CopyData *copy_data = 0;
1062  {
1063  typename ItemType::ScratchAndCopyDataList &
1064  scratch_and_copy_data_list = current_item->scratch_and_copy_data->get();
1065 
1066  // see if there is an unused object. if so, grab it and mark
1067  // it as used
1068  for (typename ItemType::ScratchAndCopyDataList::iterator
1069  p = scratch_and_copy_data_list.begin();
1070  p != scratch_and_copy_data_list.end(); ++p)
1071  if (p->currently_in_use == false)
1072  {
1073  scratch_data = p->scratch_data.get();
1074  copy_data = p->copy_data.get();
1075  p->currently_in_use = true;
1076  break;
1077  }
1078 
1079  // if no element in the list was found, create one and mark it as used
1080  if (scratch_data == 0)
1081  {
1082  Assert (copy_data==0, ExcInternalError());
// both new objects are copies of the sample objects; the list entry
// created below wraps the raw pointers in shared pointers
1083  scratch_data = new ScratchData(*current_item->sample_scratch_data);
1084  copy_data = new CopyData(*current_item->sample_copy_data);
1085 
1086  typename ItemType::ScratchAndCopyDataList::value_type
1087  new_scratch_object (scratch_data, copy_data, true);
1088  scratch_and_copy_data_list.push_back (new_scratch_object);
1089  }
1090  }
1091 
1092  // then call the worker and copier function on each element of the chunk we were
1093  // given. since these functions are called on separate threads,
1094  // nothing good can happen if they throw an exception and we are best
1095  // off catching it and showing an error message
1096  for (unsigned int i=0; i<current_item->n_items; ++i)
1097  {
1098  try
1099  {
// empty function objects are skipped. the same copy_data object is
// passed to worker and copier for every element of the chunk
1100  if (worker)
1101  worker (current_item->work_items[i],
1102  *scratch_data,
1103  *copy_data);
1104  if (copier)
1105  copier (*copy_data);
1106  }
1107  catch (const std::exception &exc)
1108  {
1109  Threads::internal::handle_std_exception (exc);
1110  }
1111  catch (...)
1112  {
1113  Threads::internal::handle_unknown_exception ();
1114  }
1115  }
1116 
1117  // finally mark the scratch object as unused again. as above, there
1118  // is no need to lock anything here since the object we work on
1119  // is thread-local
1120  {
1121  typename ItemType::ScratchAndCopyDataList &
1122  scratch_and_copy_data_list = current_item->scratch_and_copy_data->get();
1123 
// look the entry up again by comparing raw scratch pointers, since no
// iterator into the list was kept across the worker calls
1124  for (typename ItemType::ScratchAndCopyDataList::iterator p =
1125  scratch_and_copy_data_list.begin(); p != scratch_and_copy_data_list.end();
1126  ++p)
1127  if (p->scratch_data.get() == scratch_data)
1128  {
1129  Assert(p->currently_in_use == true, ExcInternalError());
1130  p->currently_in_use = false;
1131  }
1132  }
1133 
1134  // mark current item as usable again
1135  current_item->currently_in_use = false;
1136 
1137  // return an invalid item since we are at the end of the
1138  // pipeline
1139  return 0;
1140  }
1141 
1142 
1143  private:
1149  const std_cxx1x::function<void (const Iterator &,
1150  ScratchData &,
1151  CopyData &)> worker;
1152 
1157  const std_cxx1x::function<void (const CopyData &)> copier;
1158  };
1159  }
1160  }
1161 
1162 #endif // DEAL_II_WITH_THREADS
1163 
1164 
1165 
// Call 'worker' and then 'copier' for every iterator in [begin,end).
//
// begin, end          - range of work items; nothing happens if it is empty
// worker              - called as worker(iterator, scratch, copy); skipped
//                       if it converts to an empty function object
// copier              - called as copier(copy); skipped if it converts to
//                       an empty function object
// sample_scratch_data - scratch objects are created as copies of this
// sample_copy_data    - copy objects are created as copies of this
// queue_length        - size of the ring buffer and argument to the
//                       pipeline's run(); must be positive
// chunk_size          - maximal number of work items per pipeline item;
//                       must be positive
//
// Without DEAL_II_WITH_THREADS, or if multithread_info.n_threads()==1,
// everything runs in a plain loop on the calling thread; otherwise a TBB
// pipeline is built from the stages in internal::Implementation2.
1200  template <typename Worker,
1201  typename Copier,
1202  typename Iterator,
1203  typename ScratchData,
1204  typename CopyData>
1205  void
1206  run (const Iterator &begin,
1207  const typename identity<Iterator>::type &end,
1208  Worker worker,
1209  Copier copier,
1210  const ScratchData &sample_scratch_data,
1211  const CopyData &sample_copy_data,
1212  const unsigned int queue_length = 2*multithread_info.n_threads(),
1213  const unsigned int chunk_size = 8)
1214  {
1215  Assert (queue_length > 0,
1216  ExcMessage ("The queue length must be at least one, and preferably "
1217  "larger than the number of processors on this system."));
1218  (void)queue_length; // removes -Wunused-parameter warning in optimized mode
1219  Assert (chunk_size > 0,
1220  ExcMessage ("The chunk_size must be at least one."));
1221  (void)chunk_size; // removes -Wunused-parameter warning in optimized mode
1222 
1223  // if no work then skip. (only use operator!= for iterators since we may
1224  // not have an equality comparison operator)
1225  if (!(begin != end))
1226  return;
1227 
1228  // we want to use TBB if we have support and if it is not disabled at
1229  // runtime:
1230 #ifdef DEAL_II_WITH_THREADS
1231  if (multithread_info.n_threads()==1)
1232 #endif
1233  {
1234  // need to copy the sample since it is marked const
1235  ScratchData scratch_data = sample_scratch_data;
1236  CopyData copy_data = sample_copy_data;
1237 
// the same scratch_data and copy_data objects are reused for all
// iterations.
// NOTE(review): the two emptiness checks below do not depend on the
// loop variable and could presumably be evaluated once before the loop
1238  for (Iterator i=begin; i!=end; ++i)
1239  {
1240  // need to check if the function is not the zero function. To
1241  // check zero-ness, create a C++ function out of it and check that
1242  if (static_cast<const std_cxx1x::function<void (const Iterator &,
1243  ScratchData &,
1244  CopyData &)>& >(worker))
1245  worker (i, scratch_data, copy_data);
1246  if (static_cast<const std_cxx1x::function<void (const CopyData &)>& >
1247  (copier))
1248  copier (copy_data);
1249  }
1250  }
1251 #ifdef DEAL_II_WITH_THREADS
1252  else // have TBB and use more than one thread
1253  {
1254  // create the three stages of the pipeline
1256  iterator_range_to_item_stream (begin, end,
1257  queue_length,
1258  chunk_size,
1259  sample_scratch_data,
1260  sample_copy_data);
1261 
1262  // Check that the copier exists
1263  if (static_cast<const std_cxx1x::function<void (const CopyData &)>& >(copier))
1264  {
1267 
1268  // now create a pipeline from these stages
1269  tbb::pipeline assembly_line;
1270  assembly_line.add_filter (iterator_range_to_item_stream);
1271  assembly_line.add_filter (worker_filter);
1272  assembly_line.add_filter (copier_filter);
1273 
1274  // and run it
1275  assembly_line.run (queue_length);
1276 
1277  assembly_line.clear ();
1278  }
1279  else
1280  {
// no copier: the pipeline consists of only the first two stages
1282 
1283  // now create a pipeline from these stages
1284  tbb::pipeline assembly_line;
1285  assembly_line.add_filter (iterator_range_to_item_stream);
1286  assembly_line.add_filter (worker_filter);
1287 
1288  // and run it
1289  assembly_line.run (queue_length);
1290 
1291  assembly_line.clear ();
1292  }
1293  }
1294 #endif
1295  }
1296 
1297 
// Call 'worker' and then 'copier' for every iterator stored in
// colored_iterators, which holds one vector of iterators per color. The
// colors are processed one after the other, in the order given.
//
// colored_iterators   - the work items, grouped by color
// worker              - called as worker(iterator, scratch, copy)
// copier              - called as copier(copy)
// sample_scratch_data - scratch objects are created as copies of this
// sample_copy_data    - copy objects are created as copies of this
// queue_length        - size of the ring buffer and argument to the
//                       pipeline's run(); must be positive
// chunk_size          - maximal number of work items per pipeline item;
//                       must be positive
//
// Without DEAL_II_WITH_THREADS, or if multithread_info.n_threads()==1,
// everything runs in a plain loop on the calling thread; otherwise one TBB
// pipeline per non-empty color is built from the stages in
// internal::Implementation3.
1333  template <typename Worker,
1334  typename Copier,
1335  typename Iterator,
1336  typename ScratchData,
1337  typename CopyData>
1338  void
1339  run (const std::vector<std::vector<Iterator> > &colored_iterators,
1340  Worker worker,
1341  Copier copier,
1342  const ScratchData &sample_scratch_data,
1343  const CopyData &sample_copy_data,
1344  const unsigned int queue_length = 2*multithread_info.n_threads(),
1345  const unsigned int chunk_size = 8)
1346  {
1347  Assert (queue_length > 0,
1348  ExcMessage ("The queue length must be at least one, and preferably "
1349  "larger than the number of processors on this system."));
1350  (void)queue_length; // removes -Wunused-parameter warning in optimized mode
1351  Assert (chunk_size > 0,
1352  ExcMessage ("The chunk_size must be at least one."));
1353  (void)chunk_size; // removes -Wunused-parameter warning in optimized mode
1354 
1355  // we want to use TBB if we have support and if it is not disabled at
1356  // runtime:
1357 #ifdef DEAL_II_WITH_THREADS
1358  if (multithread_info.n_threads()==1)
1359 #endif
1360  {
1361  // need to copy the sample since it is marked const
1362  ScratchData scratch_data = sample_scratch_data;
1363  CopyData copy_data = sample_copy_data;
1364 
// the same scratch_data and copy_data objects are reused for all
// work items of all colors
1365  for (unsigned int color=0; color<colored_iterators.size(); ++color)
1366  for (typename std::vector<Iterator>::const_iterator p = colored_iterators[color].begin();
1367  p != colored_iterators[color].end(); ++p)
1368  {
1369  // need to check if the function is not the zero function. To
1370  // check zero-ness, create a C++ function out of it and check that
1371  if (static_cast<const std_cxx1x::function<void (const Iterator &,
1372  ScratchData &,
1373  CopyData &)>& >(worker))
1374  worker (*p, scratch_data, copy_data);
1375  if (static_cast<const std_cxx1x::function<void (const CopyData &)>& >(copier))
1376  copier (copy_data);
1377  }
1378  }
1379 #ifdef DEAL_II_WITH_THREADS
1380  else // have TBB and use more than one thread
1381  {
1382  // loop over the various colors of what we're given
1383  for (unsigned int color=0; color<colored_iterators.size(); ++color)
// skip empty colors: the first pipeline stage asserts that its range
// is not empty
1384  if (colored_iterators[color].size() > 0)
1385  {
1386  // create the stages of the pipeline
1388  iterator_range_to_item_stream (colored_iterators[color].begin(),
1389  colored_iterators[color].end(),
1390  queue_length,
1391  chunk_size,
1392  sample_scratch_data,
1393  sample_copy_data);
1394 
1395 
1397  worker_and_copier_filter (worker, copier);
1398 
1399  // now create a pipeline from these stages
1400  tbb::pipeline assembly_line;
1401  assembly_line.add_filter (iterator_range_to_item_stream);
1402  assembly_line.add_filter (worker_and_copier_filter);
1403 
1404  // and run it
1405  assembly_line.run (queue_length);
1406 
1407  assembly_line.clear ();
1408  }
1409  }
1410 #endif
1411  }
1412 
1413 
1414 
1415 
1416 
// Convenience overload for the case that worker and copier are member
// functions of the same class: binds both member function pointers to
// main_object (held by reference, so main_object must stay alive for the
// duration of the call) and forwards everything to the run() overload that
// takes a range of iterators and two function objects.
//
// begin, end          - range of work items
// main_object         - object on which 'worker' and 'copier' are invoked
// worker              - member function taking (iterator, scratch, copy)
// copier              - member function taking (copy)
// sample_scratch_data, sample_copy_data, queue_length, chunk_size
//                     - passed on unchanged
1445  template <typename MainClass,
1446  typename Iterator,
1447  typename ScratchData,
1448  typename CopyData>
1449  void
1450  run (const Iterator &begin,
1451  const typename identity<Iterator>::type &end,
1452  MainClass &main_object,
1453  void (MainClass::*worker) (const Iterator &,
1454  ScratchData &,
1455  CopyData &),
1456  void (MainClass::*copier) (const CopyData &),
1457  const ScratchData &sample_scratch_data,
1458  const CopyData &sample_copy_data,
1459  const unsigned int queue_length = 2*multithread_info.n_threads(),
1460  const unsigned int chunk_size = 8)
1461  {
1462  // forward to the other function
1463  run (begin, end,
1464  std_cxx1x::bind (worker,
1465  std_cxx1x::ref (main_object),
1466  std_cxx1x::_1, std_cxx1x::_2, std_cxx1x::_3),
1467  std_cxx1x::bind (copier,
1468  std_cxx1x::ref (main_object),
1469  std_cxx1x::_1),
1470  sample_scratch_data,
1471  sample_copy_data,
1472  queue_length,
1473  chunk_size);
1474  }
1475 
1476 }
1477 
1478 
1479 
1480 
1481 DEAL_II_NAMESPACE_CLOSE
1482 
1483 
1484 
1485 
1486 //---------------------------- work_stream.h ---------------------------
1487 // end of #ifndef __deal2__work_stream_h
1488 #endif
1489 //---------------------------- work_stream.h ---------------------------
IteratorRangeToItemStream(const typename std::vector< Iterator >::const_iterator &begin, const typename std::vector< Iterator >::const_iterator &end, const unsigned int buffer_size, const unsigned int chunk_size, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
Definition: work_stream.h:848
std::pair< typename std::vector< Iterator >::const_iterator, typename std::vector< Iterator >::const_iterator > remaining_iterator_range
Definition: work_stream.h:937
::ExceptionBase & ExcMessage(std::string arg1)
void init_buffer_elements(const unsigned int element, const CopyData &sample_copy_data)
Definition: work_stream.h:457
Copier(const std_cxx1x::function< void(const CopyData &)> &copier)
Definition: work_stream.h:654
const std_cxx1x::function< void(const CopyData &)> copier
Definition: work_stream.h:707
const std_cxx1x::function< void(const Iterator &, ScratchData &, CopyData &)> worker
Definition: work_stream.h:618
Threads::ThreadLocalStorage< ScratchDataList > * scratch_data
Definition: work_stream.h:275
IteratorRangeToItemStream(const Iterator &begin, const Iterator &end, const unsigned int buffer_size, const unsigned int chunk_size, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
Definition: work_stream.h:310
#define Assert(cond, exc)
Definition: exceptions.h:299
Worker(const std_cxx1x::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, bool copier_exist=true)
Definition: work_stream.h:497
const std_cxx1x::function< void(const Iterator &, ScratchData &, CopyData &)> worker
Definition: work_stream.h:1151
const std_cxx1x::function< void(const CopyData &)> copier
Definition: work_stream.h:1157
WorkerAndCopier(const std_cxx1x::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, const std_cxx1x::function< void(const CopyData &)> &copier)
Definition: work_stream.h:1025
void run(const Iterator &begin, const typename identity< Iterator >::type &end, Worker worker, Copier copier, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data, const unsigned int queue_length=2 *multithread_info.n_threads(), const unsigned int chunk_size=8)
Definition: work_stream.h:1206
Threads::ThreadLocalStorage< typename ItemType::ScratchAndCopyDataList > thread_local_scratch_and_copy
Definition: work_stream.h:951
MultithreadInfo multithread_info
Threads::ThreadLocalStorage< typename ItemType::ScratchDataList > thread_local_scratch
Definition: work_stream.h:429
::ExceptionBase & ExcInternalError()
Threads::ThreadLocalStorage< ScratchAndCopyDataList > * scratch_and_copy_data
Definition: work_stream.h:808