From 2376d526949e02cfde85d91aa271719defea93e0 Mon Sep 17 00:00:00 2001 From: Son Luong Ngoc Date: Mon, 8 Dec 2025 00:54:13 +0100 Subject: [PATCH] re_grpc: wire up OSS RequestMetadata In open source remote execution setup, request metdata let the server group cache requests using a build (invocation) and build targets. --- .../src/actions/impls/run.rs | 6 +++ app/buck2_build_api/src/actions.rs | 1 + .../execute/action_execution_target.rs | 18 ++++++++ .../src/actions/execute/action_executor.rs | 2 + .../src/execute/cache_uploader.rs | 2 + app/buck2_execute/src/execute/target.rs | 15 +++++++ app/buck2_execute/src/re/action_identity.rs | 16 +++++++ app/buck2_execute/src/re/client.rs | 17 +++++-- app/buck2_execute/src/re/metadata.rs | 6 ++- app/buck2_execute/src/re/uploader.rs | 19 ++++++-- .../src/executors/action_cache.rs | 10 ++++- .../src/executors/caching.rs | 28 +++++++++--- app/buck2_execute_impl/src/executors/re.rs | 8 +++- app/buck2_test/src/orchestrator.rs | 25 +++++++++++ remote_execution/oss/re_grpc/src/client.rs | 44 ++++++++++++++----- remote_execution/oss/re_grpc/src/metadata.rs | 5 +++ 16 files changed, 194 insertions(+), 28 deletions(-) diff --git a/app/buck2_action_impl/src/actions/impls/run.rs b/app/buck2_action_impl/src/actions/impls/run.rs index 12fdcf960271..99de66692229 100644 --- a/app/buck2_action_impl/src/actions/impls/run.rs +++ b/app/buck2_action_impl/src/actions/impls/run.rs @@ -381,6 +381,7 @@ enum ExecuteResult { executor_preference: ExecutorPreference, action_and_blobs: ActionDigestAndBlobs, input_files_bytes: u64, + request: CommandExecutionRequest, }, } @@ -1131,6 +1132,7 @@ impl RunAction { executor_preference: req.executor_preference, action_and_blobs, input_files_bytes: req.paths().input_files_bytes(), + request: req, }) } @@ -1446,6 +1448,7 @@ impl Action for RunAction { executor_preference, action_and_blobs, input_files_bytes, + request, ) = match self.execute_inner(ctx).await? { ExecuteResult::LocalDepFileHit(outputs, metadata) => { return Ok((outputs, metadata)); @@ -1456,12 +1459,14 @@ impl Action for RunAction { executor_preference, action_and_blobs, input_files_bytes, + request, } => ( result, dep_file_bundle, executor_preference, action_and_blobs, input_files_bytes, + request, ), }; @@ -1489,6 +1494,7 @@ impl Action for RunAction { let re_result = result.action_result.take(); let upload_result = ctx .cache_upload( + &request, &action_and_blobs, &result, re_result, diff --git a/app/buck2_build_api/src/actions.rs b/app/buck2_build_api/src/actions.rs index 7278cfe78a02..95bb521a30ec 100644 --- a/app/buck2_build_api/src/actions.rs +++ b/app/buck2_build_api/src/actions.rs @@ -262,6 +262,7 @@ pub trait ActionExecutionCtx: Send + Sync { async fn cache_upload( &mut self, + request: &CommandExecutionRequest, action: &ActionDigestAndBlobs, execution_result: &CommandExecutionResult, re_result: Option, diff --git a/app/buck2_build_api/src/actions/execute/action_execution_target.rs b/app/buck2_build_api/src/actions/execute/action_execution_target.rs index 4933c652fc0f..3f029a1ab133 100644 --- a/app/buck2_build_api/src/actions/execute/action_execution_target.rs +++ b/app/buck2_build_api/src/actions/execute/action_execution_target.rs @@ -86,4 +86,22 @@ impl CommandExecutionTarget for ActionExecutionTarget<'_> { identifier: self.action.identifier().unwrap_or("").to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some(self.action.category().as_str().to_owned()) + } + + fn target_label(&self) -> Option { + self.action + .owner() + .unpack_target_label() + .map(ToString::to_string) + } + + fn configuration_hash(&self) -> Option { + self.action + .owner() + .unpack_target_label() + .map(|label| label.cfg().output_hash().as_str().to_owned()) + } } diff --git a/app/buck2_build_api/src/actions/execute/action_executor.rs b/app/buck2_build_api/src/actions/execute/action_executor.rs index 36288d11a415..c4dcf9ad346d 100644 --- a/app/buck2_build_api/src/actions/execute/action_executor.rs +++ b/app/buck2_build_api/src/actions/execute/action_executor.rs @@ -593,6 +593,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { async fn cache_upload( &mut self, + request: &CommandExecutionRequest, action_digest_and_blobs: &ActionDigestAndBlobs, execution_result: &CommandExecutionResult, re_result: Option, @@ -608,6 +609,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { digest_config: self.digest_config(), mergebase: self.mergebase().0.as_ref(), re_platform: self.re_platform(), + paths: request.paths(), }, execution_result, re_result, diff --git a/app/buck2_execute/src/execute/cache_uploader.rs b/app/buck2_execute/src/execute/cache_uploader.rs index b40f978ec2c0..f9774aa5a4a1 100644 --- a/app/buck2_execute/src/execute/cache_uploader.rs +++ b/app/buck2_execute/src/execute/cache_uploader.rs @@ -17,6 +17,7 @@ use remote_execution::TActionResult2; use crate::digest_config::DigestConfig; use crate::execute::action_digest_and_blobs::ActionDigestAndBlobs; use crate::execute::dep_file_digest::DepFileDigest; +use crate::execute::request::CommandExecutionPaths; use crate::execute::result::CommandExecutionResult; use crate::execute::target::CommandExecutionTarget; use crate::materialize::materializer::Materializer; @@ -26,6 +27,7 @@ pub struct CacheUploadInfo<'a> { pub digest_config: DigestConfig, pub mergebase: &'a Option, pub re_platform: &'a remote_execution::Platform, + pub paths: &'a CommandExecutionPaths, } #[async_trait] diff --git a/app/buck2_execute/src/execute/target.rs b/app/buck2_execute/src/execute/target.rs index 59972168e486..131546bbd3dd 100644 --- a/app/buck2_execute/src/execute/target.rs +++ b/app/buck2_execute/src/execute/target.rs @@ -18,4 +18,19 @@ pub trait CommandExecutionTarget: Send + Sync + Debug { fn as_proto_action_key(&self) -> buck2_data::ActionKey; fn as_proto_action_name(&self) -> buck2_data::ActionName; + + /// Optional mnemonic describing the action kind (e.g. `CxxCompile`). + fn action_mnemonic(&self) -> Option { + None + } + + /// Optional configured target label this action belongs to. + fn target_label(&self) -> Option { + None + } + + /// Optional hash identifying the build configuration of the target. + fn configuration_hash(&self) -> Option { + None + } } diff --git a/app/buck2_execute/src/re/action_identity.rs b/app/buck2_execute/src/re/action_identity.rs index 382d45d8a8ca..837ef5d67876 100644 --- a/app/buck2_execute/src/re/action_identity.rs +++ b/app/buck2_execute/src/re/action_identity.rs @@ -28,6 +28,14 @@ pub struct ReActionIdentity<'a> { /// Details about the action collected while uploading pub paths: &'a CommandExecutionPaths, + /// Optional action id (usually the action digest hash) used for request metadata. + pub action_id: Option, + + /// Optional action mnemonic, target label and configuration hash used for OSS RE metadata. + pub action_mnemonic: Option, + pub target_label: Option, + pub configuration_hash: Option, + //// Trace ID which started the execution of this action, to be added on the RE side pub trace_id: TraceId, } @@ -37,6 +45,7 @@ impl<'a> ReActionIdentity<'a> { target: &'a dyn CommandExecutionTarget, executor_action_key: Option<&str>, paths: &'a CommandExecutionPaths, + action_id: Option, ) -> Self { let mut action_key = target.re_action_key(); if let Some(executor_action_key) = executor_action_key { @@ -44,12 +53,19 @@ impl<'a> ReActionIdentity<'a> { } let trace_id = get_dispatcher().trace_id().to_owned(); + let action_mnemonic = target.action_mnemonic(); + let target_label = target.target_label(); + let configuration_hash = target.configuration_hash(); Self { _target: target, action_key, affinity_key: target.re_affinity_key(), paths, + action_id, + action_mnemonic, + target_label, + configuration_hash, trace_id, } } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index cc065551367c..a01fed31a2e6 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -923,13 +923,16 @@ impl RemoteExecutionClientImpl { action_digest: ActionDigest, use_case: RemoteExecutorUseCase, ) -> buck2_error::Result> { + let mut metadata = use_case.metadata(None); + metadata.action_id = Some(action_digest.raw_digest().to_string()); + let res = with_error_handler( "action_cache", self.get_session_id(), self.client() .get_action_cache_client() .get_action_result( - use_case.metadata(None), + metadata, ActionResultRequest { digest: action_digest.to_re(), ..Default::default() @@ -1298,6 +1301,7 @@ impl RemoteExecutionClientImpl { ..Default::default() }), respect_file_symlinks: Some(self.respect_file_symlinks), + action_id: Some(action_digest.raw_digest().to_string()), ..use_case.metadata(Some(identity)) }; @@ -1376,13 +1380,17 @@ impl RemoteExecutionClientImpl { return Ok((Vec::new(), TLocalCacheStats::default())); } let expected_blobs = digests.len(); + let mut metadata = use_case.metadata(identity); + metadata.action_id = identity + .and_then(|id| id.action_id.clone()) + .or(metadata.action_id); let response = with_error_handler( "download_typed_blobs", self.get_session_id(), self.client() .get_cas_client() .download( - use_case.metadata(identity), + metadata, DownloadRequest { inlined_digests: Some(digests), ..Default::default() @@ -1420,13 +1428,15 @@ impl RemoteExecutionClientImpl { use_case: RemoteExecutorUseCase, ) -> buck2_error::Result<(Vec, TLocalCacheStats)> { let re_action = format!("download_blob for digest {digest}"); + let mut metadata = use_case.metadata(None); + metadata.action_id = Some(digest.hash.clone()); let response = with_error_handler( re_action.as_str(), self.get_session_id(), self.client() .get_cas_client() .download( - use_case.metadata(None), + metadata, DownloadRequest { inlined_digests: Some(vec![digest.clone()]), ..Default::default() @@ -1601,6 +1611,7 @@ impl RemoteExecutionClientImpl { attributes, ..Default::default() }), + action_id: Some(digest.raw_digest().to_string()), ..use_case.metadata(None) }, WriteActionResultRequest { diff --git a/app/buck2_execute/src/re/metadata.rs b/app/buck2_execute/src/re/metadata.rs index 47d0bfe189d4..c10d441e3b88 100644 --- a/app/buck2_execute/src/re/metadata.rs +++ b/app/buck2_execute/src/re/metadata.rs @@ -30,14 +30,18 @@ impl RemoteExecutionMetadataExt for RemoteExecutorUseCase { RemoteExecutionMetadata { use_case_id: self.as_str().to_owned(), buck_info: Some(BuckInfo { - build_id: trace_id, + build_id: trace_id.clone(), ..Default::default() }), + correlated_invocations_id: Some(trace_id), action_history_info: identity.map(|identity| ActionHistoryInfo { action_key: identity.action_key.clone(), disable_retry_on_oom: false, ..Default::default() }), + action_mnemonic: identity.and_then(|identity| identity.action_mnemonic.clone()), + target_id: identity.and_then(|identity| identity.target_label.clone()), + configuration_id: identity.and_then(|identity| identity.configuration_hash.clone()), ..Default::default() } } diff --git a/app/buck2_execute/src/re/uploader.rs b/app/buck2_execute/src/re/uploader.rs index 8449911e4c0a..fc6900797f0a 100644 --- a/app/buck2_execute/src/re/uploader.rs +++ b/app/buck2_execute/src/re/uploader.rs @@ -175,7 +175,10 @@ impl Uploader { }) } else { let client = client.clone(); - let metadata = use_case.metadata(identity); + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } let request = GetDigestsTtlRequest { digests: input_digests.iter().map(|d| d.to_re()).collect(), ..Default::default() @@ -432,12 +435,17 @@ impl Uploader { // Upload if !upload_files.is_empty() || !upload_blobs.is_empty() { + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } with_error_handler( "upload", client.get_session_id(), - client.get_raw_re_client() + client + .get_raw_re_client() .upload( - use_case.metadata(identity), + metadata, UploadRequest { files_with_digest: Some(upload_files), inlined_blobs_with_digest: Some(upload_blobs), @@ -666,7 +674,10 @@ fn query_digest_ttls<'s>( input_digests: Vec, ) -> BoxFuture<'s, buck2_error::Result>> { let client = client.dupe(); - let metadata = use_case.metadata(identity); + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } let request = GetDigestsTtlRequest { digests: input_digests.iter().map(|d| d.to_re()).collect(), ..Default::default() diff --git a/app/buck2_execute_impl/src/executors/action_cache.rs b/app/buck2_execute_impl/src/executors/action_cache.rs index 839bc7d9c388..68c4f9a370bf 100644 --- a/app/buck2_execute_impl/src/executors/action_cache.rs +++ b/app/buck2_execute_impl/src/executors/action_cache.rs @@ -107,7 +107,12 @@ async fn query_action_cache_and_download_result( ) .await; - let identity = None; // TODO(#503): implement this + let identity = Some(ReActionIdentity::new( + command.target, + re_action_key.as_deref(), + command.request.paths(), + Some(action_digest.raw_digest().to_string()), + )); if upload_all_actions { match re_client .upload( @@ -116,7 +121,7 @@ async fn query_action_cache_and_download_result( action_blobs, ProjectRelativePath::empty(), request.paths().input_directory(), - identity, + identity.as_ref(), digest_config, deduplicate_get_digests_ttl_calls, ) @@ -167,6 +172,7 @@ async fn query_action_cache_and_download_result( command.target, re_action_key.as_deref(), command.request.paths(), + Some(action_digest.raw_digest().to_string()), ); let response = ActionCacheResult(response, cache_type.to_proto()); diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 4e3a51721c0d..358cdd4de367 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -22,9 +22,9 @@ use buck2_directory::directory::entry::DirectoryEntry; use buck2_error::BuckErrorContext; use buck2_events::dispatch::span_async; use buck2_execute::digest::CasDigestToReExt; -use buck2_execute::digest_config::DigestConfig; use buck2_execute::directory::ActionDirectoryMember; use buck2_execute::directory::directory_to_re_tree; +use buck2_execute::execute::action_digest::ActionDigest; use buck2_execute::execute::action_digest_and_blobs::ActionDigestAndBlobs; use buck2_execute::execute::blobs::ActionBlobs; use buck2_execute::execute::cache_uploader::CacheUploadInfo; @@ -33,6 +33,7 @@ use buck2_execute::execute::cache_uploader::IntoRemoteDepFile; use buck2_execute::execute::cache_uploader::UploadCache; use buck2_execute::execute::result::CommandExecutionResult; use buck2_execute::materialize::materializer::Materializer; +use buck2_execute::re::action_identity::ReActionIdentity; use buck2_execute::re::client::ActionCacheWriteType; use buck2_execute::re::error::RemoteExecutionError; use buck2_execute::re::manager::ManagedRemoteExecutionClient; @@ -119,7 +120,7 @@ impl CacheUploader { name: Some(info.target.as_proto_action_name()), action_digest: digest_str.clone(), }, - async { + async move { let mut file_digests = Vec::new(); let mut tree_digests = Vec::new(); @@ -150,10 +151,11 @@ impl CacheUploader { // upload ActionResult to ActionCache let result: TActionResult2 = match self .upload_files_and_directories( + info, result, &mut file_digests, &mut tree_digests, - info.digest_config, + &digest, ) .await? { @@ -315,15 +317,20 @@ impl CacheUploader { async fn upload_files_and_directories( &self, + info: &CacheUploadInfo<'_>, result: &CommandExecutionResult, file_digests: &mut Vec, tree_digests: &mut Vec, - digest_config: DigestConfig, + action_digest: &ActionDigest, ) -> buck2_error::Result> { + let digest_config = info.digest_config; let mut upload_futs = vec![]; let mut output_files: Vec = Vec::new(); let mut output_directories: Vec = Vec::new(); + // Precompute the action_id string once since it's the same for all directory uploads. + let action_id = action_digest.raw_digest().to_string(); + for output_result in result.resolve_outputs(&self.artifact_fs) { let (output, value) = output_result?; match value.entry().as_ref() { @@ -379,7 +386,16 @@ impl CacheUploader { ..Default::default() }); - let identity = None; // TODO(#503): implement this + // ReActionIdentity contains references so it cannot be moved into the async + // block. Create it inside the closure instead. The action_id is precomputed + // above to avoid repeated string allocations. + let identity = ReActionIdentity::new( + info.target, + None, // re_action_key not available in cache upload context + info.paths, + Some(action_id.clone()), + ); + let fut = async move { self.re_client .upload( @@ -388,7 +404,7 @@ impl CacheUploader { &action_blobs, output.path(), &d.dupe().as_immutable(), - identity, + Some(&identity), digest_config, self.deduplicate_get_digests_ttl_calls, ) diff --git a/app/buck2_execute_impl/src/executors/re.rs b/app/buck2_execute_impl/src/executors/re.rs index 48a4f50fa4eb..ecee300bf874 100644 --- a/app/buck2_execute_impl/src/executors/re.rs +++ b/app/buck2_execute_impl/src/executors/re.rs @@ -373,8 +373,12 @@ impl PreparedCommandExecutor for ReExecutor { )?; } - let identity = - ReActionIdentity::new(*target, self.re_action_key.as_deref(), request.paths()); + let identity = ReActionIdentity::new( + *target, + self.re_action_key.as_deref(), + request.paths(), + Some(action_and_blobs.action.raw_digest().to_string()), + ); // TODO(bobyf, torozco): remote execution probably needs to explicitly handle cancellations let manager = self diff --git a/app/buck2_test/src/orchestrator.rs b/app/buck2_test/src/orchestrator.rs index ac51f7ff533e..621318a015fe 100644 --- a/app/buck2_test/src/orchestrator.rs +++ b/app/buck2_test/src/orchestrator.rs @@ -1091,6 +1091,7 @@ impl BuckTestOrchestrator<'_> { digest_config, mergebase: &None, re_platform: executor.re_platform(), + paths: request.paths(), }; let _result = match executor .cache_upload( @@ -2200,6 +2201,18 @@ impl CommandExecutionTarget for TestTarget<'_> { identifier: "".to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some("test".to_owned()) + } + + fn target_label(&self) -> Option { + Some(self.target.to_string()) + } + + fn configuration_hash(&self) -> Option { + Some(self.target.cfg().output_hash().as_str().to_owned()) + } } fn create_action_key_suffix(stage: &TestStage) -> String { @@ -2253,6 +2266,18 @@ impl CommandExecutionTarget for LocalResourceTarget<'_> { identifier: "".to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some("setup_local_resource".to_owned()) + } + + fn target_label(&self) -> Option { + Some(self.target.to_string()) + } + + fn configuration_hash(&self) -> Option { + Some(self.target.cfg().output_hash().as_str().to_owned()) + } } struct TestExecutor { diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index e9011a0c4dff..6e2cb925059e 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -94,6 +94,7 @@ use tonic::transport::Channel; use tonic::transport::Identity; use tonic::transport::Uri; use tonic::transport::channel::ClientTlsConfig; +use uuid::Uuid; use crate::error::*; use crate::metadata::*; @@ -1744,20 +1745,43 @@ fn with_re_metadata( .insert_bin("re-metadata-bin", MetadataValue::from_bytes(&encoded)); } else { let mut encoded = Vec::new(); + let RemoteExecutionMetadata { + correlated_invocations_id, + buck_info, + action_id, + action_mnemonic, + target_id, + configuration_id, + .. + } = metadata; + + let correlated_invocations_id = correlated_invocations_id + .or_else(|| buck_info.as_ref().map(|b| b.build_id.clone())) + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let (tool_invocation_id, tool_version) = match buck_info { + Some(buck_info) => { + let tool_version = if buck_info.version.is_empty() { + "dev".to_owned() + } else { + buck_info.version + }; + (buck_info.build_id, tool_version) + } + None => (String::new(), "dev".to_owned()), + }; + RequestMetadata { tool_details: Some(ToolDetails { tool_name: "buck2".to_owned(), - // TODO(#503): Pull the BuckVersion::get_unique_id() from BuckDaemon - tool_version: "0.1.0".to_owned(), + tool_version, }), - action_id: "".to_owned(), - tool_invocation_id: metadata - .buck_info - .map_or(String::new(), |buck_info| buck_info.build_id), - correlated_invocations_id: "".to_owned(), - action_mnemonic: "".to_owned(), - target_id: "".to_owned(), - configuration_id: "".to_owned(), + action_id: action_id.unwrap_or_default(), + tool_invocation_id, + correlated_invocations_id, + action_mnemonic: action_mnemonic.unwrap_or_default(), + target_id: target_id.unwrap_or_default(), + configuration_id: configuration_id.unwrap_or_default(), } .encode(&mut encoded) .expect("Encoding into a Vec cannot not fail"); diff --git a/remote_execution/oss/re_grpc/src/metadata.rs b/remote_execution/oss/re_grpc/src/metadata.rs index f4c30fedd683..dffa5b8f6c4c 100644 --- a/remote_execution/oss/re_grpc/src/metadata.rs +++ b/remote_execution/oss/re_grpc/src/metadata.rs @@ -42,5 +42,10 @@ pub struct RemoteExecutionMetadata { pub do_not_cache: bool, pub respect_file_symlinks: Option, pub client_context: Option, + pub action_id: Option, + pub action_mnemonic: Option, + pub target_id: Option, + pub configuration_id: Option, + pub correlated_invocations_id: Option, pub _dot_dot: (), }