Skip to content

Commit

Permalink
Change fifo to pipe in shim like go shim
Browse files Browse the repository at this point in the history
add io monitor to inspect io fd change

Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire committed Sep 30, 2024
1 parent 217f0ee commit 75f933d
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 33 deletions.
3 changes: 3 additions & 0 deletions crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ license.workspace = true
repository.workspace = true
homepage.workspace = true


[[bin]]
# Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd.
# Note: the runtime's binary name must start with "io.containerd.runc" in order to
Expand All @@ -36,10 +37,12 @@ serde.workspace = true
serde_json.workspace = true
time.workspace = true
uuid.workspace = true
lazy_static = "1.4.0"

# Async dependencies
async-trait.workspace = true
tokio = { workspace = true, features = ["full"] }
futures.workspace = true

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs.workspace = true
Expand Down
35 changes: 23 additions & 12 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use std::{

use containerd_shim::{
api::{ExecProcessRequest, Options},
io_error, other, other_error,
util::IntoOption,
Error,
io_error, other, other_error, Error,
};
use log::{debug, warn};
use nix::{
Expand All @@ -43,7 +41,7 @@ use nix::{
};
use oci_spec::runtime::{LinuxNamespaceType, Spec};
use runc::{
io::{Io, NullIo, FIFO},
io::{IOOption, Io, NullIo, PipedIo},
options::GlobalOpts,
Runc, Spawner,
};
Expand Down Expand Up @@ -76,8 +74,8 @@ pub struct ProcessIO {

pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
io_uid: u32,
io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
let mut pio = ProcessIO::default();
Expand All @@ -100,19 +98,32 @@ pub fn create_io(

if scheme == FIFO_SCHEME {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),

// let io = FIFO {
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
// };
// pio.copy = false;

if stdio.stdin.is_empty() {
debug!("stdin is empty");
}
let opts = IOOption {
open_stdin: !stdio.stdin.is_empty(),
open_stdout: !stdio.stdout.is_empty(),
open_stderr: !stdio.stderr.is_empty(),
};
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
pio.copy = true;

pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::{
sync::oneshot::{channel, Receiver, Sender},
};

use crate::io::Stdio;
use crate::{common::ProcessIO, io::Stdio};

#[async_trait]
pub trait Process {
Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
pub state: Status,
pub id: String,
pub stdio: Stdio,
pub io: Option<Arc<ProcessIO>>,
pub pid: i32,
pub exit_code: i32,
pub exited_at: Option<OffsetDateTime>,
Expand All @@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
state: Status::CREATED,
id: id.to_string(),
stdio,
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down
133 changes: 123 additions & 10 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use oci_spec::runtime::{LinuxResources, Process};
use runc::{Command, Runc, Spawner};
use tokio::{
fs::{remove_file, File, OpenOptions},
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, BufReader},
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
};

use super::{
Expand All @@ -63,6 +63,7 @@ use crate::{
CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
},
io::Stdio,
service::add_monitor_io,
};

pub type ExecProcess = ProcessTemplate<RuncExecLifecycle>;
Expand Down Expand Up @@ -163,8 +164,10 @@ impl RuncFactory {
(Some(s), None)
} else {
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
create_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
create_opts.io = ref_pio.io.clone();
init.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};

let resp = init
Expand All @@ -178,6 +181,22 @@ impl RuncFactory {
}
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
}
if !init.stdio.stdin.is_empty() {
let stdin_clone = init.stdio.stdin.clone();
let stdin_w = init.stdin.clone();
// Open the write side in advance to make sure read side will not block,
// open it in another thread otherwise it will block too.
tokio::spawn(async move {
if let Ok(stdin_w_file) = OpenOptions::new()
.write(true)
.open(stdin_clone.as_str())
.await
{
let mut lock_guard = stdin_w.lock().unwrap();
*lock_guard = Some(stdin_w_file);
}
});
}
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
init.pid = pid;
Expand Down Expand Up @@ -232,6 +251,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
stderr: req.stderr.to_string(),
terminal: req.terminal,
},
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down Expand Up @@ -299,6 +319,15 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
);
}
}
// Close the read side of the process's stdio pipes, if any were created.
if !p.stdio.is_null() {
if let Some(c) = p.io.clone() {
if let Some(io) = c.io.clone() {
io.close_read_side();
}
}
}
debug!("Do close io complete");
self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -394,8 +423,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
(Some(s), None)
} else {
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
exec_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
exec_opts.io = ref_pio.io.clone();
p.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};
//TODO checkpoint support
let exec_result = self
Expand Down Expand Up @@ -457,6 +488,15 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {

async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
self.exit_signal.signal();
// Close the read side of the process's stdio pipes, if any were created.
if !p.stdio.is_null() {
if let Some(c) = p.io.clone() {
if let Some(io) = c.io.clone() {
io.close_read_side();
}
}
}
debug!("Do close io complete");
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
remove_file(exec_pid_path).await.unwrap_or_default();
Ok(())
Expand Down Expand Up @@ -495,7 +535,7 @@ async fn copy_console(
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "failed to open stdin"))?;
spawn_copy(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
spawn_copy_no_recvs(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
}

if !stdio.stdout.is_empty() {
Expand All @@ -516,7 +556,7 @@ async fn copy_console(
.open(stdio.stdout.as_str())
.await
.map_err(io_error!(e, "open stdout for read"))?;
spawn_copy(
spawn_copy_no_recvs(
console_stdout,
stdout,
exit_signal,
Expand All @@ -535,6 +575,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
if !pio.copy {
return Ok(());
};
let mut rs = add_monitor_io(pio.io.clone().unwrap()).await;
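// Take ownership of the stdout and stderr monitor receivers (in that order);
// stdin is copied with spawn_copy_no_recvs and therefore needs no receiver.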
let stdout_recv = rs.remove(0);
let stderr_recv = rs.remove(0);
if let Some(io) = &pio.io {
if let Some(w) = io.stdin() {
debug!("copy_io: pipe stdin from {}", stdio.stdin.as_str());
Expand All @@ -544,7 +588,7 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "open stdin"))?;
spawn_copy(stdin, w, exit_signal.clone(), None::<fn()>);
spawn_copy_no_recvs(stdin, w, exit_signal.clone(), None::<fn()>);
}
}

Expand All @@ -568,8 +612,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
stdout,
exit_signal.clone(),
Some(move || {
debug!("stdout exit.....................");
drop(stdout_r);
}),
stdout_recv,
);
}
}
Expand All @@ -594,8 +640,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
stderr,
exit_signal,
Some(move || {
debug!("stderr exit.....................");
drop(stderr_r);
}),
stderr_recv,
);
}
}
Expand All @@ -604,7 +652,71 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
Ok(())
}

fn spawn_copy<R, W, F>(from: R, to: W, exit_signal: Arc<ExitSignal>, on_close: Option<F>)
/// Spawns a background task that copies bytes from `from` to `to` until one
/// of the following happens:
///
/// * `exit_signal` fires,
/// * the monitor channel `r` yields anything (a message or channel closure),
/// * `from` reports end-of-stream (a read of zero bytes) or a read error,
/// * writing to `to` fails.
///
/// When the loop ends, `on_close` (if provided) is invoked exactly once.
///
/// Every wait is bounded by a 5 second timeout; an elapsed timeout simply
/// starts another iteration, so the task re-arms all of its wake-up sources
/// periodically instead of parking on a single read forever.
fn spawn_copy<R, W, F>(
    from: R,
    to: W,
    exit_signal: Arc<ExitSignal>,
    on_close: Option<F>,
    mut r: tokio::sync::mpsc::Receiver<i64>,
) where
    R: AsyncRead + Send + Unpin + 'static,
    W: AsyncWrite + Send + Unpin + 'static,
    F: FnOnce() + Send + 'static,
{
    let mut src = from;
    let mut dst = to;

    tokio::spawn(async move {
        // Start with an EMPTY buffer: `read_buf` appends to a `Vec<u8>`, so a
        // zero-filled vector would put 1024 spurious NUL bytes in front of the
        // first chunk that gets written out.
        let mut buffer: Vec<u8> = Vec::with_capacity(1024);
        // Number of bytes at the front of `buffer` already delivered to `dst`.
        // It lives outside the timeout future so that a write interrupted by
        // the timeout resumes where it stopped instead of resending data.
        let mut flushed: usize = 0;
        // Loop with a timeout to make sure the read does not hang forever.
        loop {
            let mut if_cn = true;
            let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
                tokio::select! {
                    _ = exit_signal.wait() => {
                        debug!("container exit");
                        if_cn = false;
                    }
                    // Resolves to Ok(true) when `buffer` holds bytes to
                    // deliver, Ok(false) on end-of-stream, Err on read failure.
                    ready = async {
                        if flushed < buffer.len() {
                            // Leftover from a write cut short by the timeout:
                            // deliver it before reading anything new.
                            Ok::<bool, std::io::Error>(true)
                        } else {
                            src.read_buf(&mut buffer).await.map(|n| n > 0)
                        }
                    } => {
                        match ready {
                            Ok(true) => {
                                // Single `write` calls with explicit progress
                                // tracking (rather than `write_all`) keep the
                                // position if this future is dropped mid-way.
                                while flushed < buffer.len() {
                                    match dst.write(&buffer[flushed..]).await {
                                        Ok(n) if n > 0 => flushed += n,
                                        // A zero-length write or an error means
                                        // the destination cannot take more data.
                                        _ => {
                                            if_cn = false;
                                            break;
                                        }
                                    }
                                }
                                buffer.clear();
                                flushed = 0;
                            }
                            // A read of zero bytes means the pipe was closed.
                            Ok(false) => {
                                if_cn = false;
                            }
                            Err(_) => {
                                debug!("read exit");
                                if_cn = false;
                            }
                        }
                    }
                    c = r.recv() => {
                        debug!("fd error io exit!! recv {:?}",c);
                        if_cn = false;
                    }
                }
            });
            // An elapsed timeout is not an error: keep looping until one of
            // the branches above asks to stop.
            let _ = result.await;
            if !if_cn {
                break;
            }
        }
        if let Some(f) = on_close {
            f();
        }
    });
}

fn spawn_copy_no_recvs<R, W, F>(from: R, to: W, exit_signal: Arc<ExitSignal>, on_close: Option<F>)
where
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
Expand Down Expand Up @@ -632,7 +744,7 @@ where
async fn copy_io_or_console<P>(
p: &mut ProcessTemplate<P>,
socket: Option<ConsoleSocket>,
pio: Option<ProcessIO>,
pio: Option<Arc<ProcessIO>>,
exit_signal: Arc<ExitSignal>,
) -> Result<()> {
if p.stdio.terminal {
Expand Down Expand Up @@ -670,6 +782,7 @@ impl Spawner for ShimExecutor {
}
};
let pid = child.id().unwrap();

let (stdout, stderr, exit_code) = tokio::join!(
read_std(child.stdout),
read_std(child.stderr),
Expand Down
Loading

0 comments on commit 75f933d

Please sign in to comment.