c++-gtk-utils
parallel.h
Go to the documentation of this file.
1 /* Copyright (C) 2013 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 #ifndef CGU_PARALLEL_H
40 #define CGU_PARALLEL_H
41 
42 #include <utility> // for std::move, std::forward and std::pair
43 #include <memory> // for std::unique_ptr
44 #include <iterator> // for std::iterator_traits and std::distance
45 #include <exception> // for std::exception
46 #include <functional> // for std::bind
47 #include <type_traits> // for std::remove_reference and std::remove_const
48 #include <limits> // for std::numeric_limits
49 #include <algorithm> // for std::min
50 #include <tuple>
51 
52 #include <c++-gtk-utils/callback.h>
54 #include <c++-gtk-utils/mutex.h>
55 
56 namespace Cgu {
57 
58 namespace Thread {
59 
60 struct ParallelError: public std::exception {
61  virtual const char* what() const throw() {return "ParallelError\n";}
62 };
63 
64 #ifndef DOXYGEN_PARSING
65 
66 // in version 2.2.2, this was the ParallelHelper namespace. Because
67 // the meaning of the DiffType* argument of these functions has
68 // changed in version 2.2.3 (it is incremented rather than
69 // decremented), this is now the ParallelHelper2 namespace so that no
70 // ODR issues arise on library linking with old binaries
71 namespace ParallelHelper2 {
72 
73 template <class ArgType, class DiffType, class Iterator>
74 void for_each_cb_func(const Cgu::Callback::SafeFunctorArg<ArgType&>& s_task,
75  Iterator iter,
76  Mutex* m, Cond* cond,
77  DiffType* done_count) {
78  s_task(*iter);
79  Mutex::Lock l{*m};
80  ++*done_count;
81  cond->signal();
82 }
83 
84 template <class FType, class ArgType, class DestType>
85 void transform1_func(const FType& func,
86  ArgType& arg,
87  DestType& res) {
88  res = func(arg);
89 }
90 
91 
92 template <class ArgType, class DestType, class DiffType, class SourceIterator>
93 void transform1_cb_func(const Cgu::Callback::SafeFunctorArg<ArgType&, DestType&>& s_task,
94  SourceIterator source_iter,
95  Mutex* m, Cond* cond,
96  DiffType* done_count,
97  DestType* results,
98  DiffType index) {
99  DestType res;
100  s_task(*source_iter, res);
101  Mutex::Lock l{*m};
102  // move to 'results' within the mutex because g++ <= 4.7 does not
103  // correctly implement the C++11 memory model on some 64 bit
104  // platforms (this is a slight pessimization for gcc >= 4.8)
105  results[index] = std::move(res);
106  ++*done_count;
107  cond->signal();
108 }
109 
110 template <class FType, class Arg1Type,
111  class Arg2Type, class DestType>
112 void transform2_func(const FType& func,
113  Arg1Type& arg1,
114  Arg2Type& arg2,
115  DestType& res) {
116  res = func(arg1, arg2);
117 }
118 
119 
120 template <class Arg1Type, class Arg2Type, class DestType,
121  class DiffType, class SourceIterator1, class SourceIterator2>
122 void transform2_cb_func(const Cgu::Callback::SafeFunctorArg<Arg1Type&, Arg2Type&, DestType&>& s_task,
123  SourceIterator1 source_iter1,
124  SourceIterator2 source_iter2,
125  Mutex* m, Cond* cond,
126  DiffType* done_count,
127  DestType* results,
128  DiffType index) {
129  DestType res;
130  s_task(*source_iter1, *source_iter2, res);
131  Mutex::Lock l{*m};
132  // move to 'results' within the mutex because g++ <= 4.7 does not
133  // correctly implement the C++11 memory model on some 64 bit
134  // platforms (this is a slight pessimization for gcc >= 4.8)
135  results[index] = std::move(res);
136  ++*done_count;
137  cond->signal();
138 }
139 
140 template <class DiffType>
141 void fail_func(Mutex* m, Cond* cond,
142  bool* error, DiffType* done_count) noexcept {
143  Mutex::Lock l{*m};
144  ++*done_count;
145  *error = true;
146  cond->signal();
147 }
148 
149 } // namespace ParallelHelper2
150 
151 #endif // DOXYGEN_PARSING
152 
153 /**
154  * \#include <c++-gtk-utils/parallel.h>
155  *
156  * This function applies a callable object to each element of a
157  * container in the range ['first', 'last'), by executing each such
158  * application as a task of a Thread::TaskManager object. Tasks are
159  * added to the Thread::TaskManager object in the order in which the
160  * respective elements appear in the container (and if a task mutates
161  * its argument, it will do so in respect of the correct element of
162  * the container), but no other ordering arises, and the tasks will
163  * execute in parallel to the extent that the Thread::TaskManager
164  * object has sufficient threads available to do so.
165  *
166  * Apart from that, and that this function returns void, it does the
167  * same as std::for_each(). It can mutate container elements if the
168  * callable object takes its argument by non-const reference. It will
169  * not return until the callable object has been applied to all of the
170  * elements in the range ['first', 'last').
171  *
172  * This function can be called by a task running on the same
173  * TaskManager object.
174  *
175  * @param tm The Thread::TaskManager object on which the tasks will
176  * run.
177  * @param first The beginning of the range to which 'func' is to be
178  * applied.
179  * @param last One past the last element to which 'func' is to be
180  * applied.
181  * @param func A callable object to be applied to each element in the
182  * range ['first', 'last'), such as formed by a lambda expression or
183  * the result of std::bind. It should take a single unbound argument
184  * of the value type of the container to which 'first' and 'last'
185  * relate or a const or non-const reference to that type. Any return
186  * value is discarded. If an exception propagates from 'func', the
187  * exception will be consumed while the for each loop is running, and
188  * an attempt will still be made to apply 'func' to all remaining
189  * elements in the range ['first', 'last'), and only after that
190  * attempt has completed will the exception Cgu::Thread::ParallelError
191  * be thrown.
192  * @exception std::bad_alloc This exception will be thrown if memory
193  * is exhausted and the system throws in that case. (On systems with
194  * over-commit/lazy-commit combined with virtual memory (swap), it is
195  * rarely useful to check for memory exhaustion). If this exception
196  * is thrown, some tasks may nonetheless have already started by
197  * virtue of the call to this function, but subsequent ones will not.
198  * @exception Cgu::Thread::TaskError This exception will be thrown if
199  * stop_all() has previously been called on the Thread::TaskManager
200  * object, or if another thread calls stop_all() after this method is
201  * called but before it has returned. It will also be thrown if the
202  * Thread::TaskManager object's is_error() method would return true
203  * because its internal thread pool loop implementation has thrown
204  * std::bad_alloc, or a thread has failed to start correctly because
205  * pthread has run out of resources. (On systems with
206  * over-commit/lazy-commit combined with virtual memory (swap), it is
207  * rarely useful to check for memory exhaustion, and if a reasonable
208  * maximum thread count has been chosen for the Thread::TaskManager
209  * object pthread should not run out of other resources, but there may
210  * be some specialized cases where the return value of is_error() is
211  * useful.) If this exception is thrown, some tasks may nonetheless
212  * have already started by virtue of the call to this function.
213  * @exception Cgu::Thread::ParallelError This exception will be thrown
214  * if an exception propagates from the 'func' callable object when it
215  * executes on being applied to one or more elements of the container.
216  * Such an exception will not stop an attempt being made to apply
217  * 'func' (successfully or unsuccessfully) to all elements in the
218  * range ['first', 'last'). Cgu::Thread::ParallelError will be thrown
219  * after such attempted application has finished.
220  * @exception Cgu::Thread::MutexError This exception will be thrown if
221  * initialization of a mutex used by this function fails. (It is
222  * often not worth checking for this, as it means either memory is
223  * exhausted or pthread has run out of other resources to create new
224  * mutexes.) If this exception is thrown, no tasks will start.
225  * @exception Cgu::Thread::CondError This exception will be thrown if
226  * initialization of a condition variable used by this function fails.
227  * (It is often not worth checking for this, as it means either memory
228  * is exhausted or pthread has run out of other resources to create
229  * new condition variables.) If this exception is thrown, no tasks
230  * will start.
231  * @note An exception might also be thrown if the copy or move
232  * constructor of the 'func' callable objects throws. If such an
233  * exception is thrown, no tasks will start.
234  *
235  * Since 2.0.19/2.2.2
236  */
237 template <class Iterator, class Func>
239  Iterator first,
240  Iterator last,
241  Func&& func) {
242 
243  typedef typename std::iterator_traits<Iterator>::value_type ArgType;
244  typedef typename std::iterator_traits<Iterator>::difference_type DiffType;
245 
246  Mutex mutex;
247  Cond cond;
248  DiffType start_count = 0;
249  DiffType done_count = 0;
250  bool error = false;
251 
252  // construct SafeFunctorArg objects so that they can be shared
253  // between different tasks
255  Cgu::Callback::lambda<ArgType&>(std::forward<Func>(func))
256  };
258  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
259  &mutex,
260  &cond,
261  &error,
262  &done_count)
263  };
264 
265  for (; first != last; ++first, ++start_count) {
266  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
267  Cgu::Callback::make_ref(&ParallelHelper2::for_each_cb_func<ArgType, DiffType, Iterator>,
268  s_task,
269  first,
270  &mutex,
271  &cond,
272  &done_count)
273  );
274  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
275  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
276  );
277 
278  tm.add_task(std::move(task_cb), std::move(fail_cb));
279  }
280 
281  Mutex::Lock l{mutex};
282  while (start_count > done_count) cond.wait(mutex);
283  if (error) throw ParallelError();
284 }
285 
286 /**
287  * \#include <c++-gtk-utils/parallel.h>
288  *
289  * This function maps over a container in the range ['first', 'last'),
290  * applying a unary callable object to each element of the container
291  * in that range and storing the result in the destination range, by
292  * executing each such application as a task of a Thread::TaskManager
293  * object. Tasks are added to the Thread::TaskManager object in the
294  * order in which the respective elements appear in the source
295  * container, and the final result appears in the destination
296  * container in the same order as the source range from which it is
297  * generated (including if a back_inserter iterator is used), but no
298  * other ordering arises, and the tasks will execute in parallel to
299  * the extent that the Thread::TaskManager object has sufficient
300  * threads available to do so.
301  *
302  * Apart from that, this function does the same as the version of
303  * std::transform() taking a unary function, except that it returns
304  * void (see Thread::parallel_transform_partial() for a function which
305  * returns a destination iterator and an iterator to the source
306  * range). It will not return until the callable object has been
307  * applied to all of the elements in the range ['first', 'last').
308  *
309  * This function can be called by a task running on the same
310  * TaskManager object (a result can therefore be delivered
311  * asynchronously by calling this function in a task created by the
312  * make_task_when(), make_task_compose() or make_task_when_full()
313  * Thread::TaskManager methods). A task can carry out a map-reduce
314  * operation by passing the result of calling this function to
315  * std::accumulate() to perform a fold-left or fold-right on that
316  * result.
317  *
318  * A separate overload of this function takes a binary callable
319  * object.
320  *
321  * Here is a trivial example of a map-reduce operation which maps over
322  * a vector by multiplying each element by 2 in separate tasks, and
323  * then folds-left using std::accumulate() (std::accumulate() can fold
324  * using any callable object, but in this example the default of
325  * addition is used):
326  *
327  * @code
328  * using namespace Cgu;
329  * std::vector<int> v{1, 2, 3, 4, 5};
330  * Thread::TaskManager tm;
331  *
332  * Thread::parallel_transform(tm,
333  * v.begin(),
334  * v.end(),
335  * v.begin(),
336  * [] (int elt) {return elt * 2;});
337  * // res will be equal to 30
338  * int res = std::accumulate(v.begin(), v.end(), 0);
339  * @endcode
340  *
341  * @param tm The Thread::TaskManager object on which the tasks will
342  * run.
343  * @param first The beginning of the range to which 'func' is to be
344  * applied.
345  * @param last One past the last element to which 'func' is to be
346  * applied.
347  * @param dest The beginning of the range to which the result of
348  * applying 'func' to the elements in the range ['first', 'last') is
349  * to be stored. As in the case of std::transform, this can overlap
350  * with or be the same as the source range. It may also be an insert
351  * iterator.
352  * @param func A unary callable object to be applied to each element
353  * in the range ['first', 'last'), such as formed by a lambda
354  * expression or the result of std::bind. It should take a single
355  * unbound argument of the value type of the container to which
356  * 'first' and 'last' relate or a const or non-const reference to that
357  * type. If an exception propagates from 'func', the exception will
358  * be consumed while the transform loop is running, and an attempt
359  * will still be made to apply 'func' to all remaining elements in the
360  * range ['first', 'last'), and only after that attempt has completed
361  * will the exception Cgu::Thread::ParallelError be thrown.
362  * @exception std::bad_alloc This exception will be thrown if memory
363  * is exhausted and the system throws in that case. (On systems with
364  * over-commit/lazy-commit combined with virtual memory (swap), it is
365  * rarely useful to check for memory exhaustion). If this exception
366  * is thrown, some tasks may nonetheless have already started by
367  * virtue of the call to this function, but subsequent ones will not.
368  * @exception Cgu::Thread::TaskError This exception will be thrown if
369  * stop_all() has previously been called on the Thread::TaskManager
370  * object, or if another thread calls stop_all() after this method is
371  * called but before it has returned. It will also be thrown if the
372  * Thread::TaskManager object's is_error() method would return true
373  * because its internal thread pool loop implementation has thrown
374  * std::bad_alloc, or a thread has failed to start correctly because
375  * pthread has run out of resources. (On systems with
376  * over-commit/lazy-commit combined with virtual memory (swap), it is
377  * rarely useful to check for memory exhaustion, and if a reasonable
378  * maximum thread count has been chosen for the Thread::TaskManager
379  * object pthread should not run out of other resources, but there may
380  * be some specialized cases where the return value of is_error() is
381  * useful.) If this exception is thrown, some tasks may nonetheless
382  * have already started by virtue of the call to this function.
383  * @exception Cgu::Thread::ParallelError This exception will be thrown
384  * if an exception propagates from the 'func' callable object when it
385  * executes on being applied to one or more elements of the source
386  * container. Such an exception will not stop an attempt being made
387  * to apply 'func' (successfully or unsuccessfully) to all elements in
388  * the range ['first', 'last'). Cgu::Thread::ParallelError will be
389  * thrown after such attempted application has finished.
390  * @exception Cgu::Thread::MutexError This exception will be thrown if
391  * initialization of a mutex used by this function fails. (It is
392  * often not worth checking for this, as it means either memory is
393  * exhausted or pthread has run out of other resources to create new
394  * mutexes.) If this exception is thrown, no tasks will start.
395  * @exception Cgu::Thread::CondError This exception will be thrown if
396  * initialization of a condition variable used by this function fails.
397  * (It is often not worth checking for this, as it means either memory
398  * is exhausted or pthread has run out of other resources to create
399  * new condition variables.) If this exception is thrown, no tasks
400  * will start.
401  * @note An exception might also be thrown if the copy or move
402  * constructor of the 'func' callable objects throws. If such an
403  * exception is thrown, no tasks will start.
404  *
405  * Since 2.0.19/2.2.2
406  */
407 template <class SourceIterator, class DestIterator, class Func>
409  SourceIterator first,
410  SourceIterator last,
411  DestIterator dest,
412  Func&& func) {
413 
414  if (first == last) return;
415 
416  typedef typename std::iterator_traits<SourceIterator>::value_type ArgType;
417  typedef typename std::iterator_traits<SourceIterator>::difference_type DiffType;
418  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
419  typedef decltype(func(*first)) DestType;
420 
421  Mutex mutex;
422  Cond cond;
423  DiffType start_count = 0;
424  DiffType done_count = 0;
425  bool error = false;
426 
427  // intermediate results have to be held in an array so destination
428  // ordering can be enforced when using insert interators. This
429  // causes some inefficiency for non-random access iterators
430  std::unique_ptr<DestType[]> results(new DestType[std::distance(first, last)]);
431 
432  // construct SafeFunctorArg objects so that they can be shared
433  // between different tasks
435  Cgu::Callback::make_ref(&ParallelHelper2::transform1_func<FType, ArgType, DestType>,
436  std::forward<Func>(func))
437  };
439  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
440  &mutex,
441  &cond,
442  &error,
443  &done_count)
444  };
445 
446  for (; first != last; ++first, ++start_count) {
447  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
448  Cgu::Callback::lambda<>(std::bind(&ParallelHelper2::transform1_cb_func<ArgType, DestType, DiffType, SourceIterator>,
449  s_task,
450  first,
451  &mutex,
452  &cond,
453  &done_count,
454  results.get(),
455  start_count))
456  );
457  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
458  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
459  );
460 
461  tm.add_task(std::move(task_cb), std::move(fail_cb));
462  }
463 
464  Mutex::Lock l{mutex};
465  while (start_count > done_count) cond.wait(mutex);
466  if (error) throw ParallelError();
467  for (DiffType index = 0; index < start_count; ++dest, ++index) {
468  *dest = std::move(results[index]);
469  }
470 }
471 
472 /**
473  * \#include <c++-gtk-utils/parallel.h>
474  *
475  * This function maps over two containers, one in the range ['first1',
476  * 'last1') and the other beginning at 'first2', applying a binary
477  * callable object to each element of the containers in those ranges
478  * and storing the result in the destination range, by executing each
479  * such application as a task of a Thread::TaskManager object. Tasks
480  * are added to the Thread::TaskManager object in the order in which
481  * the respective elements appear in the source containers, and the
482  * final result appears in the destination container in the same order
483  * as the source ranges from which it is generated (including if a
484  * back_inserter iterator is used), but no other ordering arises, and
485  * the tasks will execute in parallel to the extent that the
486  * Thread::TaskManager object has sufficient threads available to do
487  * so.
488  *
489  * Apart from that, this function does the same as the version of
490  * std::transform() taking a binary function, except that it returns
491  * void (see Thread::parallel_transform_partial() for a function which
492  * returns a destination iterator and iterators to the source ranges).
493  * It will not return until the callable object has been applied to
494  * all of the elements in the source ranges.
495  *
496  * This function can be called by a task running on the same
497  * TaskManager object (a result can therefore be delivered
498  * asynchronously by calling this function in a task created by the
499  * make_task_when(), make_task_compose() or make_task_when_full()
500  * Thread::TaskManager methods). A task can carry out a map-reduce
501  * operation by passing the result of calling this function to
502  * std::accumulate() to perform a fold-left or fold-right on that
503  * result.
504  *
505  * A separate overload of this function takes a unary callable object.
506  *
507  * Here is a trivial example of a map-reduce operation which maps over
508  * two vectors by adding respective elements of the vectors in
509  * separate tasks, and then folds-left using std::accumulate()
510  * (std::accumulate() can fold using any callable object, but in this
511  * example the default of addition is used):
512  *
513  * @code
514  * using namespace Cgu;
515  * std::vector<int> v1{2, 4, 6, 8, 10};
516  * std::vector<int> v2{10, 20, 30, 40, 50};
517  * std::vector<int> v3;
518  * Thread::TaskManager tm;
519  *
520  * Thread::parallel_transform(tm,
521  * v1.begin(),
522  * v1.end(),
523  * v2.begin(),
524  * std::back_inserter(v3),
525  * std::plus<int>());
526  * // res will be equal to 180
527  * int res = std::accumulate(v3.begin(), v3.end(), 0);
528  * @endcode
529  *
530  * @param tm The Thread::TaskManager object on which the tasks will
531  * run.
532  * @param first1 The beginning of the range which is to be passed as
533  * the first argument of 'func'.
534  * @param last1 One past the last element of the range which is to be
535  * passed as the first argument of 'func'.
536  * @param first2 The beginning of the range which is to be passed as
537  * the second argument of 'func'.
538  * @param dest The beginning of the range to which the result of
539  * applying 'func' to the elements in the source ranges is to be
540  * stored. As in the case of std::transform, this can overlap with or
541  * be the same as one of the source ranges. It may also be an insert
542  * iterator.
543  * @param func A binary callable object to be applied to each element
544  * in the source ranges, such as formed by a lambda expression or the
545  * result of std::bind. It should take two unbound arguments of the
546  * value types of the containers to which 'first1' and 'first2' relate
547  * or const or non-const references to those types. If an exception
548  * propagates from 'func', the exception will be consumed while the
549  * transform loop is running, and an attempt will still be made to
550  * apply 'func' to all remaining elements of the source ranges, and
551  * only after that attempt has completed will the exception
552  * Cgu::Thread::ParallelError be thrown.
553  * @exception std::bad_alloc This exception will be thrown if memory
554  * is exhausted and the system throws in that case. (On systems with
555  * over-commit/lazy-commit combined with virtual memory (swap), it is
556  * rarely useful to check for memory exhaustion). If this exception
557  * is thrown, some tasks may nonetheless have already started by
558  * virtue of the call to this function, but subsequent ones will not.
559  * @exception Cgu::Thread::TaskError This exception will be thrown if
560  * stop_all() has previously been called on the Thread::TaskManager
561  * object, or if another thread calls stop_all() after this method is
562  * called but before it has returned. It will also be thrown if the
563  * Thread::TaskManager object's is_error() method would return true
564  * because its internal thread pool loop implementation has thrown
565  * std::bad_alloc, or a thread has failed to start correctly because
566  * pthread has run out of resources. (On systems with
567  * over-commit/lazy-commit combined with virtual memory (swap), it is
568  * rarely useful to check for memory exhaustion, and if a reasonable
569  * maximum thread count has been chosen for the Thread::TaskManager
570  * object pthread should not run out of other resources, but there may
571  * be some specialized cases where the return value of is_error() is
572  * useful.) If this exception is thrown, some tasks may nonetheless
573  * have already started by virtue of the call to this function.
574  * @exception Cgu::Thread::ParallelError This exception will be thrown
575  * if an exception propagates from the 'func' callable object when it
576  * executes on being applied to one or more elements of the source
577  * ranges. Such an exception will not stop an attempt being made to
578  * apply 'func' (successfully or unsuccessfully) to all elements in
579  * the source ranges. Cgu::Thread::ParallelError will be thrown after
580  * such attempted application has finished.
581  * @exception Cgu::Thread::MutexError This exception will be thrown if
582  * initialization of a mutex used by this function fails. (It is
583  * often not worth checking for this, as it means either memory is
584  * exhausted or pthread has run out of other resources to create new
585  * mutexes.) If this exception is thrown, no tasks will start.
586  * @exception Cgu::Thread::CondError This exception will be thrown if
587  * initialization of a condition variable used by this function fails.
588  * (It is often not worth checking for this, as it means either memory
589  * is exhausted or pthread has run out of other resources to create
590  * new condition variables.) If this exception is thrown, no tasks
591  * will start.
592  * @note An exception might also be thrown if the copy or move
593  * constructor of the 'func' callable objects throws. If such an
594  * exception is thrown, no tasks will start.
595  *
596  * Since 2.0.19/2.2.2
597  */
598 template <class SourceIterator1, class SourceIterator2, class DestIterator, class Func>
600  SourceIterator1 first1,
601  SourceIterator1 last1,
602  SourceIterator2 first2,
603  DestIterator dest,
604  Func&& func) {
605 
606  if (first1 == last1) return;
607 
608  typedef typename std::iterator_traits<SourceIterator1>::value_type Arg1Type;
609  typedef typename std::iterator_traits<SourceIterator1>::difference_type DiffType;
610  typedef typename std::iterator_traits<SourceIterator2>::value_type Arg2Type;
611  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
612  typedef decltype(func(*first1, *first2)) DestType;
613 
614  Mutex mutex;
615  Cond cond;
616  DiffType start_count = 0;
617  DiffType done_count = 0;
618  bool error = false;
619 
620  // intermediate results have to be held in an array so destination
621  // ordering can be enforced when using insert interators. This
622  // causes some inefficiency for non-random access iterators
623  std::unique_ptr<DestType[]> results(new DestType[std::distance(first1, last1)]);
624 
625  // construct SafeFunctorArg objects so that they can be shared
626  // between different tasks
628  Cgu::Callback::make_ref(&ParallelHelper2::transform2_func<FType, Arg1Type, Arg2Type, DestType>,
629  std::forward<Func>(func))
630  };
632  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
633  &mutex,
634  &cond,
635  &error,
636  &done_count)
637  };
638 
639  for (; first1 != last1; ++first1, ++first2, ++start_count) {
640  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
641  Cgu::Callback::lambda<>(std::bind(&ParallelHelper2::transform2_cb_func<Arg1Type, Arg2Type, DestType, DiffType, SourceIterator1, SourceIterator2>,
642  s_task,
643  first1,
644  first2,
645  &mutex,
646  &cond,
647  &done_count,
648  results.get(),
649  start_count))
650  );
651  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
652  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
653  );
654 
655  tm.add_task(std::move(task_cb), std::move(fail_cb));
656  }
657 
658  Mutex::Lock l{mutex};
659  while (start_count > done_count) cond.wait(mutex);
660  if (error) throw ParallelError();
661  for (DiffType index = 0; index < start_count; ++dest, ++index) {
662  *dest = std::move(results[index]);
663  }
664 }
665 
666 /**
667  * \#include <c++-gtk-utils/parallel.h>
668  *
669  * This function applies a callable object to each element of a
670  * container in the range ['first', 'last') subject to a maximum, by
671  * executing each such application as a task of a Thread::TaskManager
672  * object. Tasks are added to the Thread::TaskManager object in the
673  * order in which the respective elements appear in the container (and
674  * if a task mutates its argument, it will do so in respect of the
675  * correct element of the container), but no other ordering arises,
676  * and the tasks will execute in parallel to the extent that the
677  * Thread::TaskManager object has sufficient threads available to do
678  * so.
679  *
680  * This function does the same as Thread::parallel_for_each(), except
681  * that it returns an iterator to the element past the last element to
682  * which the callable object has been applied, and it has a 'max'
683  * parameter to limit the maximum number of elements to which the
684  * callable object will be applied on any one call to this function
685  * even if the range ['first', 'last') is greater than this maximum.
686  * Whether this limitation has had effect on any one call can be
687  * tested by checking whether the return value is equal to the 'last'
688  * parameter. If it is not, a further call to this function can be
689  * made.
690  *
691  * The main purpose of this additional function is to enable the
692  * application of the callable object to the elements of a container
693  * to be dealt with in chunks, possibly to enable other tasks to be
694  * interleaved at reasonable intervals. For a container which does
695  * not support random access iterators, the 'last' parameter can be
696  * set to, say, the end of the container and the chunk size set with
697  * the 'max' paramater, with the return value being used as the
698  * 'first' parameter for subsequent calls to this function. This
699  * avoids having to increment the iterator for each "chunk" by
700  * stepping through the container by hand. In this usage, it
701  * therefore represents a minor efficiency improvement.
702  *
703  * @param tm The Thread::TaskManager object on which the tasks will
704  * run.
705  * @param first The beginning of the range to which 'func' is to be
706  * applied.
707  * @param last One past the last element to which 'func' is to be
708  * applied, subject to any maximum specified as the 'max' parameter.
709  * @param max The maximum number of elements of the source container
710  * to which 'func' will be applied on this call. It is not an error
711  * if it is greater than the distance between 'first' and 'last'. If
712  * it is equal to or greater than that distance, it follows that the
713  * iterator returned will be equal to 'last'. The same results if
714  * 'max' is a negative number (in that case no maximum will take
715  * effect and each element in the range ['first', 'last') will have
716  * 'func' applied to it).
717  * @param func A callable object to be applied (subject to the
718  * maximum) to each element in the range ['first', 'last'), such as
719  * formed by a lambda expression or the result of std::bind. It
720  * should take a single unbound argument of the value type of the
721  * container to which 'first' and 'last' relate or a const or
722  * non-const reference to that type. Any return value is discarded.
723  * If an exception propagates from 'func', the exception will be
724  * consumed while the for each loop is running, and an attempt will
725  * still be made to apply 'func' to all remaining elements in the
726  * range ['first', 'last') subject to the maximum, and only after that
727  * attempt has completed will the exception Cgu::Thread::ParallelError
728  * be thrown.
729  * @return An iterator representing the element past the last element
730  * of the range to which 'func' was applied, which may be passed as
731  * the 'first' argument of a subsequent call to this function.
732  * @exception std::bad_alloc This exception will be thrown if memory
733  * is exhausted and the system throws in that case. (On systems with
734  * over-commit/lazy-commit combined with virtual memory (swap), it is
735  * rarely useful to check for memory exhaustion). If this exception
736  * is thrown, some tasks may nonetheless have already started by
737  * virtue of the call to this function, but subsequent ones will not.
738  * @exception Cgu::Thread::TaskError This exception will be thrown if
739  * stop_all() has previously been called on the Thread::TaskManager
740  * object, or if another thread calls stop_all() after this method is
741  * called but before it has returned. It will also be thrown if the
742  * Thread::TaskManager object's is_error() method would return true
743  * because its internal thread pool loop implementation has thrown
744  * std::bad_alloc, or a thread has failed to start correctly because
745  * pthread has run out of resources. (On systems with
746  * over-commit/lazy-commit combined with virtual memory (swap), it is
747  * rarely useful to check for memory exhaustion, and if a reasonable
748  * maximum thread count has been chosen for the Thread::TaskManager
749  * object pthread should not run out of other resources, but there may
750  * be some specialized cases where the return value of is_error() is
751  * useful.) If this exception is thrown, some tasks may nonetheless
752  * have already started by virtue of the call to this function.
753  * @exception Cgu::Thread::ParallelError This exception will be thrown
754  * if an exception propagates from the 'func' callable object when it
755  * executes on being applied to one or more elements of the container.
756  * Such an exception will not stop an attempt being made to apply
757  * 'func' (successfully or unsuccessfully) to all elements in the
758  * range ['first', 'last') subject to the maximum.
759  * Cgu::Thread::ParallelError will be thrown after such attempted
760  * application has finished.
761  * @exception Cgu::Thread::MutexError This exception will be thrown if
762  * initialization of a mutex used by this function fails. (It is
763  * often not worth checking for this, as it means either memory is
764  * exhausted or pthread has run out of other resources to create new
765  * mutexes.) If this exception is thrown, no tasks will start.
766  * @exception Cgu::Thread::CondError This exception will be thrown if
767  * initialization of a condition variable used by this function fails.
768  * (It is often not worth checking for this, as it means either memory
769  * is exhausted or pthread has run out of other resources to create
770  * new condition variables.) If this exception is thrown, no tasks
771  * will start.
772  * @note An exception might also be thrown if the copy or move
773  * constructor of the 'func' callable objects throws. If such an
774  * exception is thrown, no tasks will start.
775  *
776  * Since 2.0.20/2.2.3
777  */
778 template <class Iterator, class Func>
780  Iterator first,
781  Iterator last,
782  int max,
783  Func&& func) {
784 
785  if (first == last || !max) return first;
786 
787  typedef typename std::iterator_traits<Iterator>::value_type ArgType;
788  typedef typename std::iterator_traits<Iterator>::difference_type DiffType;
789 
790  Mutex mutex;
791  Cond cond;
792  DiffType start_count = 0;
793  DiffType done_count = 0;
794  bool error = false;
795 
796  // a specialization of std::numeric_limits::max() for all arithmetic
797  // types is required by §3.9.1/8 of the standard. The iterator
798  // difference type must be a signed integer type (§24.2.1/1). All
799  // signed integer types are arithmetic types (§3.9.1/2, §3.9.1/7 and
800  // §3.9.1/8).
801  const DiffType local_max =
802  (max >= 0) ? max : std::numeric_limits<DiffType>::max();
803 
804  // construct SafeFunctorArg objects so that they can be shared
805  // between different tasks
807  Cgu::Callback::lambda<ArgType&>(std::forward<Func>(func))
808  };
810  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
811  &mutex,
812  &cond,
813  &error,
814  &done_count)
815  };
816 
817  for (; first != last && start_count < local_max; ++first, ++start_count) {
818  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
819  Cgu::Callback::make_ref(&ParallelHelper2::for_each_cb_func<ArgType, DiffType, Iterator>,
820  s_task,
821  first,
822  &mutex,
823  &cond,
824  &done_count)
825  );
826  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
827  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
828  );
829 
830  tm.add_task(std::move(task_cb), std::move(fail_cb));
831  }
832 
833  Mutex::Lock l{mutex};
834  while (start_count > done_count) cond.wait(mutex);
835  if (error) throw ParallelError();
836  return first;
837 }
838 
839 /**
840  * \#include <c++-gtk-utils/parallel.h>
841  *
842  * This function maps over a container in the range ['first', 'last')
843  * subject to a maximum, applying a unary callable object to each
844  * element of the container in that range (subject to the maximum) and
845  * storing the result in the destination range, by executing each such
846  * application as a task of a Thread::TaskManager object. Tasks are
847  * added to the Thread::TaskManager object in the order in which the
848  * respective elements appear in the source container, and the final
849  * result appears in the destination container in the same order as
850  * the source range from which it is generated (including if a
851  * back_inserter iterator is used), but no other ordering arises, and
852  * the tasks will execute in parallel to the extent that the
853  * Thread::TaskManager object has sufficient threads available to do
854  * so.
855  *
856  * A separate overload of this function takes a binary callable
857  * object.
858  *
859  * This function does the same as the version of
860  * Thread::parallel_transform() taking a unary callable object, except
861  * that it returns a std::pair object containing an iterator to the
862  * element past the last element of the source range transformed, and
863  * a destination iterator to the element past the last element stored
864  * in the destination range, and it has a 'max' parameter to limit the
865  * maximum number of elements which will be so transformed on any one
866  * call to this function. Whether this limitation has had effect on
867  * any one call can be tested by checking whether the first value of
868  * the pair returned is equal to the 'last' parameter. If it is not,
869  * a further call to this function can be made.
870  *
871  * The main purpose of this additional function is to enable the
872  * parallel transform of the elements of a container to be dealt with
873  * in chunks, possibly to enable other tasks to be interleaved at
874  * reasonable intervals. For source or destination containers which
875  * do not support random access iterators, the 'last' parameter can be
876  * set to, say, the end of the container and the chunk size set with
877  * the 'max' paramater, with the values of the returned pair being
878  * used as the 'first' and 'dest' parameters for subsequent calls to
879  * this function. This avoids having to increment the source and
880  * destination iterators for each "chunk" by stepping through the
881  * respective containers by hand. In this usage, it therefore
882  * represents a minor efficiency improvement.
883  *
884  * @param tm The Thread::TaskManager object on which the tasks will
885  * run.
886  * @param first The beginning of the range to which 'func' is to be
887  * applied.
888  * @param last One past the last element to which 'func' is to be
889  * applied, subject to any maximum specified as the 'max' parameter.
890  * @param dest The beginning of the range to which the result of
891  * applying 'func' to the elements in the source range is to be
892  * stored. As in the case of std::transform, this can overlap with or
893  * be the same as the source range. It may also be an insert
894  * iterator.
895  * @param max The maximum number of elements of the source container
896  * which will be transformed on this call. It is not an error if it
897  * is greater than the distance between 'first' and 'last'. If it is
898  * equal to or greater than that distance, it follows that the first
899  * value of the pair returned by this function will be equal to
900  * 'last'. The same results if 'max' is a negative number (in that
901  * case no maximum will take effect and all elements in the range
902  * ['first', 'last') will be transformed), so passing -1 might be
903  * useful as a means of obtaining a destination iterator (the second
904  * value) for other purposes, particularly where it is not a random
905  * access iterator).
906  * @param func A unary callable object to be applied (subject to the
907  * maximum) to each element in the range ['first', 'last'), such as
908  * formed by a lambda expression or the result of std::bind. It
909  * should take a single unbound argument of the value type of the
910  * container to which 'first' and 'last' relate or a const or
911  * non-const reference to that type. If an exception propagates from
912  * 'func', the exception will be consumed while the transform loop is
913  * running, and an attempt will still be made to apply 'func' to all
914  * remaining elements in the range ['first', 'last') subject to the
915  * maximum, and only after that attempt has completed will the
916  * exception Cgu::Thread::ParallelError be thrown.
917  * @return A std::pair object. Its first member is an iterator
918  * representing the element past the last element of the source range
919  * transformed, which may be passed as the 'first' argument of a
920  * subsequent call to this function; and its second member is an
921  * iterator representing the element past the last element stored in
922  * the destination range, which may be passed as the 'dest' argument
923  * of the subsequent call.
924  * @exception std::bad_alloc This exception will be thrown if memory
925  * is exhausted and the system throws in that case. (On systems with
926  * over-commit/lazy-commit combined with virtual memory (swap), it is
927  * rarely useful to check for memory exhaustion). If this exception
928  * is thrown, some tasks may nonetheless have already started by
929  * virtue of the call to this function, but subsequent ones will not.
930  * @exception Cgu::Thread::TaskError This exception will be thrown if
931  * stop_all() has previously been called on the Thread::TaskManager
932  * object, or if another thread calls stop_all() after this method is
933  * called but before it has returned. It will also be thrown if the
934  * Thread::TaskManager object's is_error() method would return true
935  * because its internal thread pool loop implementation has thrown
936  * std::bad_alloc, or a thread has failed to start correctly because
937  * pthread has run out of resources. (On systems with
938  * over-commit/lazy-commit combined with virtual memory (swap), it is
939  * rarely useful to check for memory exhaustion, and if a reasonable
940  * maximum thread count has been chosen for the Thread::TaskManager
941  * object pthread should not run out of other resources, but there may
942  * be some specialized cases where the return value of is_error() is
943  * useful.) If this exception is thrown, some tasks may nonetheless
944  * have already started by virtue of the call to this function.
945  * @exception Cgu::Thread::ParallelError This exception will be thrown
946  * if an exception propagates from the 'func' callable object when it
947  * executes on being applied to one or more elements of the source
948  * container. Such an exception will not stop an attempt being made
949  * to apply 'func' (successfully or unsuccessfully) to all elements in
950  * the range ['first', 'last') subject to the maximum.
951  * Cgu::Thread::ParallelError will be thrown after such attempted
952  * application has finished.
953  * @exception Cgu::Thread::MutexError This exception will be thrown if
954  * initialization of a mutex used by this function fails. (It is
955  * often not worth checking for this, as it means either memory is
956  * exhausted or pthread has run out of other resources to create new
957  * mutexes.) If this exception is thrown, no tasks will start.
958  * @exception Cgu::Thread::CondError This exception will be thrown if
959  * initialization of a condition variable used by this function fails.
960  * (It is often not worth checking for this, as it means either memory
961  * is exhausted or pthread has run out of other resources to create
962  * new condition variables.) If this exception is thrown, no tasks
963  * will start.
964  * @note An exception might also be thrown if the copy or move
965  * constructor of the 'func' callable objects throws. If such an
966  * exception is thrown, no tasks will start.
967  *
968  * Since 2.0.20/2.2.3
969  */
970 template <class SourceIterator, class DestIterator, class Func>
971 std::pair<SourceIterator, DestIterator>
973  SourceIterator first,
974  SourceIterator last,
975  DestIterator dest,
976  int max,
977  Func&& func) {
978 
979  if (first == last || !max) return {first, dest};
980 
981  typedef typename std::iterator_traits<SourceIterator>::value_type ArgType;
982  typedef typename std::iterator_traits<SourceIterator>::difference_type DiffType;
983  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
984  typedef decltype(func(*first)) DestType;
985 
986  Mutex mutex;
987  Cond cond;
988  DiffType start_count = 0;
989  DiffType done_count = 0;
990  bool error = false;
991 
992  // a specialization of std::numeric_limits::max() for all arithmetic
993  // types is required by §3.9.1/8 of the standard. The iterator
994  // difference type must be a signed integer type (§24.2.1/1). All
995  // signed integer types are arithmetic types (§3.9.1/2, §3.9.1/7 and
996  // §3.9.1/8).
997  const DiffType local_max =
998  (max >= 0) ? max : std::numeric_limits<DiffType>::max();
999 
1000  // intermediate results have to be held in an array so destination
1001  // ordering can be enforced when using insert interators. This
1002  // causes some inefficiency for non-random access iterators
1003  std::unique_ptr<DestType[]> results(new DestType[std::min(local_max,
1004  std::distance(first, last))]);
1005 
1006  // construct SafeFunctorArg objects so that they can be shared
1007  // between different tasks
1009  Cgu::Callback::make_ref(&ParallelHelper2::transform1_func<FType, ArgType, DestType>,
1010  std::forward<Func>(func))
1011  };
1013  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
1014  &mutex,
1015  &cond,
1016  &error,
1017  &done_count)
1018  };
1019 
1020  for (; first != last && start_count < local_max; ++first, ++start_count) {
1021  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
1022  Cgu::Callback::lambda<>(std::bind(&ParallelHelper2::transform1_cb_func<ArgType, DestType, DiffType, SourceIterator>,
1023  s_task,
1024  first,
1025  &mutex,
1026  &cond,
1027  &done_count,
1028  results.get(),
1029  start_count))
1030  );
1031  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
1032  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
1033  );
1034 
1035  tm.add_task(std::move(task_cb), std::move(fail_cb));
1036  }
1037 
1038  Mutex::Lock l{mutex};
1039  while (start_count > done_count) cond.wait(mutex);
1040  if (error) throw ParallelError();
1041  for (DiffType index = 0; index < start_count; ++dest, ++index) {
1042  *dest = std::move(results[index]);
1043  }
1044  return {first, dest};
1045 }
1046 
1047 /**
1048  * \#include <c++-gtk-utils/parallel.h>
1049  *
1050  * This function maps over two containers, one in the range ['first1',
1051  * 'last1') subject to a maximum, and the other beginning at 'first2',
1052  * applying a binary callable object to each element of the containers
1053  * in those ranges (subject to the maximum) and storing the result in
1054  * the destination range, by executing each such application as a task
1055  * of a Thread::TaskManager object. Tasks are added to the
1056  * Thread::TaskManager object in the order in which the respective
1057  * elements appear in the source containers, and the final result
1058  * appears in the destination container in the same order as the
1059  * source ranges from which it is generated (including if a
1060  * back_inserter iterator is used), but no other ordering arises, and
1061  * the tasks will execute in parallel to the extent that the
1062  * Thread::TaskManager object has sufficient threads available to do
1063  * so.
1064  *
1065  * A separate overload of this function takes a unary callable object.
1066  *
1067  * This function does the same as the version of
1068  * Thread::parallel_transform() taking a binary callable object,
1069  * except that it returns a std::tuple object containing iterators to
1070  * the element past the last elements of the source ranges
1071  * transformed, and a destination iterator to the element past the
1072  * last element stored in the destination range, and it has a 'max'
1073  * parameter to limit the maximum number of elements which will be so
1074  * transformed on any one call to this function. Whether this
1075  * limitation has had effect on any one call can be tested by checking
1076  * whether the first value of the tuple returned is equal to the
1077  * 'last' parameter. If it is not, a further call to this function
1078  * can be made.
1079  *
1080  * The main purpose of this additional function is to enable the
1081  * parallel transform of the elements of a container to be dealt with
1082  * in chunks, possibly to enable other tasks to be interleaved at
1083  * reasonable intervals. For source or destination containers which
1084  * do not support random access iterators, the 'last' parameter can be
1085  * set to, say, the end of the container and the chunk size set with
1086  * the 'max' paramater, with the values of the returned tuple being
1087  * used as the 'first1', 'first2' and 'dest' parameters for subsequent
1088  * calls to this function. This avoids having to increment the source
1089  * and destination iterators for each "chunk" by stepping through the
1090  * respective containers by hand. In this usage, it therefore
1091  * represents a minor efficiency improvement.
1092  *
1093  * @param tm The Thread::TaskManager object on which the tasks will
1094  * run.
1095  * @param first1 The beginning of the range which is to be passed as
1096  * the first argument of 'func'.
1097  * @param last1 One past the last element of the range which is to be
1098  * passed as the first argument of 'func', subject to any maximum
1099  * specified as the 'max' parameter.
1100  * @param first2 The beginning of the range which is to be passed as
1101  * the second argument of 'func'.
1102  * @param dest The beginning of the range to which the result of
1103  * applying 'func' to the elements in the source ranges is to be
1104  * stored. As in the case of std::transform, this can overlap with or
1105  * be the same as one of the source ranges. It may also be an insert
1106  * iterator.
1107  * @param max The maximum number of elements of the source containers
1108  * which will be transformed on this call. It is not an error if it
1109  * is greater than the distance between 'first1' and 'last1'. If it
1110  * is equal to or greater than that distance, it follows that the
1111  * first value of the tuple returned by this function will be equal to
1112  * 'last1'. The same results if 'max' is a negative number (in that
1113  * case no maximum will take effect and all elements in the range
1114  * ['first1', 'last1') will be transformed), so passing -1 might be
1115  * useful as a means of obtaining a second source iterator (the second
1116  * value of the tuple) or destination iterator (the third value) for
1117  * other purposes, particularly where they are not random access
1118  * iterators.
1119  * @param func A binary callable object to be applied (subject to the
1120  * maximum) to each element in the source ranges, such as formed by a
1121  * lambda expression or the result of std::bind. It should take two
1122  * unbound arguments of the value types of the containers to which
1123  * 'first1' and 'first2' relate or const or non-const references to
1124  * those types. If an exception propagates from 'func', the exception
1125  * will be consumed while the transform loop is running, and an
1126  * attempt will still be made to apply 'func' to all remaining
1127  * elements of the source ranges subject to the maximum, and only
1128  * after that attempt has completed will the exception
1129  * Cgu::Thread::ParallelError be thrown.
1130  * @return A std::tuple object. Its first value is an iterator
1131  * representing the element past the last element of the first source
1132  * range transformed, which may be passed as the 'first1' argument of
1133  * a subsequent call to this function; its second value is an iterator
1134  * representing the element past the last element of the second source
1135  * range transformed, which may be passed as the 'first2' argument of
1136  * the subsequent call; and its third value is an iterator
1137  * representing the element past the last element stored in the
1138  * destination range, which may be passed as the 'dest' argument of
1139  * the subsequent call.
1140  * @exception std::bad_alloc This exception will be thrown if memory
1141  * is exhausted and the system throws in that case. (On systems with
1142  * over-commit/lazy-commit combined with virtual memory (swap), it is
1143  * rarely useful to check for memory exhaustion). If this exception
1144  * is thrown, some tasks may nonetheless have already started by
1145  * virtue of the call to this function, but subsequent ones will not.
1146  * @exception Cgu::Thread::TaskError This exception will be thrown if
1147  * stop_all() has previously been called on the Thread::TaskManager
1148  * object, or if another thread calls stop_all() after this method is
1149  * called but before it has returned. It will also be thrown if the
1150  * Thread::TaskManager object's is_error() method would return true
1151  * because its internal thread pool loop implementation has thrown
1152  * std::bad_alloc, or a thread has failed to start correctly because
1153  * pthread has run out of resources. (On systems with
1154  * over-commit/lazy-commit combined with virtual memory (swap), it is
1155  * rarely useful to check for memory exhaustion, and if a reasonable
1156  * maximum thread count has been chosen for the Thread::TaskManager
1157  * object pthread should not run out of other resources, but there may
1158  * be some specialized cases where the return value of is_error() is
1159  * useful.) If this exception is thrown, some tasks may nonetheless
1160  * have already started by virtue of the call to this function.
1161  * @exception Cgu::Thread::ParallelError This exception will be thrown
1162  * if an exception propagates from the 'func' callable object when it
1163  * executes on being applied to one or more elements of the source
1164  * ranges. Such an exception will not stop an attempt being made to
1165  * apply 'func' (successfully or unsuccessfully) to all elements in
1166  * the source ranges subject to the maximum.
1167  * Cgu::Thread::ParallelError will be thrown after such attempted
1168  * application has finished.
1169  * @exception Cgu::Thread::MutexError This exception will be thrown if
1170  * initialization of a mutex used by this function fails. (It is
1171  * often not worth checking for this, as it means either memory is
1172  * exhausted or pthread has run out of other resources to create new
1173  * mutexes.) If this exception is thrown, no tasks will start.
1174  * @exception Cgu::Thread::CondError This exception will be thrown if
1175  * initialization of a condition variable used by this function fails.
1176  * (It is often not worth checking for this, as it means either memory
1177  * is exhausted or pthread has run out of other resources to create
1178  * new condition variables.) If this exception is thrown, no tasks
1179  * will start.
1180  * @note An exception might also be thrown if the copy or move
1181  * constructor of the 'func' callable objects throws. If such an
1182  * exception is thrown, no tasks will start.
1183  *
1184  * Since 2.0.20/2.2.3
1185  */
1186 template <class SourceIterator1, class SourceIterator2, class DestIterator, class Func>
1187 std::tuple<SourceIterator1, SourceIterator2, DestIterator>
1189  SourceIterator1 first1,
1190  SourceIterator1 last1,
1191  SourceIterator2 first2,
1192  DestIterator dest,
1193  int max,
1194  Func&& func) {
1195 
1196  if (first1 == last1 || !max) return std::make_tuple(first1, first2, dest);
1197 
1198  typedef typename std::iterator_traits<SourceIterator1>::value_type Arg1Type;
1199  typedef typename std::iterator_traits<SourceIterator1>::difference_type DiffType;
1200  typedef typename std::iterator_traits<SourceIterator2>::value_type Arg2Type;
1201  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
1202  typedef decltype(func(*first1, *first2)) DestType;
1203 
1204  Mutex mutex;
1205  Cond cond;
1206  DiffType start_count = 0;
1207  DiffType done_count = 0;
1208  bool error = false;
1209 
1210  // a specialization of std::numeric_limits::max() for all arithmetic
1211  // types is required by §3.9.1/8 of the standard. The iterator
1212  // difference type must be a signed integer type (§24.2.1/1). All
1213  // signed integer types are arithmetic types (§3.9.1/2, §3.9.1/7 and
1214  // §3.9.1/8).
1215  const DiffType local_max =
1216  (max >= 0) ? max : std::numeric_limits<DiffType>::max();
1217 
1218  // intermediate results have to be held in an array so destination
1219  // ordering can be enforced when using insert interators. This
1220  // causes some inefficiency for non-random access iterators
1221  std::unique_ptr<DestType[]> results(new DestType[std::min(local_max,
1222  std::distance(first1, last1))]);
1223 
1224  // construct SafeFunctorArg objects so that they can be shared
1225  // between different tasks
1227  Cgu::Callback::make_ref(&ParallelHelper2::transform2_func<FType, Arg1Type, Arg2Type, DestType>,
1228  std::forward<Func>(func))
1229  };
1231  Cgu::Callback::make(&ParallelHelper2::fail_func<DiffType>,
1232  &mutex,
1233  &cond,
1234  &error,
1235  &done_count)
1236  };
1237 
1238  for (; first1 != last1 && start_count < local_max; ++first1, ++first2, ++start_count) {
1239  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
1240  Cgu::Callback::lambda<>(std::bind(&ParallelHelper2::transform2_cb_func<Arg1Type, Arg2Type, DestType, DiffType, SourceIterator1, SourceIterator2>,
1241  s_task,
1242  first1,
1243  first2,
1244  &mutex,
1245  &cond,
1246  &done_count,
1247  results.get(),
1248  start_count))
1249  );
1250  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
1251  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
1252  );
1253 
1254  tm.add_task(std::move(task_cb), std::move(fail_cb));
1255  }
1256 
1257  Mutex::Lock l{mutex};
1258  while (start_count > done_count) cond.wait(mutex);
1259  if (error) throw ParallelError();
1260  for (DiffType index = 0; index < start_count; ++dest, ++index) {
1261  *dest = std::move(results[index]);
1262  }
1263  return std::make_tuple(first1, first2, dest);
1264 }
1265 
1266 } // namespace Thread
1267 
1268 } // namespace Cgu
1269 
1270 #endif // CGU_PARALLEL_H