Skip to content

Commit

Permalink
Change fifo to pipe in shim like go shim
Browse files Browse the repository at this point in the history
add io monitor to inspect io fd change

Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire committed Oct 10, 2024
1 parent 217f0ee commit a2ec4fb
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ simple_logger = { version = "5.0", default-features = false }
tempfile = "3.6"
thiserror = "1.0"
time = { version = "0.3.29", features = ["serde", "std", "formatting"] }
tokio = "1.26"
tokio = "1.40"
tonic = "0.11"
tonic-build = "0.11"
tower = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ license.workspace = true
repository.workspace = true
homepage.workspace = true


[[bin]]
# Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd.
# Note: the runtime's binary name must start with "io.containerd.runc" in order to
Expand All @@ -36,10 +37,12 @@ serde.workspace = true
serde_json.workspace = true
time.workspace = true
uuid.workspace = true
lazy_static = "1.4.0"

# Async dependencies
async-trait.workspace = true
tokio = { workspace = true, features = ["full"] }
futures.workspace = true

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs.workspace = true
Expand Down
35 changes: 23 additions & 12 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use std::{

use containerd_shim::{
api::{ExecProcessRequest, Options},
io_error, other, other_error,
util::IntoOption,
Error,
io_error, other, other_error, Error,
};
use log::{debug, warn};
use nix::{
Expand All @@ -43,7 +41,7 @@ use nix::{
};
use oci_spec::runtime::{LinuxNamespaceType, Spec};
use runc::{
io::{Io, NullIo, FIFO},
io::{IOOption, Io, NullIo, PipedIo},
options::GlobalOpts,
Runc, Spawner,
};
Expand Down Expand Up @@ -76,8 +74,8 @@ pub struct ProcessIO {

pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
io_uid: u32,
io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
let mut pio = ProcessIO::default();
Expand All @@ -100,19 +98,32 @@ pub fn create_io(

if scheme == FIFO_SCHEME {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),

// let io = FIFO {
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
// };
// pio.copy = false;

if stdio.stdin.is_empty() {
debug!("stdin is empty");
}
let opts = IOOption {
open_stdin: !stdio.stdin.is_empty(),
open_stdout: !stdio.stdout.is_empty(),
open_stderr: !stdio.stderr.is_empty(),
};
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
pio.copy = true;

pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::{
sync::oneshot::{channel, Receiver, Sender},
};

use crate::io::Stdio;
use crate::{common::ProcessIO, io::Stdio};

#[async_trait]
pub trait Process {
Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
pub state: Status,
pub id: String,
pub stdio: Stdio,
pub io: Option<Arc<ProcessIO>>,
pub pid: i32,
pub exit_code: i32,
pub exited_at: Option<OffsetDateTime>,
Expand All @@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
state: Status::CREATED,
id: id.to_string(),
stdio,
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down
36 changes: 31 additions & 5 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl RuncFactory {
(Some(s), None)
} else {
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
create_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
create_opts.io = ref_pio.io.clone();
init.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};

let resp = init
Expand All @@ -178,6 +180,22 @@ impl RuncFactory {
}
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
}
if !init.stdio.stdin.is_empty() {
let stdin_clone = init.stdio.stdin.clone();
let stdin_w = init.stdin.clone();
// Open the write side in advance to make sure read side will not block,
// open it in another thread otherwise it will block too.
tokio::spawn(async move {
if let Ok(stdin_w_file) = OpenOptions::new()
.write(true)
.open(stdin_clone.as_str())
.await
{
let mut lock_guard = stdin_w.lock().unwrap();
*lock_guard = Some(stdin_w_file);
}
});
}
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
init.pid = pid;
Expand Down Expand Up @@ -232,6 +250,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
stderr: req.stderr.to_string(),
terminal: req.terminal,
},
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down Expand Up @@ -299,6 +318,8 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
);
}
}

debug!("Do close io complete");
self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -394,8 +415,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
(Some(s), None)
} else {
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
exec_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
exec_opts.io = ref_pio.io.clone();
p.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};
//TODO checkpoint support
let exec_result = self
Expand Down Expand Up @@ -457,6 +480,8 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {

async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
self.exit_signal.signal();

debug!("Do close io complete");
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
remove_file(exec_pid_path).await.unwrap_or_default();
Ok(())
Expand Down Expand Up @@ -632,7 +657,7 @@ where
async fn copy_io_or_console<P>(
p: &mut ProcessTemplate<P>,
socket: Option<ConsoleSocket>,
pio: Option<ProcessIO>,
pio: Option<Arc<ProcessIO>>,
exit_signal: Arc<ExitSignal>,
) -> Result<()> {
if p.stdio.terminal {
Expand Down Expand Up @@ -670,6 +695,7 @@ impl Spawner for ShimExecutor {
}
};
let pid = child.id().unwrap();

let (stdout, stderr, exit_code) = tokio::join!(
read_std(child.stdout),
read_std(child.stderr),
Expand Down
8 changes: 5 additions & 3 deletions crates/runc-shim/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ use containerd_shim::{
event::Event,
io_error,
monitor::{Subject, Topic},
protos::{events::task::TaskExit, protobuf::MessageDyn},
protos::{events::task::TaskExit, protobuf::MessageDyn, ttrpc::context::with_timeout},
util::{
convert_to_timestamp, read_options, read_pid_from_file, read_runtime, read_spec, timestamp,
write_str_to_file,
},
Config, Context, DeleteResponse, Error, Flags, StartOpts,
Config, DeleteResponse, Error, Flags, StartOpts,
};

use log::{debug, error, warn};

use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::{
Expand Down Expand Up @@ -219,7 +221,7 @@ async fn forward(
tokio::spawn(async move {
while let Some((topic, e)) = rx.recv().await {
publisher
.publish(Context::default(), &topic, &ns, e)
.publish(with_timeout(5000000000), &topic, &ns, e)
.await
.unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e));
}
Expand Down
Loading

0 comments on commit a2ec4fb

Please sign in to comment.