Skip to content

Commit b935ad5

Browse files
committed
Parallelize leave_run
1 parent 5f8b02b commit b935ad5

File tree

1 file changed

+30
-2
lines changed

1 file changed

+30
-2
lines changed

src/iact_data/parallel_event_dispatcher.cpp

+30-2
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops(
368368
calin::io::data_source::DataSourceFactory<
369369
calin::ix::iact_data::telescope_event::TelescopeEvent>* src_factory,
370370
unsigned nthread, unsigned log_frequency,
371-
std::chrono::system_clock::time_point& start_time,
371+
std::chrono::system_clock::time_point& start_time,
372372
std::atomic<uint_fast64_t>& ndispatched)
373373
{
374374
std::vector<ParallelEventDispatcher*> sub_dispatchers;
@@ -423,6 +423,8 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops(
423423
}
424424

425425
for(auto& i : threads)i.join();
426+
threads.clear();
427+
threads_active = 0;
426428

427429
if(exceptions_raised) {
428430
for(auto* d : sub_dispatchers) {
@@ -437,7 +439,33 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops(
437439

438440
for(auto* d : sub_dispatchers)
439441
{
440-
d->dispatch_leave_run();
442+
++threads_active;
443+
threads.emplace_back([d,&threads_active,&exceptions_raised](){
444+
try {
445+
d->dispatch_leave_run();
446+
} catch(const std::exception& x) {
447+
util::log::LOG(util::log::FATAL) << x.what();
448+
++exceptions_raised;
449+
--threads_active;
450+
return;
451+
}
452+
--threads_active;
453+
});
454+
}
455+
456+
for(auto& i : threads)i.join();
457+
threads.clear();
458+
threads_active = 0;
459+
460+
if(exceptions_raised) {
461+
for(auto* d : sub_dispatchers) {
462+
delete d;
463+
}
464+
throw std::runtime_error("Exception(s) thrown in threaded merge processing");
465+
}
466+
467+
for(auto* d : sub_dispatchers)
468+
{
441469
d->dispatch_merge_results();
442470
delete d;
443471
}

0 commit comments

Comments
 (0)