diff --git a/DESCRIPTION b/DESCRIPTION index 5c34752b..696c6749 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -18,17 +18,22 @@ BugReports: https://github.com/r-lib/callr/issues Depends: R (>= 3.4) Imports: + otel, processx (>= 3.6.1), R6, utils Suggests: asciicast (>= 2.3.1), cli (>= 1.1.0), + otelsdk, ps, rprojroot, spelling, testthat (>= 3.2.0), withr (>= 2.3.0) +Remotes: + r-lib/otel, + r-lib/otelsdk Config/Needs/website: r-lib/asciicast, glue, diff --git a/R/callr-package.R b/R/callr-package.R index dbd2579d..93ffca7b 100644 --- a/R/callr-package.R +++ b/R/callr-package.R @@ -13,6 +13,8 @@ #' @keywords internal "_PACKAGE" +otel_tracer_name <- "org.r-lib.callr" + ## usethis namespace: start ## usethis namespace: end NULL diff --git a/R/eval.R b/R/eval.R index 70fdcad6..36916e7c 100644 --- a/R/eval.R +++ b/R/eval.R @@ -201,6 +201,13 @@ r <- function( options <- setup_callbacks(options) options <- setup_r_binary_and_args(options) + if (otel::is_tracing()) { + otel::start_span("callr::r", attributes = otel::as_attributes(options)) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + out <- run_r(options) get_result(output = out, options) diff --git a/R/hook.R b/R/hook.R index c5241b1b..780d7e01 100644 --- a/R/hook.R +++ b/R/hook.R @@ -6,6 +6,34 @@ common_hook <- function() { detach("tools:callr") } env <- readRDS(`__envfile__`) + + # OpenTelemetry setup + has_otel <- nzchar(Sys.getenv("TRACEPARENT")) && + requireNamespace("otel", quietly = TRUE) + assign(envir = env$`__callr_data__`, "has_otel", has_otel) + if (has_otel) { + hdrs <- as.list(c( + traceparent = Sys.getenv("TRACEPARENT"), + tracestate = Sys.getenv("TRACESTATE"), + baggage = Sys.getenv("BAGGAGE") + )) + prtctx <- otel::extract_http_context(hdrs) + reg.finalizer( + env$`__callr_data__`, + function(e) e$otel_span$end(), + onexit = TRUE + ) + assign( + envir = env$`__callr_data__`, + "otel_span", + otel::start_span( + "callr subprocess", + options = list(parent = prtctx), + scope = .GlobalEnv + ) + ) + } + do.call("attach", list(env, pos = length(search()), name = "tools:callr")) data <- env$`__callr_data__` data$pxlib <- data$load_client_lib( @@ -13,7 +41,7 @@ common_hook <- function() { data$pxdir ) options(error = function() invokeRestart("abort")) - rm(list = c("data", "env")) + rm(list = c("data", "env", "has_otel")) lapply( c( diff --git a/R/r-process.R b/R/r-process.R index 9a82cd64..31ed421c 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -55,6 +55,18 @@ rp_init <- function(self, private, super, options) { options <- setup_context(options) options <- setup_r_binary_and_args(options) + otel_session <- otel::start_session( + "callr::r_process", + attributes = otel::as_attributes(options) + ) + otel::log_debug("start r_process") + if (otel::is_tracing()) { + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + options$otel_session <- otel_session + private$options <- options with_envvar( @@ -80,8 +92,13 @@ rp_init <- function(self, private, super, options) { rp_get_result <- function(self, private) { if (self$is_alive()) { + private$options$otel_session$add_event( + "get_result", + attributes = list(done = FALSE) + ) throw(new_error("Still alive")) } + on.exit(private$options$otel_session$end(status_code = "auto"), add = TRUE) ## This is artificial... out <- list( diff --git a/R/r-session.R b/R/r-session.R index 26419e26..e225f98a 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -215,6 +215,8 @@ r_session <- R6::R6Class( private = list( finalize = function() { + private$options$otel_session$add_event("finalizer") + private$options$otel_session$end() unlink(private$tmp_output_file) unlink(private$tmp_error_file) unlink(private$options$tmp_files, recursive = TRUE) @@ -259,6 +261,18 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { options <- setup_context(options) options <- setup_r_binary_and_args(options, script_file = FALSE) + otel_session <- otel::start_session( + "callr::r_session", + attributes = otel::as_attributes(options) + ) + if (otel::is_tracing()) { + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + otel::log_debug("callr::r_session start") + options$otel_session <- otel_session + private$options <- options prepare_client_files() @@ -289,7 +303,7 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { private$state <- "starting" if (wait) { - otel::start_span("r_session$initialize() wait", session = otel_session) + otel::start_span("r_session$initialize() wait") timeout <- wait_timeout have_until <- Sys.time() + as.difftime(timeout / 1000, units = "secs") pr <- self$poll_io(timeout) @@ -333,6 +347,8 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { } rs_read <- function(self, private) { + otel::local_session(private$options$otel_session) + spn <- otel::start_span("r_session$read") if (!is.null(private$buffer)) { # There is a partial message in the buffer, try to finish it. out <- private$read_buffer() @@ -342,6 +358,7 @@ rs_read <- function(self, private) { } if (!length(out)) { if (processx::processx_conn_is_incomplete(private$pipe)) { + spn$set_attribute("message", FALSE) return() } if (self$is_alive()) { @@ -372,7 +389,15 @@ rs_read <- function(self, private) { ) } } - if (length(out)) private$parse_msg(out) + if (length(out)) { + spn$set_attribute("message", TRUE) + if (!is.null(out$header$code)) { + spn$set_attribute("status_code", out$header$code) + } + private$parse_msg(out) + } else { + spn$set_attribute("message", FALSE) + } } rs__read_buffer <- function(self, private) { @@ -439,6 +464,8 @@ rs__parse_header <- function(line) { } rs_close <- function(self, private, grace) { + otel::local_session(private$options$otel_session) + otel::start_span("r_session$close") processx::processx_conn_close(self$get_input_connection()) self$poll_process(grace) self$kill() @@ -451,10 +478,13 @@ rs_close <- function(self, private, grace) { processx::processx_conn_close(private$pipe) processx::processx_conn_close(self$get_output_connection()) processx::processx_conn_close(self$get_error_connection()) + private$options$otel_session$end() invisible() } rs_call <- function(self, private, func, args, package) { + otel::local_session(private$options$otel_session) + otel::start_span("r_session$call") ## We only allow a new command if the R session is idle. ## This allows keeping a clean state ## TODO: do we need a state at all? diff --git a/R/rcmd.R b/R/rcmd.R index c908cec7..d43c4379 100644 --- a/R/rcmd.R +++ b/R/rcmd.R @@ -72,6 +72,13 @@ rcmd <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) + if (otel::is_tracing()) { + otel::start_span("callr::rcmd", attributes = otel::as_attributes(options)) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + run_r(options) } diff --git a/R/rscript.R b/R/rscript.R index 983fd966..a4ca6116 100644 --- a/R/rscript.R +++ b/R/rscript.R @@ -46,6 +46,16 @@ rscript <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) + if (otel::is_tracing()) { + otel::start_span( + "callr::rscript", + attributes = otel::as_attributes(options) + ) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + invisible(run_r(options)) } diff --git a/R/run.R b/R/run.R index 72ba7937..49e62209 100644 --- a/R/run.R +++ b/R/run.R @@ -12,6 +12,8 @@ run_r <- function(options) { (!is.null(stdout) && !is.null(stderr) && stdout == stderr) ) + otel::log_debug("callr start subprocess") + res <- with( options, with_envvar( diff --git a/R/script.R b/R/script.R index 4c919fd5..95e8aa16 100644 --- a/R/script.R +++ b/R/script.R @@ -43,6 +43,9 @@ make_vanilla_script_expr <- function( e2$trace <- e2$trace[-(1:cut), ] } + if (callr_data$has_otel) { + callr_data$otel_span$record_exception(e2) + } base::saveRDS( base::list("error", e2, e), file = base::paste0(`__res__`, ".error") @@ -74,7 +77,8 @@ make_vanilla_script_expr <- function( if (messages) { message <- function() { substitute({ - pxlib <- base::as.environment("tools:callr")$`__callr_data__`$pxlib + callr_data <- base::as.environment("tools:callr")$`__callr_data__` + pxlib <- callr_data$pxlib if (base::is.null(e$code)) { e$code <- "301" } @@ -83,6 +87,12 @@ make_vanilla_script_expr <- function( pxlib$base64_encode(base::serialize(e, NULL)) ) data <- base::paste0(e$code, " ", base::nchar(msg), "\n", msg) + if (callr_data$has_otel) { + callr$data$otel_span$add_event( + "callr message", + attributes = list(status_code = e$code) + ) + } pxlib$write_fd(3L, data) if (