HArD::Core3D
Hybrid Arbitrary Degree::Core 3D - Library to implement 3D schemes with vertex, edge, face and cell polynomials as unknowns
parallel_for.hpp
Go to the documentation of this file.
1 #ifndef PARALLEL_FOR
2 #define PARALLEL_FOR
3 
4 #include <thread>
5 #include <Eigen/Sparse>
6 
7 namespace HArDCore3D
8 {
9 
16  template<typename MatrixType>
18  {
20  SystemVectors(std::vector<MatrixType> sys, std::vector<Eigen::VectorXd> vec):
21  systems(sys),
22  vectors(vec)
23  {
24  };
25 
26  std::vector<MatrixType> systems;
27  std::vector<Eigen::VectorXd> vectors;
28  };
29 
31  static std::pair<std::vector<int>, std::vector<int>>
32  distributeLoad(size_t nb_elements, unsigned nb_threads)
33  {
34  // Vectors of start and end indices
35  std::vector<int> start(nb_threads);
36  std::vector<int> end(nb_threads);
37 
38  // Compute the batch size and the remainder
39  unsigned batch_size = nb_elements / nb_threads;
40  unsigned batch_remainder = nb_elements % nb_threads;
41 
42  // Distribute the remainder over the threads to get the start and end indices for each thread
43  for (unsigned i = 0; i < nb_threads; ++i) {
44  if (i < batch_remainder){
45  start[i] = i * batch_size + i;
46  end[i] = start[i] + batch_size + 1;
47  }
48  else{
49  start[i] = i * batch_size + batch_remainder;
50  end[i] = start[i] + batch_size;
51  }
52  }
53 
54  return std::make_pair(start, end);
55  }
56 
58  static inline void parallel_for(unsigned nb_elements,
59  std::function<void(size_t start, size_t end)> functor,
60  bool use_threads = true)
61  {
62  unsigned nb_threads_hint = std::thread::hardware_concurrency();
63  unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
64 
65  // Generate the start and end indices
66  auto [start, end] = distributeLoad(nb_elements, nb_threads);
67 
68  std::vector<std::thread> my_threads(nb_threads);
69 
70  if (use_threads) {
71  // Multithread execution
72  for (unsigned i = 0; i < nb_threads; ++i) {
73  my_threads[i] = std::thread(functor, start[i], end[i]);
74  }
75  } else {
76  // Single thread execution (for easy debugging)
77  for(unsigned i = 0; i < nb_threads; ++i) {
78  functor(start[i], end[i]);
79  }
80  }
81 
82  // Wait for the other thread to finish their task
83  if (use_threads) {
84  std::for_each(my_threads.begin(), my_threads.end(), std::mem_fn(&std::thread::join));
85  }
86  }
87 
89  static inline SystemVectors<Eigen::SparseMatrix<double>>
91  size_t nb_elements, //< nb of elements over which the threading will be done
92  std::vector<std::pair<size_t, size_t>> size_systems, //< sizes of each system to assemble
93  std::vector<size_t> size_vectors, //< sizes of each vector to assemble
94  std::function<void(size_t start, size_t end, std::vector<std::list<Eigen::Triplet<double>>> * triplets, std::vector<Eigen::VectorXd> * vecs)> batch_local_assembly, //< procedure to compute all the local contributions to the matrices (put in the triplets) and vectors between start and end points (e.g. elements indices)
95  bool use_threads = true //< determine if threaded process is used or not
96  )
97  {
98  // Matrices and vectors
99  std::vector<Eigen::SparseMatrix<double>> systems;
100  systems.reserve(size_systems.size());
101  std::vector<Eigen::VectorXd> vectors;
102  for (auto size : size_vectors){
103  vectors.emplace_back(Eigen::VectorXd::Zero(size));
104  }
105 
106  if (use_threads) {
107  // Select the number of threads
108  unsigned nb_threads_hint = std::thread::hardware_concurrency();
109  unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
110 
111  // Generate the start and end indices
112  auto [start, end] = distributeLoad(nb_elements, nb_threads);
113 
114  // Create vectors of triplets and vectors
115  std::vector<std::vector<std::list<Eigen::Triplet<double> > > > triplets(nb_threads, std::vector<std::list<Eigen::Triplet<double> > >(size_systems.size()));
116  std::vector<std::vector<Eigen::VectorXd>> vecs(nb_threads, vectors);
117 
118  // Assign a task to each thread
119  std::vector<std::thread> my_threads(nb_threads);
120  for (unsigned i = 0; i < nb_threads; ++i) {
121  my_threads[i] = std::thread(batch_local_assembly, start[i], end[i], &triplets[i], &vecs[i]);
122  // std::cout << "Thread " << i << ": " << start[i] << " " << end[i] << std::endl;
123  }
124 
125  // Wait for the other threads to finish their task
126  std::for_each(my_threads.begin(), my_threads.end(), std::mem_fn(&std::thread::join));
127 
128  // Create systems from triplets
129  for (size_t i = 0; i < size_systems.size(); i++){
130  systems.emplace_back(Eigen::SparseMatrix<double>(size_systems[i].first, size_systems[i].second));
131  size_t n_triplets = 0;
132  for (auto triplets_thread : triplets) {
133  n_triplets += triplets_thread[i].size();
134  }
135  std::vector<Eigen::Triplet<double>> all_triplets(n_triplets);
136  auto triplet_index = all_triplets.begin();
137  for (auto triplets_thread : triplets) {
138  triplet_index = std::copy(triplets_thread[i].begin(), triplets_thread[i].end(), triplet_index);
139  }
140  systems[i].setFromTriplets(all_triplets.begin(), all_triplets.end());
141  }
142 
143  for (size_t i = 0; i < size_vectors.size(); i++){
144  for (auto vec_thread : vecs){
145  vectors[i] += vec_thread[i];
146  }
147  }
148 
149  } else {
150  std::vector<std::list<Eigen::Triplet<double> > > triplets(size_systems.size());
151  batch_local_assembly(0, nb_elements, &triplets, &vectors);
152  for (size_t i = 0; i < size_systems.size(); i++){
153  systems.emplace_back(Eigen::SparseMatrix<double>(size_systems[i].first, size_systems[i].second));
154  systems[i].setFromTriplets(triplets[i].begin(), triplets[i].end());
155  }
156  }
157  return SystemVectors(systems, vectors);
158  }
159 
161  static inline std::pair<Eigen::SparseMatrix<double>, Eigen::VectorXd>
163  size_t nb_elements, //< nb of elements over which the threading will be done
164  size_t size_system, //< size of the system to assemble
165  std::function<void(size_t start, size_t end, std::list<Eigen::Triplet<double>> * triplets, Eigen::VectorXd * rhs)> batch_local_assembly, //< procedure to compute all the local contributions to the matrix (put in the triplets) and rhs (put in the vector) between start and end points (e.g. elements indices)
166  bool use_threads = true //< determine if threaded process is used or not
167  )
168  {
169  std::function<void(size_t start, size_t end, std::vector<std::list<Eigen::Triplet<double>>> * triplets, std::vector<Eigen::VectorXd> * vecs)>
170  new_batch_local_assembly = [&](size_t start, size_t end, std::vector<std::list<Eigen::Triplet<double>>> * triplets, std::vector<Eigen::VectorXd> * vecs)-> void
171  {batch_local_assembly(start, end, &(*triplets)[0], &(*vecs)[0]);};
172  auto [systems, vectors] = parallel_assembly_system(nb_elements, {std::make_pair(size_system, size_system)}, {size_system}, new_batch_local_assembly, use_threads);
173  return std::make_pair(systems[0], vectors[0]);
174  }
175 
176 
178  static inline std::tuple<Eigen::SparseMatrix<double>, Eigen::VectorXd, Eigen::SparseMatrix<double>, Eigen::VectorXd>
180  size_t nb_elements, //< nb of elements over which the threading will be done
181  size_t size_system1, //< size of the system 1 to assemble (must be square, corresponds to first matrix and vector=rhs)
182  std::pair<size_t, size_t> size_Mat2, //< size of the second matrix to assemble (can be rectangular)
183  size_t size_b2, //< size of second vector
184  std::function<void(size_t start, size_t end, std::list<Eigen::Triplet<double>> * triplets1, Eigen::VectorXd * vec1, std::list<Eigen::Triplet<double>> * triplets2, Eigen::VectorXd * vec2)> batch_local_assembly, //< procedure to compute all the local contributions to the matrices (put in the triplets) and vectors between start and end points (e.g. elements indices)
185  bool use_threads = true //< determine if threaded process is used or not
186  )
187  {
188  std::function<void(size_t start, size_t end, std::vector<std::list<Eigen::Triplet<double>>> * triplets, std::vector<Eigen::VectorXd> * vecs)>
189  new_batch_local_assembly = [&](size_t start, size_t end, std::vector<std::list<Eigen::Triplet<double>>> * triplets, std::vector<Eigen::VectorXd> * vecs)-> void
190  {batch_local_assembly(start, end, &(*triplets)[0], &(*vecs)[0], &(*triplets)[1], &(*vecs)[1]);};
191  auto [systems, vectors] = parallel_assembly_system(nb_elements, {std::make_pair(size_system1, size_system1), size_Mat2}, {size_system1, size_b2}, new_batch_local_assembly, use_threads);
192  return std::make_tuple(systems[0], vectors[0], systems[1], vectors[1]);
193  }
195 
196 } // end of namespace HArDCore3D
197 #endif
static SystemVectors< Eigen::SparseMatrix< double > > parallel_assembly_system(size_t nb_elements, std::vector< std::pair< size_t, size_t >> size_systems, std::vector< size_t > size_vectors, std::function< void(size_t start, size_t end, std::vector< std::list< Eigen::Triplet< double >>> *triplets, std::vector< Eigen::VectorXd > *vecs)> batch_local_assembly, bool use_threads=true)
Function to assemble global matrices from a procedure that compute local triplets.
Definition: parallel_for.hpp:90
static std::pair< std::vector< int >, std::vector< int > > distributeLoad(size_t nb_elements, unsigned nb_threads)
Function to distribute elements (considered as jobs) over threads. It returns a pair of vectors indic...
Definition: parallel_for.hpp:32
static void parallel_for(unsigned nb_elements, std::function< void(size_t start, size_t end)> functor, bool use_threads=true)
Generic function to execute threaded processes.
Definition: parallel_for.hpp:58
SystemVectors(std::vector< MatrixType > sys, std::vector< Eigen::VectorXd > vec)
Constructor.
Definition: parallel_for.hpp:20
std::vector< Eigen::VectorXd > vectors
Definition: parallel_for.hpp:27
std::vector< MatrixType > systems
Definition: parallel_for.hpp:24
bool use_threads
Definition: HHO_DiffAdvecReac.hpp:47
Definition: ddr-magnetostatics.hpp:40
Struct to store the systems and vector.
Definition: parallel_for.hpp:18