diff --git a/Cargo.toml b/Cargo.toml index 8733101e..b284897c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ simple_logger = { version = "5.0", default-features = false } tempfile = "3.6" thiserror = "1.0" time = { version = "0.3.29", features = ["serde", "std", "formatting"] } -tokio = "1.26" +tokio = "1.40" tonic = "0.11" tonic-build = "0.11" tower = "0.4" diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index d13c52e4..451ca851 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -15,6 +15,7 @@ license.workspace = true repository.workspace = true homepage.workspace = true + [[bin]] # Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd. # Note: the runtime's binary name must start with "io.containerd.runc" in order to @@ -36,10 +37,12 @@ serde.workspace = true serde_json.workspace = true time.workspace = true uuid.workspace = true +lazy_static = "1.4.0" # Async dependencies async-trait.workspace = true tokio = { workspace = true, features = ["full"] } +futures.workspace = true [target.'cfg(target_os = "linux")'.dependencies] cgroups-rs.workspace = true diff --git a/crates/runc-shim/src/common.rs b/crates/runc-shim/src/common.rs index 5a84e4df..ade2d88c 100644 --- a/crates/runc-shim/src/common.rs +++ b/crates/runc-shim/src/common.rs @@ -29,9 +29,7 @@ use std::{ use containerd_shim::{ api::{ExecProcessRequest, Options}, - io_error, other, other_error, - util::IntoOption, - Error, + io_error, other, other_error, Error, }; use log::{debug, warn}; use nix::{ @@ -43,7 +41,7 @@ use nix::{ }; use oci_spec::runtime::{LinuxNamespaceType, Spec}; use runc::{ - io::{Io, NullIo, FIFO}, + io::{IOOption, Io, NullIo, PipedIo}, options::GlobalOpts, Runc, Spawner, }; @@ -76,8 +74,8 @@ pub struct ProcessIO { pub fn create_io( id: &str, - _io_uid: u32, - _io_gid: u32, + io_uid: u32, + io_gid: u32, stdio: &Stdio, ) -> containerd_shim::Result { let mut pio = ProcessIO::default(); @@ -100,19 +98,32 @@ pub fn create_io( if scheme == FIFO_SCHEME { debug!( - "create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", + "create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", id, stdio.stdin.as_str(), stdio.stdout.as_str(), stdio.stderr.as_str() ); - let io = FIFO { - stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), - stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), - stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), + + // let io = FIFO { + // stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), + // stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), + // stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), + // }; + // pio.copy = false; + + if stdio.stdin.is_empty() { + debug!("stdin is empty"); + } + let opts = IOOption { + open_stdin: !stdio.stdin.is_empty(), + open_stdout: !stdio.stdout.is_empty(), + open_stderr: !stdio.stderr.is_empty(), }; + let io = PipedIo::new(io_uid, io_gid, &opts).unwrap(); + pio.copy = true; + pio.io = Some(Arc::new(io)); - pio.copy = false; } Ok(pio) } diff --git a/crates/runc-shim/src/processes.rs b/crates/runc-shim/src/processes.rs index f0b982d0..63b9f131 100644 --- a/crates/runc-shim/src/processes.rs +++ b/crates/runc-shim/src/processes.rs @@ -37,7 +37,7 @@ use tokio::{ sync::oneshot::{channel, Receiver, Sender}, }; -use crate::io::Stdio; +use crate::{common::ProcessIO, io::Stdio}; #[async_trait] pub trait Process { @@ -71,6 +71,7 @@ pub struct ProcessTemplate { pub state: Status, pub id: String, pub stdio: Stdio, + pub io: Option>, pub pid: i32, pub exit_code: i32, pub exited_at: Option, @@ -86,6 +87,7 @@ impl ProcessTemplate { state: Status::CREATED, id: id.to_string(), stdio, + io: None, pid: 0, exit_code: 0, exited_at: None, diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index 92bb0f23..bf3ad6c0 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -163,8 +163,10 @@ impl RuncFactory { (Some(s), None) } else { let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?; - create_opts.io = pio.io.as_ref().cloned(); - (None, Some(pio)) + let ref_pio = Arc::new(pio); + create_opts.io = ref_pio.io.clone(); + init.io = Some(ref_pio.clone()); + (None, Some(ref_pio)) }; let resp = init @@ -178,6 +180,22 @@ impl RuncFactory { } return Err(runtime_error(bundle, e, "OCI runtime create failed").await); } + if !init.stdio.stdin.is_empty() { + let stdin_clone = init.stdio.stdin.clone(); + let stdin_w = init.stdin.clone(); + // Open the write side in advance to make sure read side will not block, + // open it in another thread otherwise it will block too. + tokio::spawn(async move { + if let Ok(stdin_w_file) = OpenOptions::new() + .write(true) + .open(stdin_clone.as_str()) + .await + { + let mut lock_guard = stdin_w.lock().unwrap(); + *lock_guard = Some(stdin_w_file); + } + }); + } copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?; let pid = read_file_to_str(pid_path).await?.parse::()?; init.pid = pid; @@ -232,6 +250,7 @@ impl ProcessFactory for RuncExecFactory { stderr: req.stderr.to_string(), terminal: req.terminal, }, + io: None, pid: 0, exit_code: 0, exited_at: None, @@ -299,6 +318,8 @@ impl ProcessLifecycle for RuncInitLifecycle { ); } } + + debug!("Do close io complete"); self.exit_signal.signal(); Ok(()) } @@ -394,8 +415,10 @@ impl ProcessLifecycle for RuncExecLifecycle { (Some(s), None) } else { let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?; - exec_opts.io = pio.io.as_ref().cloned(); - (None, Some(pio)) + let ref_pio = Arc::new(pio); + exec_opts.io = ref_pio.io.clone(); + p.io = Some(ref_pio.clone()); + (None, Some(ref_pio)) }; //TODO checkpoint support let exec_result = self @@ -457,6 +480,8 @@ impl ProcessLifecycle for RuncExecLifecycle { async fn delete(&self, p: &mut ExecProcess) -> Result<()> { self.exit_signal.signal(); + + debug!("Do close io complete"); let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id)); remove_file(exec_pid_path).await.unwrap_or_default(); Ok(()) @@ -632,7 +657,7 @@ where async fn copy_io_or_console

( p: &mut ProcessTemplate

, socket: Option, - pio: Option, + pio: Option>, exit_signal: Arc, ) -> Result<()> { if p.stdio.terminal { @@ -670,6 +695,7 @@ impl Spawner for ShimExecutor { } }; let pid = child.id().unwrap(); + let (stdout, stderr, exit_code) = tokio::join!( read_std(child.stdout), read_std(child.stderr), diff --git a/crates/runc-shim/src/service.rs b/crates/runc-shim/src/service.rs index 2decc527..ebee2542 100644 --- a/crates/runc-shim/src/service.rs +++ b/crates/runc-shim/src/service.rs @@ -27,14 +27,16 @@ use containerd_shim::{ event::Event, io_error, monitor::{Subject, Topic}, - protos::{events::task::TaskExit, protobuf::MessageDyn}, + protos::{events::task::TaskExit, protobuf::MessageDyn, ttrpc::context::with_timeout}, util::{ convert_to_timestamp, read_options, read_pid_from_file, read_runtime, read_spec, timestamp, write_str_to_file, }, - Config, Context, DeleteResponse, Error, Flags, StartOpts, + Config, DeleteResponse, Error, Flags, StartOpts, }; + use log::{debug, error, warn}; + use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::{ @@ -219,7 +221,7 @@ async fn forward( tokio::spawn(async move { while let Some((topic, e)) = rx.recv().await { publisher - .publish(Context::default(), &topic, &ns, e) + .publish(with_timeout(5000000000), &topic, &ns, e) .await .unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e)); } diff --git a/crates/runc/src/io.rs b/crates/runc/src/io.rs index d95c8b14..2e131203 100644 --- a/crates/runc/src/io.rs +++ b/crates/runc/src/io.rs @@ -13,22 +13,20 @@ See the License for the specific language governing permissions and limitations under the License. */ + #[cfg(not(feature = "async"))] use std::io::{Read, Write}; use std::{ fmt::Debug, fs::{File, OpenOptions}, io::Result, - os::unix::{fs::OpenOptionsExt, io::AsRawFd}, + os::unix::fs::OpenOptionsExt, process::Stdio, sync::Mutex, }; - -use log::debug; -use nix::unistd::{Gid, Uid}; -use os_pipe::{PipeReader, PipeWriter}; #[cfg(feature = "async")] use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::unix::pipe::{self, Receiver, Sender}; use crate::Command; @@ -100,8 +98,8 @@ impl Default for IOOption { /// When one side of the pipe is closed, the state will be represented with [`None`]. #[derive(Debug)] pub struct Pipe { - rd: PipeReader, - wr: PipeWriter, + rd: Mutex>, + wr: Mutex>, } #[derive(Debug)] @@ -113,8 +111,11 @@ pub struct PipedIo { impl Pipe { fn new() -> std::io::Result { - let (rd, wr) = os_pipe::pipe()?; - Ok(Self { rd, wr }) + let (rd, wr) = pipe::pipe()?; + Ok(Self { + rd: Mutex::new(Some(wr)), + wr: Mutex::new(Some(rd)), + }) } } @@ -128,25 +129,16 @@ impl PipedIo { } fn create_pipe( - uid: u32, - gid: u32, + _uid: u32, + _gid: u32, enabled: bool, - stdin: bool, + _stdin: bool, ) -> std::io::Result> { if !enabled { return Ok(None); } let pipe = Pipe::new()?; - let uid = Some(Uid::from_raw(uid)); - let gid = Some(Gid::from_raw(gid)); - if stdin { - let rd = pipe.rd.try_clone()?; - nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?; - } else { - let wr = pipe.wr.try_clone()?; - nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?; - } Ok(Some(pipe)) } } @@ -154,94 +146,140 @@ impl PipedIo { impl Io for PipedIo { #[cfg(not(feature = "async"))] fn stdin(&self) -> Option> { - self.stdin.as_ref().and_then(|pipe| { - pipe.wr - .try_clone() - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stdin + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.wr.try_lock() { + if let Some(writer) = guard.take() { + return Some(Box::new(writer) as Box); + } + } + None + }) + .unwrap_or(None) } #[cfg(feature = "async")] fn stdin(&self) -> Option> { - self.stdin.as_ref().and_then(|pipe| { - let fd = pipe.wr.as_raw_fd(); - tokio_pipe::PipeWrite::from_raw_fd_checked(fd) - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stdin + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.wr.try_lock() { + if let Some(writer) = guard.take() { + return Some(Box::new(writer) as Box); + } + } + None + }) + .unwrap_or(None) } #[cfg(not(feature = "async"))] fn stdout(&self) -> Option> { - self.stdout.as_ref().and_then(|pipe| { - pipe.rd - .try_clone() - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stdout + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.rd.try_lock() { + if let Some(reader) = guard.take() { + return Some(Box::new(reader) as Box); + } + } + None + }) + .unwrap_or(None) } #[cfg(feature = "async")] fn stdout(&self) -> Option> { - self.stdout.as_ref().and_then(|pipe| { - let fd = pipe.rd.as_raw_fd(); - tokio_pipe::PipeRead::from_raw_fd_checked(fd) - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stdout + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.rd.try_lock() { + if let Some(reader) = guard.take() { + return Some(Box::new(reader) as Box); + } + } + None + }) + .unwrap_or(None) } #[cfg(not(feature = "async"))] fn stderr(&self) -> Option> { - self.stderr.as_ref().and_then(|pipe| { - pipe.rd - .try_clone() - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stderr + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.rd.try_lock() { + if let Some(reader) = guard.take() { + return Some(Box::new(reader) as Box); + } + } + None + }) + .unwrap_or(None) } #[cfg(feature = "async")] fn stderr(&self) -> Option> { - self.stderr.as_ref().and_then(|pipe| { - let fd = pipe.rd.as_raw_fd(); - tokio_pipe::PipeRead::from_raw_fd_checked(fd) - .map(|x| Box::new(x) as Box) - .ok() - }) + self.stderr + .as_ref() + .map(|pipe| { + if let Ok(mut guard) = pipe.rd.try_lock() { + if let Some(reader) = guard.take() { + return Some(Box::new(reader) as Box); + } + } + None + }) + .unwrap_or(None) } // Note that this internally use [`std::fs::File`]'s `try_clone()`. // Thus, the files passed to commands will be not closed after command exit. fn set(&self, cmd: &mut Command) -> std::io::Result<()> { if let Some(p) = self.stdin.as_ref() { - let pr = p.rd.try_clone()?; - cmd.stdin(pr); + let pr = + p.rd.lock() + .map_err(|e| std::io::Error::other(e.to_string()))? + .take(); + if let Some(pr) = pr { + cmd.stdin( + pr.into_blocking_fd() + .map_err(|e| std::io::Error::other(e.to_string()))?, + ); + } } if let Some(p) = self.stdout.as_ref() { - let pw = p.wr.try_clone()?; - cmd.stdout(pw); + let pw = + p.wr.lock() + .map_err(|e| std::io::Error::other(e.to_string()))? + .take(); + if let Some(pw) = pw { + cmd.stdout( + pw.into_blocking_fd() + .map_err(|e| std::io::Error::other(e.to_string()))?, + ); + } } if let Some(p) = self.stderr.as_ref() { - let pw = p.wr.try_clone()?; - cmd.stdout(pw); + let pw = + p.wr.lock() + .map_err(|e| std::io::Error::other(e.to_string()))? + .take(); + if let Some(pw) = pw { + cmd.stderr( + pw.into_blocking_fd() + .map_err(|e| std::io::Error::other(e.to_string()))?, + ); + } } Ok(()) } - fn close_after_start(&self) { - if let Some(p) = self.stdout.as_ref() { - nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e)); - } - - if let Some(p) = self.stderr.as_ref() { - nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e)); - } - } + fn close_after_start(&self) {} } /// IO driver to direct output/error messages to /dev/null. diff --git a/crates/runc/src/lib.rs b/crates/runc/src/lib.rs index 8b9fd67a..25b1e694 100644 --- a/crates/runc/src/lib.rs +++ b/crates/runc/src/lib.rs @@ -383,7 +383,6 @@ impl Runc { Ok(()) }); } - let (status, pid, stdout, stderr) = self.spawner.execute(cmd).await?; if status.success() { let output = if combined_output { @@ -425,6 +424,7 @@ impl Runc { } args.push(id.to_string()); let mut cmd = self.command(&args)?; + match opts { Some(CreateOpts { io: Some(io), .. }) => { io.set(&mut cmd).map_err(Error::UnavailableIO)?; @@ -618,6 +618,7 @@ impl Spawner for DefaultExecutor { let mut cmd = cmd; let child = cmd.spawn().map_err(Error::ProcessSpawnFailed)?; let pid = child.id().unwrap(); + let result = child .wait_with_output() .await