parallel_for.h 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // This file is part of libigl, a simple c++ geometry processing library.
  2. //
  3. // Copyright (C) 2016 Alec Jacobson <alecjacobson@gmail.com>
  4. //
  5. // This Source Code Form is subject to the terms of the Mozilla Public License
  6. // v. 2.0. If a copy of the MPL was not distributed with this file, You can
  7. // obtain one at http://mozilla.org/MPL/2.0/.
  8. #ifndef IGL_PARALLEL_FOR_H
  9. #define IGL_PARALLEL_FOR_H
  10. #include "igl_inline.h"
  11. #include <functional>
  12. //#warning "Defining IGL_PARALLEL_FOR_FORCE_SERIAL"
  13. //#define IGL_PARALLEL_FOR_FORCE_SERIAL
namespace igl
{
  // PARALLEL_FOR Functional implementation of a basic, open-mp style, parallel
  // for loop. If the inner block of a for-loop can be rewritten/encapsulated in
  // a single (anonymous/lambda) function call `func` so that the serial code
  // looks like:
  //
  //     for(int i = 0;i<loop_size;i++)
  //     {
  //       func(i);
  //     }
  //
  // then `parallel_for(loop_size,func,min_parallel)` will use as many threads as
  // available on the current hardware to parallelize this for loop so long as
  // loop_size>=min_parallel, otherwise it will just use a serial for loop.
  //
  // Inputs:
  //   loop_size  number of iterations. I.e. for(int i = 0;i<loop_size;i++) ...
  //   func  function handle taking iteration index as only argument to compute
  //     inner block of for loop I.e. for(int i ...){ func(i); }
  //   min_parallel  min size of loop_size such that parallel (non-serial)
  //     thread pooling should be attempted {0}
  // Returns true iff thread pool was invoked
  template<typename Index, typename FunctionType >
  inline bool parallel_for(
    const Index loop_size,
    const FunctionType & func,
    const size_t min_parallel=0);
  // PARALLEL_FOR Functional implementation of an open-mp style, parallel for
  // loop with accumulation. For example, serial code separated into n chunks
  // (each to be parallelized with a thread) might look like:
  //
  //     Eigen::VectorXd S;
  //     const auto & prep_func = [&S](int n){ S = Eigen::VectorXd::Zero(n); };
  //     const auto & func = [&X,&S](int i, int t){ S(t) += X(i); };
  //     const auto & accum_func = [&S,&sum](int t){ sum += S(t); };
  //     prep_func(n);
  //     for(int i = 0;i<loop_size;i++)
  //     {
  //       func(i,i%n);
  //     }
  //     double sum = 0;
  //     for(int t = 0;t<n;t++)
  //     {
  //       accum_func(t);
  //     }
  //
  // Inputs:
  //   loop_size  number of iterations. I.e. for(int i = 0;i<loop_size;i++) ...
  //   prep_func  function handle taking n >= number of threads as only
  //     argument
  //   func  function handle taking iteration index i and thread id t as only
  //     arguments to compute inner block of for loop I.e.
  //     for(int i ...){ func(i,t); }
  //   accum_func  function handle taking thread index as only argument, to be
  //     called after all calls of func, e.g., for serial accumulation across
  //     all n (potential) threads, see n in description of prep_func.
  //   min_parallel  min size of loop_size such that parallel (non-serial)
  //     thread pooling should be attempted {0}
  // Returns true iff thread pool was invoked
  template<
    typename Index,
    typename PrepFunctionType,
    typename FunctionType,
    typename AccumFunctionType
    >
  inline bool parallel_for(
    const Index loop_size,
    const PrepFunctionType & prep_func,
    const FunctionType & func,
    const AccumFunctionType & accum_func,
    const size_t min_parallel=0);
}
  87. // Implementation
  88. #include <cmath>
  89. #include <cassert>
  90. #include <thread>
  91. #include <vector>
  92. #include <algorithm>
  93. template<typename Index, typename FunctionType >
  94. inline bool igl::parallel_for(
  95. const Index loop_size,
  96. const FunctionType & func,
  97. const size_t min_parallel)
  98. {
  99. using namespace std;
  100. // no op preparation/accumulation
  101. const auto & no_op = [](const size_t /*n/t*/){};
  102. // two-parameter wrapper ignoring thread id
  103. const auto & wrapper = [&func](Index i,size_t /*t*/){ func(i); };
  104. return parallel_for(loop_size,no_op,wrapper,no_op,min_parallel);
  105. }
  106. template<
  107. typename Index,
  108. typename PreFunctionType,
  109. typename FunctionType,
  110. typename AccumFunctionType>
  111. inline bool igl::parallel_for(
  112. const Index loop_size,
  113. const PreFunctionType & prep_func,
  114. const FunctionType & func,
  115. const AccumFunctionType & accum_func,
  116. const size_t min_parallel)
  117. {
  118. assert(loop_size>=0);
  119. if(loop_size==0) return false;
  120. // Estimate number of threads in the pool
  121. // http://ideone.com/Z7zldb
  122. const static size_t sthc = std::thread::hardware_concurrency();
  123. const size_t nthreads =
  124. #ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
  125. 0;
  126. #else
  127. loop_size<min_parallel?0:(sthc==0?8:sthc);
  128. #endif
  129. if(nthreads==0)
  130. {
  131. // serial
  132. prep_func(1);
  133. for(Index i = 0;i<loop_size;i++) func(i,0);
  134. accum_func(0);
  135. return false;
  136. }else
  137. {
  138. // Size of a slice for the range functions
  139. Index slice =
  140. std::max(
  141. (Index)std::round((loop_size+1)/static_cast<double>(nthreads)),(Index)1);
  142. // [Helper] Inner loop
  143. const auto & range = [&func](const Index k1, const Index k2, const size_t t)
  144. {
  145. for(Index k = k1; k < k2; k++) func(k,t);
  146. };
  147. prep_func(nthreads);
  148. // Create pool and launch jobs
  149. std::vector<std::thread> pool;
  150. pool.reserve(nthreads);
  151. // Inner range extents
  152. Index i1 = 0;
  153. Index i2 = std::min(0 + slice, loop_size);
  154. {
  155. size_t t = 0;
  156. for (; t+1 < nthreads && i1 < loop_size; ++t)
  157. {
  158. pool.emplace_back(range, i1, i2, t);
  159. i1 = i2;
  160. i2 = std::min(i2 + slice, loop_size);
  161. }
  162. if (i1 < loop_size)
  163. {
  164. pool.emplace_back(range, i1, loop_size, t);
  165. }
  166. }
  167. // Wait for jobs to finish
  168. for (std::thread &t : pool) if (t.joinable()) t.join();
  169. // Accumulate across threads
  170. for(size_t t = 0;t<nthreads;t++)
  171. {
  172. accum_func(t);
  173. }
  174. return true;
  175. }
  176. }
  177. //#ifndef IGL_STATIC_LIBRARY
  178. //#include "parallel_for.cpp"
  179. //#endif
  180. #endif