c++-gtk-utils
parallel.h
Go to the documentation of this file.
1 /* Copyright (C) 2013 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 #ifndef CGU_PARALLEL_H
40 #define CGU_PARALLEL_H
41 
42 #include <utility> // for std::move and std::forward
43 #include <memory> // for std::unique_ptr
44 #include <iterator> // for std::iterator_traits and std::distance
45 #include <exception> // for std::exception
46 #include <functional> // for std::bind
47 #include <type_traits> // for std::remove_reference and std::remove_const
48 
49 #include <c++-gtk-utils/callback.h>
51 #include <c++-gtk-utils/mutex.h>
53 
54 namespace Cgu {
55 
56 namespace Thread {
57 
58 struct ParallelError: public std::exception {
59  virtual const char* what() const throw() {return "ParallelError\n";}
60 };
61 
62 #ifndef DOXYGEN_PARSING
63 
64 namespace ParallelHelper {
65 
66 template <class ArgType, class DiffType, class Iterator>
67 void for_each_cb_func(const Cgu::Callback::SafeFunctorArg<ArgType&>& s_task,
68  Iterator iter,
69  Mutex* m, Cond* cond,
70  DiffType* remaining) {
71  s_task(*iter);
72  Mutex::Lock l{*m};
73  --*remaining;
74  cond->signal();
75 }
76 
77 template <class FType, class ArgType, class DestType>
78 void transform1_func(const FType& func,
79  ArgType& arg,
80  DestType& res) {
81  res = func(arg);
82 }
83 
84 
85 template <class ArgType, class DestType, class DiffType, class SourceIterator>
86 void transform1_cb_func(const Cgu::Callback::SafeFunctorArg<ArgType&, DestType&>& s_task,
87  SourceIterator iter,
88  Mutex* m, Cond* cond,
89  DestType* results,
90  DiffType index,
91  DiffType* remaining) {
92  DestType res;
93  s_task(*iter, res);
94  Mutex::Lock l{*m};
95  // move to 'results' within the mutex because g++ <= 4.7 does not
96  // correctly implement the C++11 memory model on some 64 bit
97  // platforms (this is a slight pessimization for gcc >= 4.8)
98  results[index] = std::move(res);
99  --*remaining;
100  cond->signal();
101 }
102 
103 template <class FType, class Arg1Type,
104  class Arg2Type, class DestType>
105 void transform2_func(const FType& func,
106  Arg1Type& arg1,
107  Arg2Type& arg2,
108  DestType& res) {
109  res = func(arg1, arg2);
110 }
111 
112 
113 template <class Arg1Type, class Arg2Type, class DestType,
114  class DiffType, class SourceIterator1, class SourceIterator2>
115 void transform2_cb_func(const Cgu::Callback::SafeFunctorArg<Arg1Type&, Arg2Type&, DestType&>& s_task,
116  SourceIterator1 iter1,
117  SourceIterator2 iter2,
118  Mutex* m, Cond* cond,
119  DestType* results,
120  DiffType index,
121  DiffType* remaining) {
122  DestType res;
123  s_task(*iter1, *iter2, res);
124  Mutex::Lock l{*m};
125  // move to 'results' within the mutex because g++ <= 4.7 does not
126  // correctly implement the C++11 memory model on some 64 bit
127  // platforms (this is a slight pessimization for gcc >= 4.8)
128  results[index] = std::move(res);
129  --*remaining;
130  cond->signal();
131 }
132 
133 template <class DiffType>
134 void fail_func(Mutex* m, Cond* cond,
135  bool* error, DiffType* remaining) {
136  Mutex::Lock l{*m};
137  --*remaining;
138  *error = true;
139  cond->signal();
140 }
141 
142 } // namespace ParallelHelper
143 
144 #endif // DOXYGEN_PARSING
145 
146 /**
147  * \#include <c++-gtk-utils/parallel.h>
148  *
149  * This function applies a callable object to each element of a
150  * container in the range ['first', 'last'), by executing each such
151  * application as a task of a Thread::TaskManager object. Tasks are
152  * added to the Thread::TaskManager object in the order in which the
153  * respective elements appear in the container, but no other ordering
154  * arises, and they will execute in parallel to the extent that the
155  * Thread::TaskManager object has sufficient threads available to do
156  * so.
157  *
158  * Apart from that, and that this function returns void, it does the
159  * same as std::for_each(). It can mutate container elements if the
160  * callable object takes its argument by non-const reference. It will
161  * not return until the callable object has been applied to all of the
162  * elements in the range ['first', 'last').
163  *
164  * This function can be called by a task running on the same
165  * TaskManager object.
166  *
167  * @param tm The Thread::TaskManager object on which the tasks will
168  * run.
169  * @param first The beginning of the range to which 'func' is to be
170  * applied.
171  * @param last One past the last element to which 'func' is to be
172  * applied.
173  * @param func A callable object to be applied to each element in the
174  * range ['first', 'last'), such as formed by a lambda expression or
175  * the result of std::bind. It should take a single unbound argument
176  * of the value type of the container to which 'first' and 'last'
177  * relate or a const or non-const reference to that type. Any return
178  * value is discarded. If an exception propagates from 'func', the
179  * exception will be consumed while the for each loop is running, and
180  * an attempt will still be made to apply 'func' to all remaining
181  * elements in the range ['first', 'last'), and only after that
182  * attempt has completed will the exception Cgu::Thread::ParallelError
183  * be thrown.
184  * @exception std::bad_alloc This exception will be thrown if memory
185  * is exhausted and the system throws in that case. (On systems with
186  * over-commit/lazy-commit combined with virtual memory (swap), it is
187  * rarely useful to check for memory exhaustion). If this exception
188  * is thrown, some tasks may nonetheless have already started by
189  * virtue of the call to this function, but subsequent ones will not.
190  * @exception Cgu::Thread::TaskError This exception will be thrown if
191  * stop_all() has previously been called on the Thread::TaskManager
192  * object, or if another thread calls stop_all() after this method is
193  * called but before it has returned. It will also be thrown if the
194  * Thread::TaskManager object's is_error() method would return true
195  * because its internal thread pool loop implementation has thrown
196  * std::bad_alloc, or a thread has failed to start correctly. (On
197  * systems with over-commit/lazy-commit combined with virtual memory
198  * (swap), it is rarely useful to check for memory exhaustion, but
199  * there may be some specialized cases where the return value of
200  * is_error() is useful.) If this exception is thrown, some tasks may
201  * nonetheless have already started by virtue of the call to this
202  * function.
203  * @exception Cgu::Thread::ParallelError This exception will be thrown
204  * if an exception propagates from the 'func' callable object when it
205  * executes on being applied to one or more elements of the container.
206  * Such an exception will not stop an attempt being made to apply
207  * 'func' (successfully or unsuccessfully) to all elements in the
208  * range ['first', 'last'). Cgu::Thread::ParallelError will be thrown
209  * after such attempted application has finished.
210  * @exception Cgu::Thread::MutexError This exception will be thrown if
211  * initialization of a mutex used by this function fails. (It is
212  * often not worth checking for this, as it means either memory is
213  * exhausted or pthread has run out of other resources to create new
214  * mutexes.) If this exception is thrown, no tasks will start.
215  * @exception Cgu::Thread::CondError This exception will be thrown if
216  * initialization of a condition variable used by this function fails.
217  * (It is often not worth checking for this, as it means either memory
218  * is exhausted or pthread has run out of other resources to create
219  * new condition variables.) If this exception is thrown, no tasks
220  * will start.
221  * @note An exception might also be thrown if the copy or move
222  * constructor of the 'func' callable objects throws. If such an
223  * exception is thrown, no tasks will start.
224  *
225  * Since 2.0.19/2.2.2
226  */
227 template <class Iterator, class Func>
229  Iterator first,
230  Iterator last,
231  Func&& func) {
232 
233  typedef typename std::iterator_traits<Iterator>::value_type ArgType;
234  typedef typename std::iterator_traits<Iterator>::difference_type DiffType;
235 
236  DiffType remaining = std::distance(first, last);
237 
238  Mutex mutex;
239  Cond cond;
240  bool error = false;
241 
242  // construct SafeFunctorArg objects so that they can be shared
243  // between different tasks
245  Cgu::Callback::lambda<ArgType&>(std::forward<Func>(func))
246  };
248  Cgu::Callback::make(&ParallelHelper::fail_func<DiffType>,
249  &mutex,
250  &cond,
251  &error,
252  &remaining)
253  };
254 
255  for (; first != last; ++first) {
256  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
257  Cgu::Callback::make_ref(&ParallelHelper::for_each_cb_func<ArgType, DiffType, Iterator>,
258  s_task,
259  first,
260  &mutex,
261  &cond,
262  &remaining)
263  );
264  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
265  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
266  );
267 
268  tm.add_task(std::move(task_cb), std::move(fail_cb));
269  }
270 
271  Mutex::Lock l{mutex};
272  while (remaining > 0) cond.wait(mutex);
273  if (error) throw ParallelError();
274 }
275 
276 /**
277  * \#include <c++-gtk-utils/parallel.h>
278  *
279  * This function maps over a container in the range ['first', 'last'),
280  * applying a unary callable object to each element of the container
281  * in that range and storing the result in the destination range, by
282  * executing each such application as a task of a Thread::TaskManager
283  * object. Tasks are added to the Thread::TaskManager object in the
284  * order in which the respective elements appear in the source
285  * container, but no other ordering arises, and they will execute in
286  * parallel to the extent that the Thread::TaskManager object has
287  * sufficient threads available to do so.
288  *
289  * Apart from that, this function does the same as the version of
290  * std::transform() taking a unary function. It will not return until
291  * the callable object has been applied to all of the elements in the
292  * range ['first', 'last').
293  *
294  * This function can be called by a task running on the same
295  * TaskManager object (a result can therefore be delivered
296  * asynchronously by calling this function in a task created by the
297  * make_task_when(), make_task_compose() or make_task_when_full()
298  * Thread::TaskManager methods). A task can carry out a map-reduce
299  * operation by passing the result of calling this function to
300  * std::accumulate() to perform a fold-left or fold-right on that
301  * result.
302  *
303  * A separate overload of this function takes a binary callable
304  * object.
305  *
306  * Here is a trivial example of a map-reduce operation which maps over
307  * a vector by multiplying each element by 2 in separate tasks, and
308  * then folds-left using std::accumulate() (std::accumulate() can fold
309  * using any callable object, but in this example the default of
310  * addition is used):
311  *
312  * @code
313  * using namespace Cgu;
314  * std::vector<int> v{1, 2, 3, 4, 5};
315  * Thread::TaskManager tm;
316  *
317  * Thread::parallel_transform(tm,
318  * v.begin(),
319  * v.end(),
320  * v.begin(),
321  * [] (int elt) {return elt * 2;});
322  * // res will be equal to 30
323  * int res = std::accumulate(v.begin(), v.end(), 0);
324  * @endcode
325  *
326  * @param tm The Thread::TaskManager object on which the tasks will
327  * run.
328  * @param first The beginning of the range to which 'func' is to be
329  * applied.
330  * @param last One past the last element to which 'func' is to be
331  * applied.
332  * @param dest The beginning of the range to which the result of
333  * applying 'func' to the elements in the range ['first', 'last') is
334  * to be stored. As in the case of std::transform, this can overlap
335  * with or be the same as the source range. It may also be an
336  * inserter iterator.
337  * @param func A callable object to be applied to each element in the
338  * range ['first', 'last'), such as formed by a lambda expression or
339  * the result of std::bind. It should take a single unbound argument
340  * of the value type of the container to which 'first' and 'last'
341  * relate or a const or non-const reference to that type. If an
342  * exception propagates from 'func', the exception will be consumed
343  * while the transform loop is running, and an attempt will still be
344  * made to apply 'func' to all remaining elements in the range
345  * ['first', 'last'), and only after that attempt has completed will
346  * the exception Cgu::Thread::ParallelError be thrown.
347  * @exception std::bad_alloc This exception will be thrown if memory
348  * is exhausted and the system throws in that case. (On systems with
349  * over-commit/lazy-commit combined with virtual memory (swap), it is
350  * rarely useful to check for memory exhaustion). If this exception
351  * is thrown, some tasks may nonetheless have already started by
352  * virtue of the call to this function, but subsequent ones will not.
353  * @exception Cgu::Thread::TaskError This exception will be thrown if
354  * stop_all() has previously been called on the Thread::TaskManager
355  * object, or if another thread calls stop_all() after this method is
356  * called but before it has returned. It will also be thrown if the
357  * Thread::TaskManager object's is_error() method would return true
358  * because its internal thread pool loop implementation has thrown
359  * std::bad_alloc, or a thread has failed to start correctly. (On
360  * systems with over-commit/lazy-commit combined with virtual memory
361  * (swap), it is rarely useful to check for memory exhaustion, but
362  * there may be some specialized cases where the return value of
363  * is_error() is useful.) If this exception is thrown, some tasks may
364  * nonetheless have already started by virtue of the call to this
365  * function.
366  * @exception Cgu::Thread::ParallelError This exception will be thrown
367  * if an exception propagates from the 'func' callable object when it
368  * executes on being applied to one or more elements of the source
369  * container. Such an exception will not stop an attempt being made
370  * to apply 'func' (successfully or unsuccessfully) to all elements in
371  * the range ['first', 'last'). Cgu::Thread::ParallelError will be
372  * thrown after such attempted application has finished.
373  * @exception Cgu::Thread::MutexError This exception will be thrown if
374  * initialization of a mutex used by this function fails. (It is
375  * often not worth checking for this, as it means either memory is
376  * exhausted or pthread has run out of other resources to create new
377  * mutexes.) If this exception is thrown, no tasks will start.
378  * @exception Cgu::Thread::CondError This exception will be thrown if
379  * initialization of a condition variable used by this function fails.
380  * (It is often not worth checking for this, as it means either memory
381  * is exhausted or pthread has run out of other resources to create
382  * new condition variables.) If this exception is thrown, no tasks
383  * will start.
384  * @note An exception might also be thrown if the copy or move
385  * constructor of the 'func' callable objects throws. If such an
386  * exception is thrown, no tasks will start.
387  *
388  * Since 2.0.19/2.2.2
389  */
390 template <class SourceIterator, class DestIterator, class Func>
392  SourceIterator first,
393  SourceIterator last,
394  DestIterator dest,
395  Func&& func) {
396 
397  if (first == last) return;
398 
399  typedef typename std::iterator_traits<SourceIterator>::value_type ArgType;
400  typedef typename std::iterator_traits<SourceIterator>::difference_type DiffType;
401  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
402  typedef decltype(func(*first)) DestType;
403 
404  DiffType elts = std::distance(first, last);
405  DiffType remaining = elts;
406  ScopedHandle<DestType*> results{new DestType[elts]};
407 
408  Mutex mutex;
409  Cond cond;
410  bool error = false;
411 
412  // construct SafeFunctorArg objects so that they can be shared
413  // between different tasks
415  Cgu::Callback::make_ref(&ParallelHelper::transform1_func<FType, ArgType, DestType>,
416  std::forward<Func>(func))
417  };
419  Cgu::Callback::make(&ParallelHelper::fail_func<DiffType>,
420  &mutex,
421  &cond,
422  &error,
423  &remaining)
424  };
425 
426  for (DiffType index = 0; first != last; ++first, ++index) {
427  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
428  Cgu::Callback::lambda<>(std::bind(&ParallelHelper::transform1_cb_func<ArgType, DestType, DiffType, SourceIterator>,
429  s_task,
430  first,
431  &mutex,
432  &cond,
433  results.get(),
434  index,
435  &remaining))
436  );
437  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
438  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
439  );
440 
441  tm.add_task(std::move(task_cb), std::move(fail_cb));
442  }
443 
444  Mutex::Lock l{mutex};
445  while (remaining > 0) cond.wait(mutex);
446  if (error) throw ParallelError();
447  for (DiffType index = 0; index < elts; ++dest, ++index) {
448  *dest = std::move(results[index]);
449  }
450 }
451 
452 /**
453  * \#include <c++-gtk-utils/parallel.h>
454  *
455  * This function maps over two containers, one in the range ['first1',
456  * 'last1') and the other beginning at 'first2', applying a binary
457  * callable object to each element of the containers in those ranges
458  * and storing the result in the destination range, by executing each
459  * such application as a task of a Thread::TaskManager object. Tasks
460  * are added to the Thread::TaskManager object in the order in which
461  * the respective elements appear in the source containers, but no
462  * other ordering arises, and they will execute in parallel to the
463  * extent that the Thread::TaskManager object has sufficient threads
464  * available to do so.
465  *
466  * Apart from that, this function does the same as the version of
467  * std::transform() taking a binary function. It will not return
468  * until the callable object has been applied to all of the elements
469  * in the source ranges.
470  *
471  * This function can be called by a task running on the same
472  * TaskManager object (a result can therefore be delivered
473  * asynchronously by calling this function in a task created by the
474  * make_task_when(), make_task_compose() or make_task_when_full()
475  * Thread::TaskManager methods). A task can carry out a map-reduce
476  * operation by passing the result of calling this function to
477  * std::accumulate() to perform a fold-left or fold-right on that
478  * result.
479  *
480  * A separate overload of this function takes a unary callable object.
481  *
482  * Here is a trivial example of a map-reduce operation which maps over
483  * two vectors by adding respective elements of the vectors in
484  * separate tasks, and then folds-left using std::accumulate()
485  * (std::accumulate() can fold using any callable object, but in this
486  * example the default of addition is used):
487  *
488  * @code
489  * using namespace Cgu;
490  * std::vector<int> v1{2, 4, 6, 8, 10};
491  * std::vector<int> v2{10, 20, 30, 40, 50};
492  * std::vector<int> v3;
493  * Thread::TaskManager tm;
494  *
495  * Thread::parallel_transform(tm,
496  * v1.begin(),
497  * v1.end(),
498  * v2.begin(),
499  * std::back_inserter(v3),
500  * std::plus<int>());
501  * // res will be equal to 180
502  * int res = std::accumulate(v3.begin(), v3.end(), 0);
503  * @endcode
504  *
505  * @param tm The Thread::TaskManager object on which the tasks will
506  * run.
507  * @param first1 The beginning of the range which is to be passed as
508  * the first argument of 'func'.
509  * @param last1 One past the last element of the range which is to be
510  * passed as the first argument of 'func'.
511  * @param first2 The beginning of the range which is to be passed as
512  * the second argument of 'func'.
513  * @param dest The beginning of the range to which the result of
514  * applying 'func' to the elements in the source ranges is to be
515  * stored. As in the case of std::transform, this can overlap with or
516  * be the same as one of the source ranges. It may also be an
517  * inserter iterator.
518  * @param func A binary callable object to be applied to each element
519  * in the source ranges, such as formed by a lambda expression or the
520  * result of std::bind. It should take two unbound arguments of the
521  * value types of the containers to which 'first1' and 'first2' relate
522  * or const or non-const references to those types. If an exception
523  * propagates from 'func', the exception will be consumed while the
524  * transform loop is running, and an attempt will still be made to
525  * apply 'func' to all remaining elements of the source ranges, and
526  * only after that attempt has completed will the exception
527  * Cgu::Thread::ParallelError be thrown.
528  * @exception std::bad_alloc This exception will be thrown if memory
529  * is exhausted and the system throws in that case. (On systems with
530  * over-commit/lazy-commit combined with virtual memory (swap), it is
531  * rarely useful to check for memory exhaustion). If this exception
532  * is thrown, some tasks may nonetheless have already started by
533  * virtue of the call to this function, but subsequent ones will not.
534  * @exception Cgu::Thread::TaskError This exception will be thrown if
535  * stop_all() has previously been called on the Thread::TaskManager
536  * object, or if another thread calls stop_all() after this method is
537  * called but before it has returned. It will also be thrown if the
538  * Thread::TaskManager object's is_error() method would return true
539  * because its internal thread pool loop implementation has thrown
540  * std::bad_alloc, or a thread has failed to start correctly. (On
541  * systems with over-commit/lazy-commit combined with virtual memory
542  * (swap), it is rarely useful to check for memory exhaustion, but
543  * there may be some specialized cases where the return value of
544  * is_error() is useful.) If this exception is thrown, some tasks may
545  * nonetheless have already started by virtue of the call to this
546  * function.
547  * @exception Cgu::Thread::ParallelError This exception will be thrown
548  * if an exception propagates from the 'func' callable object when it
549  * executes on being applied to one or more elements of the source
550  * ranges. Such an exception will not stop an attempt being made to
551  * apply 'func' (successfully or unsuccessfully) to all elements in
552  * the source ranges. Cgu::Thread::ParallelError will be thrown after
553  * such attempted application has finished.
554  * @exception Cgu::Thread::MutexError This exception will be thrown if
555  * initialization of a mutex used by this function fails. (It is
556  * often not worth checking for this, as it means either memory is
557  * exhausted or pthread has run out of other resources to create new
558  * mutexes.) If this exception is thrown, no tasks will start.
559  * @exception Cgu::Thread::CondError This exception will be thrown if
560  * initialization of a condition variable used by this function fails.
561  * (It is often not worth checking for this, as it means either memory
562  * is exhausted or pthread has run out of other resources to create
563  * new condition variables.) If this exception is thrown, no tasks
564  * will start.
565  * @note An exception might also be thrown if the copy or move
566  * constructor of the 'func' callable objects throws. If such an
567  * exception is thrown, no tasks will start.
568  *
569  * Since 2.0.19/2.2.2
570  */
571 template <class SourceIterator1, class SourceIterator2, class DestIterator, class Func>
573  SourceIterator1 first1,
574  SourceIterator1 last1,
575  SourceIterator2 first2,
576  DestIterator dest,
577  Func&& func) {
578 
579  if (first1 == last1) return;
580 
581  typedef typename std::iterator_traits<SourceIterator1>::value_type Arg1Type;
582  typedef typename std::iterator_traits<SourceIterator1>::difference_type DiffType;
583  typedef typename std::iterator_traits<SourceIterator2>::value_type Arg2Type;
584  typedef typename std::remove_const<typename std::remove_reference<Func>::type>::type FType;
585  typedef decltype(func(*first1, *first2)) DestType;
586 
587  DiffType elts = std::distance(first1, last1);
588  DiffType remaining = elts;
589  ScopedHandle<DestType*> results{new DestType[elts]};
590 
591  Mutex mutex;
592  Cond cond;
593  bool error = false;
594 
595  // construct SafeFunctorArg objects so that they can be shared
596  // between different tasks
598  Cgu::Callback::make_ref(&ParallelHelper::transform2_func<FType, Arg1Type, Arg2Type, DestType>,
599  std::forward<Func>(func))
600  };
602  Cgu::Callback::make(&ParallelHelper::fail_func<DiffType>,
603  &mutex,
604  &cond,
605  &error,
606  &remaining)
607  };
608 
609  for (DiffType index = 0; first1 != last1; ++first1, ++first2, ++index) {
610  std::unique_ptr<const Cgu::Callback::Callback> task_cb(
611  Cgu::Callback::lambda<>(std::bind(&ParallelHelper::transform2_cb_func<Arg1Type, Arg2Type, DestType, DiffType, SourceIterator1, SourceIterator2>,
612  s_task,
613  first1,
614  first2,
615  &mutex,
616  &cond,
617  results.get(),
618  index,
619  &remaining))
620  );
621  std::unique_ptr<const Cgu::Callback::Callback> fail_cb(
622  Cgu::Callback::lambda<>([s_fail] () {s_fail();})
623  );
624 
625  tm.add_task(std::move(task_cb), std::move(fail_cb));
626  }
627 
628  Mutex::Lock l{mutex};
629  while (remaining > 0) cond.wait(mutex);
630  if (error) throw ParallelError();
631  for (DiffType index = 0; index < elts; ++dest, ++index) {
632  *dest = std::move(results[index]);
633  }
634 }
635 
636 } // namespace Thread
637 
638 } // namespace Cgu
639 
640 #endif // CGU_PARALLEL_H