diff --git a/crates/runc-shim/src/common.rs b/crates/runc-shim/src/common.rs index d6dd8bb7..bd00839a 100644 --- a/crates/runc-shim/src/common.rs +++ b/crates/runc-shim/src/common.rs @@ -29,11 +29,9 @@ use std::{ use containerd_shim::{ api::{ExecProcessRequest, Options}, - io_error, other, other_error, - util::IntoOption, - Error, + io_error, other, other_error, Error, }; -use log::{debug, warn}; +use log::{debug, info, warn}; use nix::{ cmsg_space, sys::{ @@ -43,7 +41,7 @@ use nix::{ }; use oci_spec::runtime::{LinuxNamespaceType, Spec}; use runc::{ - io::{Io, NullIo, FIFO}, + io::{IOOption, Io, NullIo, PipedIo}, options::GlobalOpts, Runc, Spawner, }; @@ -76,8 +74,8 @@ pub struct ProcessIO { pub fn create_io( id: &str, - _io_uid: u32, - _io_gid: u32, + io_uid: u32, + io_gid: u32, stdio: &Stdio, ) -> containerd_shim::Result { let mut pio = ProcessIO::default(); @@ -100,19 +98,32 @@ pub fn create_io( if scheme == FIFO_SCHEME { debug!( - "create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", + "create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", id, stdio.stdin.as_str(), stdio.stdout.as_str(), stdio.stderr.as_str() ); - let io = FIFO { - stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), - stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), - stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), + + // let io = FIFO { + // stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), + // stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), + // stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), + // }; + // pio.copy = false; + + if stdio.stdin.is_empty() { + debug!("stdin is empty"); + } + let opts = IOOption { + open_stdin: !stdio.stdin.is_empty(), + open_stdout: !stdio.stdout.is_empty(), + open_stderr: !stdio.stderr.is_empty(), }; + let io = PipedIo::new(io_uid, io_gid, &opts).unwrap(); + pio.copy = true; + pio.io = Some(Arc::new(io)); - pio.copy = false; } Ok(pio) } diff --git a/crates/runc-shim/src/processes.rs b/crates/runc-shim/src/processes.rs index f0b982d0..63b9f131 100644 --- a/crates/runc-shim/src/processes.rs +++ b/crates/runc-shim/src/processes.rs @@ -37,7 +37,7 @@ use tokio::{ sync::oneshot::{channel, Receiver, Sender}, }; -use crate::io::Stdio; +use crate::{common::ProcessIO, io::Stdio}; #[async_trait] pub trait Process { @@ -71,6 +71,7 @@ pub struct ProcessTemplate { pub state: Status, pub id: String, pub stdio: Stdio, + pub io: Option>, pub pid: i32, pub exit_code: i32, pub exited_at: Option, @@ -86,6 +87,7 @@ impl ProcessTemplate { state: Status::CREATED, id: id.to_string(), stdio, + io: None, pid: 0, exit_code: 0, exited_at: None, diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index 92bb0f23..231ca0ad 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -163,8 +163,10 @@ impl RuncFactory { (Some(s), None) } else { let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?; - create_opts.io = pio.io.as_ref().cloned(); - (None, Some(pio)) + let ref_pio = Arc::new(pio); + create_opts.io = ref_pio.io.clone(); + init.io = Some(ref_pio.clone()); + (None, Some(ref_pio)) }; let resp = init @@ -178,6 +180,22 @@ impl RuncFactory { } return Err(runtime_error(bundle, e, "OCI runtime create failed").await); } + if !init.stdio.stdin.is_empty() { + let stdin_clone = init.stdio.stdin.clone(); + let stdin_w = init.stdin.clone(); + // Open the write side in advance to make sure read side will not block, + // open it in another thread otherwise it will block too. + tokio::spawn(async move { + if let Ok(stdin_w_file) = OpenOptions::new() + .write(true) + .open(stdin_clone.as_str()) + .await + { + let mut lock_guard = stdin_w.lock().unwrap(); + *lock_guard = Some(stdin_w_file); + } + }); + } copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?; let pid = read_file_to_str(pid_path).await?.parse::()?; init.pid = pid; @@ -232,6 +250,7 @@ impl ProcessFactory for RuncExecFactory { stderr: req.stderr.to_string(), terminal: req.terminal, }, + io: None, pid: 0, exit_code: 0, exited_at: None, @@ -394,8 +413,10 @@ impl ProcessLifecycle for RuncExecLifecycle { (Some(s), None) } else { let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?; - exec_opts.io = pio.io.as_ref().cloned(); - (None, Some(pio)) + let ref_pio = Arc::new(pio); + exec_opts.io = ref_pio.io.clone(); + p.io = Some(ref_pio.clone()); + (None, Some(ref_pio)) }; //TODO checkpoint support let exec_result = self @@ -457,6 +478,15 @@ impl ProcessLifecycle for RuncExecLifecycle { async fn delete(&self, p: &mut ExecProcess) -> Result<()> { self.exit_signal.signal(); + //close pipe read + if !p.stdio.is_null() { + if let Some(c) = p.io.clone() { + if let Some(io) = c.io.clone() { + io.close_all_sid(); + } + } + } + debug!("Do close io complete"); let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id)); remove_file(exec_pid_path).await.unwrap_or_default(); Ok(()) @@ -568,6 +598,7 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc( p: &mut ProcessTemplate

, socket: Option, - pio: Option, + pio: Option>, exit_signal: Arc, ) -> Result<()> { if p.stdio.terminal { @@ -670,6 +702,7 @@ impl Spawner for ShimExecutor { } }; let pid = child.id().unwrap(); + let (stdout, stderr, exit_code) = tokio::join!( read_std(child.stdout), read_std(child.stderr), diff --git a/crates/runc/src/io.rs b/crates/runc/src/io.rs index d95c8b14..a4d4c39f 100644 --- a/crates/runc/src/io.rs +++ b/crates/runc/src/io.rs @@ -75,6 +75,9 @@ pub trait Io: Debug + Send + Sync { /// Only close write side (should be stdout/err "from" runc process) fn close_after_start(&self); + + /// Close read side + fn close_all_sid(&self); } #[derive(Debug, Clone)] @@ -227,7 +230,7 @@ impl Io for PipedIo { if let Some(p) = self.stderr.as_ref() { let pw = p.wr.try_clone()?; - cmd.stdout(pw); + cmd.stderr(pw); } Ok(()) @@ -242,6 +245,17 @@ impl Io for PipedIo { nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e)); } } + + fn close_all_sid(&self) { + if let Some(p) = self.stdout.as_ref() { + debug!("close pipe read from stdout"); + nix::unistd::close(p.rd.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e)); + } + if let Some(p) = self.stderr.as_ref() { + debug!("close pipe read from stderr"); + nix::unistd::close(p.rd.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e)); + } + } } /// IO driver to direct output/error messages to /dev/null. @@ -273,6 +287,8 @@ impl Io for NullIo { let mut m = self.dev_null.lock().unwrap(); let _ = m.take(); } + + fn close_all_sid(&self) {} } /// Io driver based on Stdio::inherited(), to direct outputs/errors to stdio. @@ -296,6 +312,8 @@ impl Io for InheritedStdIo { } fn close_after_start(&self) {} + + fn close_all_sid(&self) {} } /// Io driver based on Stdio::piped(), to capture outputs/errors from runC. @@ -319,6 +337,8 @@ impl Io for PipedStdIo { } fn close_after_start(&self) {} + + fn close_all_sid(&self) {} } /// FIFO for the scenario that set FIFO for command Io. @@ -353,6 +373,8 @@ impl Io for FIFO { } fn close_after_start(&self) {} + + fn close_all_sid(&self) {} } #[cfg(test)] diff --git a/crates/runc/src/lib.rs b/crates/runc/src/lib.rs index 8b9fd67a..25b1e694 100644 --- a/crates/runc/src/lib.rs +++ b/crates/runc/src/lib.rs @@ -383,7 +383,6 @@ impl Runc { Ok(()) }); } - let (status, pid, stdout, stderr) = self.spawner.execute(cmd).await?; if status.success() { let output = if combined_output { @@ -425,6 +424,7 @@ impl Runc { } args.push(id.to_string()); let mut cmd = self.command(&args)?; + match opts { Some(CreateOpts { io: Some(io), .. }) => { io.set(&mut cmd).map_err(Error::UnavailableIO)?; @@ -618,6 +618,7 @@ impl Spawner for DefaultExecutor { let mut cmd = cmd; let child = cmd.spawn().map_err(Error::ProcessSpawnFailed)?; let pid = child.id().unwrap(); + let result = child .wait_with_output() .await