Skip to content

Commit

Permalink
Switch to using ZMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
sfegan committed Feb 21, 2025
1 parent 966063e commit 5f82390
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
8 changes: 7 additions & 1 deletion include/diagnostics/reduced_event_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

#pragma once

#include <thread>
#include <memory>

#include <io/zmq_inproc_push_pull.hpp>
#include <iact_data/event_visitor.hpp>
#include <iact_data/waveform_treatment_event_visitor.hpp>

Expand Down Expand Up @@ -76,7 +80,9 @@ class ReducedEventWriterParallelEventVisitor:
calin::iact_data::waveform_treatment_event_visitor::OptimalWindowSumWaveformTreatmentParallelEventVisitor* gain2_visitor_ = nullptr;
bool adopt_gain_visitors_ = false;

std::mutex event_writer_mutex_;
std::unique_ptr<std::thread> writer_thread_;
std::unique_ptr<calin::io::zmq_inproc::ZMQPusher> zmq_pusher_;
std::unique_ptr<calin::io::zmq_inproc::ZMQInprocPushPull> zmq_push_pull_;
std::unique_ptr<calin::ix::diagnostics::reduced_event::ReducedEvent_StreamWriter> event_writer_;

calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration run_config_;
Expand Down
50 changes: 35 additions & 15 deletions src/diagnostics/reduced_event_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,24 @@ bool ReducedEventWriterParallelEventVisitor::visit_telescope_run(
run_config_writer->write(*run_config);
delete run_config_writer;

event_writer_.reset(calin::ix::diagnostics::reduced_event::ReducedEvent::NewHDFStreamWriter(filename, config_.event_group()));
zmq_push_pull_ = std::make_unique<calin::io::zmq_inproc::ZMQInprocPushPull>();

writer_thread_ = std::make_unique<std::thread>([this,filename](){
std::unique_ptr<calin::io::zmq_inproc::ZMQPuller> zmq_puller {
zmq_push_pull_->new_puller(calin::io::zmq_inproc::ZMQBindOrConnect::BIND) };
std::unique_ptr<calin::ix::diagnostics::reduced_event::ReducedEvent_StreamWriter> writer {
calin::ix::diagnostics::reduced_event::ReducedEvent::NewHDFStreamWriter(filename, config_.event_group()) };
while(zmq_puller->wait_for_data()) {
std::pair<google::protobuf::Arena*, calin::ix::diagnostics::reduced_event::ReducedEvent*> arena_event;
if(not zmq_puller->pull_assert_size(&arena_event, sizeof(arena_event),true))break;
writer->write(*arena_event.second);
delete(arena_event.first);
}
});

zmq_pusher_.reset(zmq_push_pull_->new_pusher(calin::io::zmq_inproc::ZMQBindOrConnect::CONNECT));
} else {
zmq_pusher_.reset(parent_->zmq_push_pull_->new_pusher(calin::io::zmq_inproc::ZMQBindOrConnect::CONNECT));
}

return true;
Expand All @@ -114,7 +131,12 @@ bool ReducedEventWriterParallelEventVisitor::visit_telescope_run(
bool ReducedEventWriterParallelEventVisitor::leave_telescope_run(
calin::ix::provenance::chronicle::ProcessingRecord* processing_record)
{
event_writer_.reset();
zmq_pusher_.reset();
if(parent_ == nullptr) {
zmq_push_pull_.reset();
writer_thread_->join();
writer_thread_.reset();
}
return true;
}

Expand Down Expand Up @@ -179,29 +201,27 @@ namespace {
bool ReducedEventWriterParallelEventVisitor::visit_telescope_event(uint64_t seq_index,
calin::ix::iact_data::telescope_event::TelescopeEvent* event)
{
calin::ix::diagnostics::reduced_event::ReducedEvent reduced_event;
reduced_event.set_local_event_number(event->local_event_number());
reduced_event.set_trigger_type(
auto* arena = new google::protobuf::Arena();
auto* reduced_event = google::protobuf::Arena::CreateMessage<calin::ix::diagnostics::reduced_event::ReducedEvent>(arena);

reduced_event->set_local_event_number(event->local_event_number());
reduced_event->set_trigger_type(
static_cast<calin::ix::diagnostics::reduced_event::TriggerType>(event->trigger_type()));
reduced_event.set_absolute_event_time_ns(event->absolute_event_time().time_ns());
reduced_event->set_absolute_event_time_ns(event->absolute_event_time().time_ns());
if(config_.write_gain1() and gain1_visitor_ and gain1_visitor_->is_same_event(seq_index)) {
copy_gain(reduced_event.mutable_gain1(), gain1_visitor_, config_);
copy_gain(reduced_event->mutable_gain1(), gain1_visitor_, config_);
}
if(config_.write_gain2() and gain2_visitor_ and gain2_visitor_->is_same_event(seq_index)) {
copy_gain(reduced_event.mutable_gain2(), gain2_visitor_, config_);
copy_gain(reduced_event->mutable_gain2(), gain2_visitor_, config_);
}
if(config_.write_l0_trigger_map() and event->has_trigger_map() and event->trigger_map().trigger_image_size() > 0) {
auto* l0map = reduced_event.mutable_l0_trigger_map();
auto* l0map = reduced_event->mutable_l0_trigger_map();
l0map->mutable_trigger_hit()->Add(
event->trigger_map().trigger_image().begin(), event->trigger_map().trigger_image().end());
}

if(parent_) {
std::lock_guard<std::mutex> lock(parent_->event_writer_mutex_);
parent_->event_writer_->write(reduced_event);
} else {
event_writer_->write(reduced_event);
}
std::pair<google::protobuf::Arena*, calin::ix::diagnostics::reduced_event::ReducedEvent*> arena_event{arena, reduced_event};
zmq_pusher_->push(&arena_event, sizeof(arena_event));

return true;
}
Expand Down

0 comments on commit 5f82390

Please sign in to comment.