Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
src/run/runner/wall_time/golang/testdata/fuego.txt filter=lfs diff=lfs merge=lfs -text
src/run/runner/wall_time/golang/testdata/simple.txt filter=lfs diff=lfs merge=lfs -text
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
lfs: true
- uses: moonrepo/setup-rust@v1
- run: cargo test --all
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ debugid = "0.8.0"
memmap2 = "0.9.5"
nix = { version = "0.29.0", features = ["fs", "user"] }
futures = "0.3.31"
statrs = { version = "0.18.0", default-features = false }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.17.0"
Expand Down
113 changes: 97 additions & 16 deletions src/run/runner/helpers/run_command_with_log_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@ use std::future::Future;
use std::io::{Read, Write};
use std::process::Command;
use std::process::ExitStatus;
use std::sync::{Arc, Mutex};
use std::thread;

struct CmdRunnerOptions<F> {
on_process_spawned: Option<F>,
capture_stdout: bool,
}

impl<F> Default for CmdRunnerOptions<F> {
fn default() -> Self {
Self {
on_process_spawned: None,
capture_stdout: false,
}
}
}

/// Run a command and log its output to stdout and stderr
///
/// # Arguments
/// - `cmd`: The command to run.
/// - `cb`: A callback function that takes the process ID and returns a result.
/// - `options`: Configuration options for the runner (e.g. capture output, run callback)
///
/// # Returns
///
/// The exit status of the command.
///
pub async fn run_command_with_log_pipe_and_callback<F, Fut>(
/// A tuple containing:
/// - `ExitStatus`: The exit status of the executed command
/// - `Option<String>`: Captured stdout if `capture_stdout` was true, otherwise None
async fn run_command_with_log_pipe_and_options<F, Fut>(
mut cmd: Command,
cb: F,
) -> Result<ExitStatus>
options: CmdRunnerOptions<F>,
) -> Result<(ExitStatus, Option<String>)>
where
F: FnOnce(u32) -> Fut,
Fut: Future<Output = anyhow::Result<()>>,
Expand All @@ -29,14 +44,23 @@ where
mut reader: impl Read,
mut writer: impl Write,
log_prefix: Option<&str>,
captured_output: Option<Arc<Mutex<Vec<u8>>>>,
) -> Result<()> {
let prefix = log_prefix.unwrap_or("");
let mut buffer = [0; 1024];
let mut capture_guard = captured_output
.as_ref()
.map(|capture| capture.lock().unwrap());
loop {
let bytes_read = reader.read(&mut buffer)?;
if bytes_read == 0 {
break;
}

if let Some(ref mut output) = capture_guard {
output.extend_from_slice(&buffer[..bytes_read]);
}

suspend_progress_bar(|| {
writer.write_all(&buffer[..bytes_read]).unwrap();
trace!(
Expand All @@ -57,19 +81,76 @@ where
.context("failed to spawn the process")?;
let stdout = process.stdout.take().expect("unable to get stdout");
let stderr = process.stderr.take().expect("unable to get stderr");
thread::spawn(move || {
log_tee(stdout, std::io::stdout(), None).unwrap();
});

thread::spawn(move || {
log_tee(stderr, std::io::stderr(), Some("[stderr]")).unwrap();
});
let captured_stdout = if options.capture_stdout {
Some(Arc::new(Mutex::new(Vec::new())))
} else {
None
};
let (stdout_handle, stderr_handle) = {
let stdout_capture = captured_stdout.clone();
let stdout_handle = thread::spawn(move || {
log_tee(stdout, std::io::stdout(), None, stdout_capture).unwrap();
});
let stderr_handle = thread::spawn(move || {
log_tee(stderr, std::io::stderr(), Some("[stderr]"), None).unwrap();
});

cb(process.id()).await?;
(stdout_handle, stderr_handle)
};

process.wait().context("failed to wait for the process")
if let Some(cb) = options.on_process_spawned {
cb(process.id()).await?;
}

let exit_status = process.wait().context("failed to wait for the process")?;
let _ = (stdout_handle.join().unwrap(), stderr_handle.join().unwrap());

let stdout_output = captured_stdout
.map(|capture| String::from_utf8_lossy(&capture.lock().unwrap()).to_string());
Ok((exit_status, stdout_output))
}

pub async fn run_command_with_log_pipe_and_callback<F, Fut>(
cmd: Command,
cb: F,
) -> Result<(ExitStatus, Option<String>)>
where
F: FnOnce(u32) -> Fut,
Fut: Future<Output = anyhow::Result<()>>,
{
run_command_with_log_pipe_and_options(
cmd,
CmdRunnerOptions {
on_process_spawned: Some(cb),
capture_stdout: false,
},
)
.await
}

pub async fn run_command_with_log_pipe(cmd: Command) -> Result<ExitStatus> {
run_command_with_log_pipe_and_callback(cmd, async |_| Ok(())).await
let (exit_status, _) = run_command_with_log_pipe_and_options(
cmd,
CmdRunnerOptions::<fn(u32) -> futures::future::Ready<anyhow::Result<()>>> {
on_process_spawned: None,
capture_stdout: false,
},
)
.await?;
Ok(exit_status)
}

pub async fn run_command_with_log_pipe_capture_stdout(
cmd: Command,
) -> Result<(ExitStatus, String)> {
let (exit_status, stdout) = run_command_with_log_pipe_and_options(
cmd,
CmdRunnerOptions::<fn(u32) -> futures::future::Ready<anyhow::Result<()>>> {
on_process_spawned: None,
capture_stdout: true,
},
)
.await?;
Ok((exit_status, stdout.unwrap_or_default()))
}
14 changes: 12 additions & 2 deletions src/run/runner/wall_time/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::run::instruments::mongo_tracer::MongoTracer;
use crate::run::runner::executor::Executor;
use crate::run::runner::helpers::env::{get_base_injected_env, is_codspeed_debug_enabled};
use crate::run::runner::helpers::get_bench_command::get_bench_command;
use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe;
use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_capture_stdout;
use crate::run::runner::wall_time::golang;
use crate::run::runner::{ExecutorName, RunData};
use crate::run::{check_system::SystemInfo, config::Config};
use async_trait::async_trait;
Expand Down Expand Up @@ -181,7 +182,16 @@ impl Executor for WallTimeExecutor {
cmd.args(["sh", "-c", &bench_cmd]);
debug!("cmd: {cmd:?}");

run_command_with_log_pipe(cmd).await
let (status, stdout) = run_command_with_log_pipe_capture_stdout(cmd).await?;

if config.command.trim().starts_with("go test") {
let results_folder = run_data.profile_folder.join("results");
std::fs::create_dir_all(&results_folder)?;

golang::collect_walltime_results(&stdout, &results_folder)?;
}

Ok(status)
}
};

Expand Down
35 changes: 35 additions & 0 deletions src/run/runner/wall_time/golang/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::prelude::*;
use std::path::Path;

mod parser;
mod walltime_results;

pub fn collect_walltime_results(stdout: &str, dst_dir: &Path) -> Result<()> {
let benchmarks = parser::BenchmarkData::process_raw_results(parser::RawOutput::parse(stdout)?)
.into_iter()
.map(|result| {
let uri = format!("{}::{}", result.package, result.name);
walltime_results::WalltimeBenchmark::from_runtime_data(
result.name,
uri,
result.iters.into_iter().map(|i| i as u128).collect(),
result.times.into_iter().map(|t| t as u128).collect(),
None,
)
})
.collect::<Vec<_>>();
debug!("Parsed {} benchmarks", benchmarks.len());

let pid = std::process::id();
let creator = walltime_results::Creator {
name: "runner".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
pid,
};
let results = walltime_results::WalltimeResults::new(benchmarks, creator)?;

let mut file = std::fs::File::create(dst_dir.join(format!("{pid}.json")))?;
serde_json::to_writer_pretty(&mut file, &results)?;

Ok(())
}
Loading