diff --git a/bastion/Cargo.toml b/bastion/Cargo.toml index 8e18c9c4..a2b7925a 100644 --- a/bastion/Cargo.toml +++ b/bastion/Cargo.toml @@ -49,6 +49,10 @@ futures-timer = "3.0.0" fxhash = "0.2" lazy_static = "1.4" log = "0.4" +ipc-channel = "0.14.0" +serde = { version = "1.0", features = ["derive"] } + + # TODO: https://github.com/cogciprocate/qutex/pull/5 # TODO: https://github.com/cogciprocate/qutex/pull/6 bastion-qutex = { version = "0.2", features = ["async_await"] } diff --git a/bastion/src/lib.rs b/bastion/src/lib.rs index bc08f12c..cafd0261 100644 --- a/bastion/src/lib.rs +++ b/bastion/src/lib.rs @@ -83,6 +83,8 @@ pub mod message; pub mod path; pub mod supervisor; +pub mod parallel; + /// /// Prelude of Bastion pub mod prelude { diff --git a/bastion/src/parallel/callbacks.rs b/bastion/src/parallel/callbacks.rs new file mode 100644 index 00000000..b3a8687b --- /dev/null +++ b/bastion/src/parallel/callbacks.rs @@ -0,0 +1,27 @@ +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; +use std::boxed::Box; +use std::io; + +pub type CallbackFunc = dyn FnMut() -> io::Result<()> + Send + Sync + 'static; +pub type SafeCallbackFunc = Arc>>; + +#[derive(Default)] +pub struct ProcessCallbacks { + pub before_start: Option, + pub before_restart: Option, + pub after_restart: Option, + pub after_stop: Option, +} + +impl Debug for ProcessCallbacks { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + fmt.debug_struct("ProcessCallbacks") + .field("before_start", &self.before_start.is_some()) + .field("before_restart", &self.before_start.is_some()) + .field("after_restart", &self.before_start.is_some()) + .field("after_stop", &self.before_start.is_some()) + .finish() + } +} + diff --git a/bastion/src/parallel/macros.rs b/bastion/src/parallel/macros.rs new file mode 100644 index 00000000..e69de29b diff --git a/bastion/src/parallel/mod.rs b/bastion/src/parallel/mod.rs new file mode 100644 index 00000000..0e6abdf4 --- /dev/null +++ b/bastion/src/parallel/mod.rs @@ -0,0 +1,3 @@ +pub mod callbacks; +pub mod ops; +pub mod process; diff --git a/bastion/src/parallel/ops.rs b/bastion/src/parallel/ops.rs new file mode 100644 index 00000000..e69de29b diff --git a/bastion/src/parallel/process.rs b/bastion/src/parallel/process.rs new file mode 100644 index 00000000..faa66bcb --- /dev/null +++ b/bastion/src/parallel/process.rs @@ -0,0 +1,152 @@ +use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::ffi::OsStr; +use std::ffi::OsString; +use std::process::Stdio; +use std::io; +use super::callbacks::*; + +#[derive(Debug)] +pub struct ProcessData { + pub(crate) callbacks: ProcessCallbacks, + pub(crate) envs: HashMap, +} + +impl Default for ProcessData { + fn default() -> Self { + Self { + callbacks: ProcessCallbacks::default(), + envs: std::env::vars_os().collect(), + } + } +} + + +#[derive(Debug, Default)] +pub struct Builder { + pub(crate) stdin: Option, + pub(crate) stdout: Option, + pub(crate) stderr: Option, + pub(crate) data: ProcessData, +} + +impl Builder { + pub fn new() -> Self { + Self { + stdin: None, + stdout: None, + stderr: None, + data: ProcessData::default() + } + } + + /// Process data which has given to spawned Process + pub(crate) fn data(&mut self, proc_data: ProcessData) -> &mut Self { + self.data = proc_data; + self + } + + /// Set an environment variable in the spawned process. Equivalent to `Command::env` + pub fn env(&mut self, key: K, val: V) -> &mut Self + where + K: AsRef, + V: AsRef, + { + self.data.envs + .insert(key.as_ref().to_owned(), val.as_ref().to_owned()); + self + } + + /// Set environment variables in the spawned process. Equivalent to `Command::envs` + pub fn envs(&mut self, vars: I) -> &mut Self + where + I: IntoIterator, + K: AsRef, + V: AsRef, + { + self.data.envs.extend( + vars.into_iter() + .map(|(k, v)| (k.as_ref().to_owned(), v.as_ref().to_owned())), + ); + self + } + + /// + /// Removes an environment variable in the spawned process. Equivalent to `Command::env_remove` + pub fn env_remove>(&mut self, key: K) -> &mut Self { + self.data.envs.remove(key.as_ref()); + self + } + + /// + /// Clears all environment variables in the spawned process. Equivalent to `Command::env_clear` + pub fn env_clear(&mut self) -> &mut Self { + self.data.envs.clear(); + self + } + + /// + /// Captures the `stdin` of the spawned process, allowing you to manually send data via `JoinHandle::stdin` + pub fn stdin>(&mut self, cfg: T) -> &mut Self { + self.stdin = Some(cfg.into()); + self + } + + /// + /// Captures the `stdout` of the spawned process, allowing you to manually receive data via `JoinHandle::stdout` + pub fn stdout>(&mut self, cfg: T) -> &mut Self { + self.stdout = Some(cfg.into()); + self + } + + /// + /// Captures the `stderr` of the spawned process, allowing you to manually receive data via `JoinHandle::stderr` + pub fn stderr>(&mut self, cfg: T) -> &mut Self { + self.stderr = Some(cfg.into()); + self + } + + /// + /// Process before start + #[cfg(unix)] + pub fn before_start(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.before_start = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } + + /// + /// Process before restart + #[cfg(unix)] + pub fn before_restart(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.before_restart = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } + + /// + /// Process after restart + #[cfg(unix)] + pub fn after_restart(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.after_restart = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } + + /// + /// Process after stop + #[cfg(unix)] + pub fn after_stop(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.after_stop = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } +}