diff --git a/src/input/target/mod.rs b/src/input/target/mod.rs index f5a96514..8caddbe3 100644 --- a/src/input/target/mod.rs +++ b/src/input/target/mod.rs @@ -1,7 +1,7 @@ use std::{ error::Error, io, - sync::{Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex}, thread, time::Duration, }; @@ -360,7 +360,7 @@ pub struct TargetDriver { composite_device: Option, scheduled_events: Vec, tx: mpsc::Sender, - rx: mpsc::Receiver, + rx: Option>, } impl TargetDriver { @@ -385,7 +385,7 @@ impl TargetDriver implementation: Arc::new(Mutex::new(device)), composite_device: None, scheduled_events: Vec::new(), - rx, + rx: Some(rx), tx, } } @@ -399,72 +399,117 @@ impl TargetDriver pub async fn run(mut self, dbus_path: String) -> Result<(), Box> { log::debug!("Started running target device: {dbus_path}"); + // Spawn a task to wait for commands/input events + let implementation = self.implementation.clone(); + let type_id = self.type_id; + let rx = self.rx.take(); + let (writer_tx, mut writer_rx) = mpsc::channel(16); + let writer_task = tokio::task::spawn(async move { + let Some(rx) = rx else { + log::error!("No target command receiver was found on the target device!"); + return; + }; + + let type_id = type_id.as_str(); + if let Err(e) = Self::receive_commands(implementation, type_id, rx, writer_tx).await { + log::debug!("Failed to receive target commands: {e:?}"); + } + }); + // Spawn a blocking task to run the target device. The '?' operator should // be avoided in this task so cleanup tasks can run to remove the DBus // interface and stop the device if an error occurs. let client = self.client(); - let task = + let reader_task = tokio::task::spawn_blocking(move || -> Result<(), Box> { let mut composite_device = self.composite_device; - let mut rx = self.rx; - let mut implementation = self.implementation.lock().unwrap(); - - // Start the DBus interface for the device - implementation.start_dbus_interface(self.dbus.clone(), dbus_path.clone(), client); + { + let mut implementation = self.implementation.lock().unwrap(); + + // Start the DBus interface for the device + implementation.start_dbus_interface( + self.dbus.clone(), + dbus_path.clone(), + client, + ); + } log::debug!("Target device running: {dbus_path}"); loop { - // Find any scheduled events that are ready to be sent - let mut ready_events = vec![]; - let mut i = 0; - while i < self.scheduled_events.len() { - if self.scheduled_events[i].is_ready() { - let event = self.scheduled_events.remove(i); - ready_events.push(event); - continue; + { + // Find any scheduled events that are ready to be sent + let mut ready_events = vec![]; + let mut i = 0; + while i < self.scheduled_events.len() { + if self.scheduled_events[i].is_ready() { + let event = self.scheduled_events.remove(i); + ready_events.push(event); + continue; + } + i += 1; } - i += 1; - } - for event in ready_events.drain(..) { - if let Err(e) = implementation.write_event(event.into()) { - log::error!("Error writing event: {e:?}"); - break; + for event in ready_events.drain(..) { + let mut implementation = self.implementation.lock().unwrap(); + if let Err(e) = implementation.write_event(event.into()) { + log::error!("Error writing event: {e:?}"); + break; + } } - } - - // Receive commands/input events - if let Err(e) = TargetDriver::receive_commands( - self.type_id.as_str(), - &mut composite_device, - &mut rx, - &mut implementation, - ) { - log::debug!("Error receiving commands: {e:?}"); - break; - } - // Poll the implementation for scheduled input events - if let Some(mut scheduled_events) = implementation.scheduled_events() { - self.scheduled_events.append(&mut scheduled_events); - } + // Receive commands/input events + //if let Err(e) = TargetDriver::receive_commands( + // self.type_id.as_str(), + // &mut composite_device, + // &mut rx, + // &mut implementation, + //) { + // log::debug!("Error receiving commands: {e:?}"); + // break; + //} + + // Receive from the writer thread + match writer_rx.try_recv() { + Ok(device) => { + composite_device = Some(device); + } + Err(err) => match err { + TryRecvError::Empty => (), + TryRecvError::Disconnected => { + log::debug!("Writer thread disconnected"); + break; + } + }, + } - // Poll the implementation for output events - let events = match implementation.poll(&composite_device) { - Ok(events) => events, - Err(e) => { - log::error!("Error polling target device: {e:?}"); - break; + // Poll the implementation for scheduled input events + { + let mut implementation = self.implementation.lock().unwrap(); + if let Some(mut scheduled_events) = implementation.scheduled_events() { + self.scheduled_events.append(&mut scheduled_events); + } } - }; - for event in events.into_iter() { - let Some(ref client) = composite_device else { - break; - }; - // Send the output event to source devices - let result = client.blocking_process_output_event(event); - if let Err(e) = result { - return Err(e.to_string().into()); + // Poll the implementation for output events + let events = { + let mut implementation = self.implementation.lock().unwrap(); + match implementation.poll(&composite_device) { + Ok(events) => events, + Err(e) => { + log::error!("Error polling target device: {e:?}"); + break; + } + } + }; + for event in events.into_iter() { + let Some(ref client) = composite_device else { + break; + }; + + // Send the output event to source devices + let result = client.blocking_process_output_event(event); + if let Err(e) = result { + return Err(e.to_string().into()); + } } } @@ -474,6 +519,7 @@ impl TargetDriver // Stop the device log::debug!("Target device stopping: {dbus_path}"); + let mut implementation = self.implementation.lock().unwrap(); implementation.stop_dbus_interface(self.dbus, dbus_path.clone()); implementation.stop()?; log::debug!("Target device stopped: {dbus_path}"); @@ -482,64 +528,132 @@ impl TargetDriver }); // Wait for the device to finish running. - if let Err(e) = task.await? { + let (reader_result, writer_result) = tokio::join!(reader_task, writer_task); + if let Err(e) = reader_result { + return Err(e.to_string().into()); + } + if let Err(e) = writer_result { return Err(e.to_string().into()); } Ok(()) } - /// Read commands sent to this device from the channel until it is - /// empty. - fn receive_commands( + /// Read and process commands sent to this device from the channel in a loop + async fn receive_commands( + implementation: Arc>, type_id: &str, - composite_device: &mut Option, - rx: &mut mpsc::Receiver, - implementation: &mut MutexGuard<'_, T>, + mut rx: mpsc::Receiver, + writer_tx: mpsc::Sender, ) -> Result<(), Box> { - const MAX_COMMANDS: u8 = 64; - let mut commands_processed = 0; + const BUFFER_SIZE: usize = 1024; + let mut buffer = Vec::with_capacity(BUFFER_SIZE); loop { - match rx.try_recv() { - Ok(cmd) => match cmd { - TargetCommand::WriteEvent(event) => { - implementation.write_event(event)?; - } - TargetCommand::SetCompositeDevice(device) => { - *composite_device = Some(device.clone()); - implementation.on_composite_device_attached(device)?; - } - TargetCommand::GetCapabilities(sender) => { - let capabilities = implementation.get_capabilities().unwrap_or_default(); - sender.blocking_send(capabilities)?; - } - TargetCommand::GetType(sender) => { - sender.blocking_send(type_id.to_string())?; - } - TargetCommand::ClearState => { - implementation.clear_state(); - } - TargetCommand::Stop => { - implementation.stop()?; - return Err("Target device stopped".into()); - } - }, - Err(e) => match e { - TryRecvError::Empty => return Ok(()), - TryRecvError::Disconnected => { - log::debug!("Receive channel disconnected"); - return Err("Receive channel disconnected".into()); - } - }, - }; + let num = rx.recv_many(&mut buffer, BUFFER_SIZE).await; + if num == 0 { + log::warn!("Unable to receive more commands. Channel closed."); + break; + } + for cmd in buffer.drain(..) { + Self::process_command(&implementation, type_id, cmd, &writer_tx).await?; + } + } - // Only process MAX_COMMANDS messages at a time - commands_processed += 1; - if commands_processed >= MAX_COMMANDS { - return Ok(()); + Ok(()) + } + + /// Process the given target command + async fn process_command( + implementation: &Arc>, + type_id: &str, + cmd: TargetCommand, + writer_tx: &mpsc::Sender, + ) -> Result<(), Box> { + match cmd { + TargetCommand::WriteEvent(event) => { + let mut implementation = implementation.lock().unwrap(); + implementation.write_event(event)?; + } + TargetCommand::SetCompositeDevice(device) => { + writer_tx.send(device.clone()).await?; + let mut implementation = implementation.lock().unwrap(); + implementation.on_composite_device_attached(device)?; + } + TargetCommand::GetCapabilities(sender) => { + let capabilities = { + let implementation = implementation.lock().unwrap(); + implementation.get_capabilities().unwrap_or_default() + }; + sender.send(capabilities).await?; + } + TargetCommand::GetType(sender) => { + sender.send(type_id.to_string()).await?; + } + TargetCommand::ClearState => { + let mut implementation = implementation.lock().unwrap(); + implementation.clear_state(); + } + TargetCommand::Stop => { + let mut implementation = implementation.lock().unwrap(); + implementation.stop()?; + return Err("Target device stopped".into()); } } + + Ok(()) } + + //// Read commands sent to this device from the channel until it is + //// empty. + //fn receive_commands( + // type_id: &str, + // composite_device: &mut Option, + // rx: &mut mpsc::Receiver, + // implementation: &mut MutexGuard<'_, T>, + //) -> Result<(), Box> { + // const MAX_COMMANDS: u8 = 64; + // let mut commands_processed = 0; + // loop { + // match rx.try_recv() { + // Ok(cmd) => match cmd { + // TargetCommand::WriteEvent(event) => { + // implementation.write_event(event)?; + // } + // TargetCommand::SetCompositeDevice(device) => { + // *composite_device = Some(device.clone()); + // implementation.on_composite_device_attached(device)?; + // } + // TargetCommand::GetCapabilities(sender) => { + // let capabilities = implementation.get_capabilities().unwrap_or_default(); + // sender.blocking_send(capabilities)?; + // } + // TargetCommand::GetType(sender) => { + // sender.blocking_send(type_id.to_string())?; + // } + // TargetCommand::ClearState => { + // implementation.clear_state(); + // } + // TargetCommand::Stop => { + // implementation.stop()?; + // return Err("Target device stopped".into()); + // } + // }, + // Err(e) => match e { + // TryRecvError::Empty => return Ok(()), + // TryRecvError::Disconnected => { + // log::debug!("Receive channel disconnected"); + // return Err("Receive channel disconnected".into()); + // } + // }, + // }; + + // // Only process MAX_COMMANDS messages at a time + // commands_processed += 1; + // if commands_processed >= MAX_COMMANDS { + // return Ok(()); + // } + // } + //} } /// A [TargetDevice] is any virtual input device that emits input events