Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion rust_qsim/src/bin/local_qsim_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,44 @@ fn main() {
let total_thread_count = config.partitioning().num_parts + 1;
let barrier = Arc::new(Barrier::new(total_thread_count as usize));

// Configuring the routing adapter. We need
// - the IP address of the router service
// - the configuration of the simulation
// - the shutdown handles of the executor (= receiver of shutdown signals from the controller)
// The AsyncExecutor will spawn a thread for the routing service adapter and an async runtime.
let executor = AsyncExecutor::from_config(&config, barrier.clone());
let factory = RoutingServiceAdapterFactory::new(
vec![&args.router_ip],
config.clone(),
executor.shutdown_handles(),
);

// Spawning the routing service adapter in a separate thread. The adapter will be run in its own tokio runtime.
// This function returns
// - the join handle of the adapter thread
// - a channel for sending requests to the adapter
// - a channel for sending shutdown signal for the adapter
let (router_handle, send, send_sd) = executor.spawn_thread("router", factory);

// The request sender is passed to the controller.
let mut services = ExternalServices::default();
services.insert(ExternalServiceType::Routing("pt".into()), send.into());

let scenario = GlobalScenario::build(config);
// Load scenario
let scenario = GlobalScenario::load(config);

// Create controller
let controller = LocalControllerBuilder::default()
.global_scenario(scenario)
.external_services(services)
.global_barrier(barrier)
.build()
.unwrap();

// Run controller
let sim_handles = controller.run();

// Wait for the controller to finish and the routing adapter to finish.
controller::try_join(
sim_handles,
vec![AdapterHandleBuilder::default()
Expand Down
4 changes: 2 additions & 2 deletions rust_qsim/src/external_services/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ impl From<Response> for InternalRoutingResponse {
pub struct RoutingServiceAdapterFactory {
ip: Vec<String>,
config: Arc<Config>,
shutdown_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
shutdown_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}

impl RoutingServiceAdapterFactory {
pub fn new(
ip: Vec<impl Into<String>>,
config: Arc<Config>,
shutdown_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
shutdown_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
) -> Self {
Self {
ip: ip.into_iter().map(|s| s.into()).collect(),
Expand Down
2 changes: 1 addition & 1 deletion rust_qsim/src/simulation/controller/local_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn run_channel_from_args() {
let config = Arc::new(Config::from(args));

// Load and adapt scenario
let scenario = GlobalScenario::build(config);
let scenario = GlobalScenario::load(config);

// Create and run simulation
let controller = LocalControllerBuilder::default()
Expand Down
8 changes: 5 additions & 3 deletions rust_qsim/src/simulation/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ fn create_events(
Rc::new(RefCell::new(events))
}

/// Have this more complicated join logic, so that threads in the back of the handle vec can also
/// cause the main thread to panic.
/// Joins all simulation threads and then shuts down all adapter threads.
pub fn try_join(mut handles: IntMap<u32, JoinHandle<()>>, adapters: Vec<AdapterHandle>) {
while !handles.is_empty() {
sleep(Duration::from_secs(1)); // test for finished threads once a second
Expand All @@ -207,7 +206,10 @@ pub fn try_join(mut handles: IntMap<u32, JoinHandle<()>>, adapters: Vec<AdapterH
}
for i in finished {
let handle = handles.remove(&i).unwrap();
handle.join().expect("Error in a thread");
let name = handle.thread().name().unwrap().to_string();
handle
.join()
.unwrap_or_else(|_| panic!("Error in adapter thread {:?}", name));
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust_qsim/src/simulation/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct GlobalScenario {
}

impl GlobalScenario {
pub fn build(config: Arc<Config>) -> Self {
pub fn load(config: Arc<Config>) -> Self {
id::load_from_file(&io::resolve_path(
config.context(),
&config.proto_files().ids,
Expand Down
2 changes: 1 addition & 1 deletion rust_qsim/tests/test_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl TestExecutor<'_> {
&mut self,
subscribers: HashMap<u32, Vec<Box<OnEventFnBuilder>>>,
) -> IntMap<u32, JoinHandle<()>> {
let scenario = GlobalScenario::build(self.config.clone());
let scenario = GlobalScenario::load(self.config.clone());

let controller = LocalControllerBuilder::default()
.global_scenario(scenario)
Expand Down