Skip to content

Commit

Permalink
Change fifo to pipe in shim like go shim
Browse files Browse the repository at this point in the history
use tokio pipe to resovle pipe's problem

Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire committed Oct 12, 2024
1 parent 217f0ee commit ddd9ac8
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ simple_logger = { version = "5.0", default-features = false }
tempfile = "3.6"
thiserror = "1.0"
time = { version = "0.3.29", features = ["serde", "std", "formatting"] }
tokio = "1.26"
tokio = "1.40"
tonic = "0.11"
tonic-build = "0.11"
tower = "0.4"
Expand Down
1 change: 1 addition & 0 deletions crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ license.workspace = true
repository.workspace = true
homepage.workspace = true


[[bin]]
# Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd.
# Note: the runtime's binary name must start with "io.containerd.runc" in order to
Expand Down
35 changes: 23 additions & 12 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use std::{

use containerd_shim::{
api::{ExecProcessRequest, Options},
io_error, other, other_error,
util::IntoOption,
Error,
io_error, other, other_error, Error,
};
use log::{debug, warn};
use nix::{
Expand All @@ -43,7 +41,7 @@ use nix::{
};
use oci_spec::runtime::{LinuxNamespaceType, Spec};
use runc::{
io::{Io, NullIo, FIFO},
io::{IOOption, Io, NullIo, PipedIo},
options::GlobalOpts,
Runc, Spawner,
};
Expand Down Expand Up @@ -76,8 +74,8 @@ pub struct ProcessIO {

pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
io_uid: u32,
io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
let mut pio = ProcessIO::default();
Expand All @@ -100,19 +98,32 @@ pub fn create_io(

if scheme == FIFO_SCHEME {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),

// let io = FIFO {
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
// };
// pio.copy = false;

if stdio.stdin.is_empty() {
debug!("stdin is empty");
}
let opts = IOOption {
open_stdin: !stdio.stdin.is_empty(),
open_stdout: !stdio.stdout.is_empty(),
open_stderr: !stdio.stderr.is_empty(),
};
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
pio.copy = true;

pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::{
sync::oneshot::{channel, Receiver, Sender},
};

use crate::io::Stdio;
use crate::{common::ProcessIO, io::Stdio};

#[async_trait]
pub trait Process {
Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
pub state: Status,
pub id: String,
pub stdio: Stdio,
pub io: Option<Arc<ProcessIO>>,
pub pid: i32,
pub exit_code: i32,
pub exited_at: Option<OffsetDateTime>,
Expand All @@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
state: Status::CREATED,
id: id.to_string(),
stdio,
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down
33 changes: 28 additions & 5 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl RuncFactory {
(Some(s), None)
} else {
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
create_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
create_opts.io = ref_pio.io.clone();
init.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};

let resp = init
Expand All @@ -178,6 +180,22 @@ impl RuncFactory {
}
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
}
if !init.stdio.stdin.is_empty() {
let stdin_clone = init.stdio.stdin.clone();
let stdin_w = init.stdin.clone();
// Open the write side in advance to make sure read side will not block,
// open it in another thread otherwise it will block too.
tokio::spawn(async move {
if let Ok(stdin_w_file) = OpenOptions::new()
.write(true)
.open(stdin_clone.as_str())
.await
{
let mut lock_guard = stdin_w.lock().unwrap();
*lock_guard = Some(stdin_w_file);
}
});
}
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
init.pid = pid;
Expand Down Expand Up @@ -232,6 +250,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
stderr: req.stderr.to_string(),
terminal: req.terminal,
},
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down Expand Up @@ -299,6 +318,7 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
);
}
}

self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -394,8 +414,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
(Some(s), None)
} else {
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
exec_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
exec_opts.io = ref_pio.io.clone();
p.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};
//TODO checkpoint support
let exec_result = self
Expand Down Expand Up @@ -632,7 +654,7 @@ where
async fn copy_io_or_console<P>(
p: &mut ProcessTemplate<P>,
socket: Option<ConsoleSocket>,
pio: Option<ProcessIO>,
pio: Option<Arc<ProcessIO>>,
exit_signal: Arc<ExitSignal>,
) -> Result<()> {
if p.stdio.terminal {
Expand Down Expand Up @@ -670,6 +692,7 @@ impl Spawner for ShimExecutor {
}
};
let pid = child.id().unwrap();

let (stdout, stderr, exit_code) = tokio::join!(
read_std(child.stdout),
read_std(child.stderr),
Expand Down
Loading

0 comments on commit ddd9ac8

Please sign in to comment.