From 2376d526949e02cfde85d91aa271719defea93e0 Mon Sep 17 00:00:00 2001 From: Son Luong Ngoc Date: Mon, 8 Dec 2025 00:54:13 +0100 Subject: [PATCH 1/2] re_grpc: wire up OSS RequestMetadata In open source remote execution setup, request metdata let the server group cache requests using a build (invocation) and build targets. --- .../src/actions/impls/run.rs | 6 +++ app/buck2_build_api/src/actions.rs | 1 + .../execute/action_execution_target.rs | 18 ++++++++ .../src/actions/execute/action_executor.rs | 2 + .../src/execute/cache_uploader.rs | 2 + app/buck2_execute/src/execute/target.rs | 15 +++++++ app/buck2_execute/src/re/action_identity.rs | 16 +++++++ app/buck2_execute/src/re/client.rs | 17 +++++-- app/buck2_execute/src/re/metadata.rs | 6 ++- app/buck2_execute/src/re/uploader.rs | 19 ++++++-- .../src/executors/action_cache.rs | 10 ++++- .../src/executors/caching.rs | 28 +++++++++--- app/buck2_execute_impl/src/executors/re.rs | 8 +++- app/buck2_test/src/orchestrator.rs | 25 +++++++++++ remote_execution/oss/re_grpc/src/client.rs | 44 ++++++++++++++----- remote_execution/oss/re_grpc/src/metadata.rs | 5 +++ 16 files changed, 194 insertions(+), 28 deletions(-) diff --git a/app/buck2_action_impl/src/actions/impls/run.rs b/app/buck2_action_impl/src/actions/impls/run.rs index 12fdcf9602713..99de66692229b 100644 --- a/app/buck2_action_impl/src/actions/impls/run.rs +++ b/app/buck2_action_impl/src/actions/impls/run.rs @@ -381,6 +381,7 @@ enum ExecuteResult { executor_preference: ExecutorPreference, action_and_blobs: ActionDigestAndBlobs, input_files_bytes: u64, + request: CommandExecutionRequest, }, } @@ -1131,6 +1132,7 @@ impl RunAction { executor_preference: req.executor_preference, action_and_blobs, input_files_bytes: req.paths().input_files_bytes(), + request: req, }) } @@ -1446,6 +1448,7 @@ impl Action for RunAction { executor_preference, action_and_blobs, input_files_bytes, + request, ) = match self.execute_inner(ctx).await? { ExecuteResult::LocalDepFileHit(outputs, metadata) => { return Ok((outputs, metadata)); @@ -1456,12 +1459,14 @@ impl Action for RunAction { executor_preference, action_and_blobs, input_files_bytes, + request, } => ( result, dep_file_bundle, executor_preference, action_and_blobs, input_files_bytes, + request, ), }; @@ -1489,6 +1494,7 @@ impl Action for RunAction { let re_result = result.action_result.take(); let upload_result = ctx .cache_upload( + &request, &action_and_blobs, &result, re_result, diff --git a/app/buck2_build_api/src/actions.rs b/app/buck2_build_api/src/actions.rs index 7278cfe78a021..95bb521a30ec8 100644 --- a/app/buck2_build_api/src/actions.rs +++ b/app/buck2_build_api/src/actions.rs @@ -262,6 +262,7 @@ pub trait ActionExecutionCtx: Send + Sync { async fn cache_upload( &mut self, + request: &CommandExecutionRequest, action: &ActionDigestAndBlobs, execution_result: &CommandExecutionResult, re_result: Option, diff --git a/app/buck2_build_api/src/actions/execute/action_execution_target.rs b/app/buck2_build_api/src/actions/execute/action_execution_target.rs index 4933c652fc0fb..3f029a1ab133b 100644 --- a/app/buck2_build_api/src/actions/execute/action_execution_target.rs +++ b/app/buck2_build_api/src/actions/execute/action_execution_target.rs @@ -86,4 +86,22 @@ impl CommandExecutionTarget for ActionExecutionTarget<'_> { identifier: self.action.identifier().unwrap_or("").to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some(self.action.category().as_str().to_owned()) + } + + fn target_label(&self) -> Option { + self.action + .owner() + .unpack_target_label() + .map(ToString::to_string) + } + + fn configuration_hash(&self) -> Option { + self.action + .owner() + .unpack_target_label() + .map(|label| label.cfg().output_hash().as_str().to_owned()) + } } diff --git a/app/buck2_build_api/src/actions/execute/action_executor.rs b/app/buck2_build_api/src/actions/execute/action_executor.rs index 36288d11a4151..c4dcf9ad346de 100644 --- a/app/buck2_build_api/src/actions/execute/action_executor.rs +++ b/app/buck2_build_api/src/actions/execute/action_executor.rs @@ -593,6 +593,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { async fn cache_upload( &mut self, + request: &CommandExecutionRequest, action_digest_and_blobs: &ActionDigestAndBlobs, execution_result: &CommandExecutionResult, re_result: Option, @@ -608,6 +609,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { digest_config: self.digest_config(), mergebase: self.mergebase().0.as_ref(), re_platform: self.re_platform(), + paths: request.paths(), }, execution_result, re_result, diff --git a/app/buck2_execute/src/execute/cache_uploader.rs b/app/buck2_execute/src/execute/cache_uploader.rs index b40f978ec2c0c..f9774aa5a4a18 100644 --- a/app/buck2_execute/src/execute/cache_uploader.rs +++ b/app/buck2_execute/src/execute/cache_uploader.rs @@ -17,6 +17,7 @@ use remote_execution::TActionResult2; use crate::digest_config::DigestConfig; use crate::execute::action_digest_and_blobs::ActionDigestAndBlobs; use crate::execute::dep_file_digest::DepFileDigest; +use crate::execute::request::CommandExecutionPaths; use crate::execute::result::CommandExecutionResult; use crate::execute::target::CommandExecutionTarget; use crate::materialize::materializer::Materializer; @@ -26,6 +27,7 @@ pub struct CacheUploadInfo<'a> { pub digest_config: DigestConfig, pub mergebase: &'a Option, pub re_platform: &'a remote_execution::Platform, + pub paths: &'a CommandExecutionPaths, } #[async_trait] diff --git a/app/buck2_execute/src/execute/target.rs b/app/buck2_execute/src/execute/target.rs index 59972168e486d..131546bbd3dda 100644 --- a/app/buck2_execute/src/execute/target.rs +++ b/app/buck2_execute/src/execute/target.rs @@ -18,4 +18,19 @@ pub trait CommandExecutionTarget: Send + Sync + Debug { fn as_proto_action_key(&self) -> buck2_data::ActionKey; fn as_proto_action_name(&self) -> buck2_data::ActionName; + + /// Optional mnemonic describing the action kind (e.g. `CxxCompile`). + fn action_mnemonic(&self) -> Option { + None + } + + /// Optional configured target label this action belongs to. + fn target_label(&self) -> Option { + None + } + + /// Optional hash identifying the build configuration of the target. + fn configuration_hash(&self) -> Option { + None + } } diff --git a/app/buck2_execute/src/re/action_identity.rs b/app/buck2_execute/src/re/action_identity.rs index 382d45d8a8caa..837ef5d678767 100644 --- a/app/buck2_execute/src/re/action_identity.rs +++ b/app/buck2_execute/src/re/action_identity.rs @@ -28,6 +28,14 @@ pub struct ReActionIdentity<'a> { /// Details about the action collected while uploading pub paths: &'a CommandExecutionPaths, + /// Optional action id (usually the action digest hash) used for request metadata. + pub action_id: Option, + + /// Optional action mnemonic, target label and configuration hash used for OSS RE metadata. + pub action_mnemonic: Option, + pub target_label: Option, + pub configuration_hash: Option, + //// Trace ID which started the execution of this action, to be added on the RE side pub trace_id: TraceId, } @@ -37,6 +45,7 @@ impl<'a> ReActionIdentity<'a> { target: &'a dyn CommandExecutionTarget, executor_action_key: Option<&str>, paths: &'a CommandExecutionPaths, + action_id: Option, ) -> Self { let mut action_key = target.re_action_key(); if let Some(executor_action_key) = executor_action_key { @@ -44,12 +53,19 @@ impl<'a> ReActionIdentity<'a> { } let trace_id = get_dispatcher().trace_id().to_owned(); + let action_mnemonic = target.action_mnemonic(); + let target_label = target.target_label(); + let configuration_hash = target.configuration_hash(); Self { _target: target, action_key, affinity_key: target.re_affinity_key(), paths, + action_id, + action_mnemonic, + target_label, + configuration_hash, trace_id, } } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index cc065551367c8..a01fed31a2e66 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -923,13 +923,16 @@ impl RemoteExecutionClientImpl { action_digest: ActionDigest, use_case: RemoteExecutorUseCase, ) -> buck2_error::Result> { + let mut metadata = use_case.metadata(None); + metadata.action_id = Some(action_digest.raw_digest().to_string()); + let res = with_error_handler( "action_cache", self.get_session_id(), self.client() .get_action_cache_client() .get_action_result( - use_case.metadata(None), + metadata, ActionResultRequest { digest: action_digest.to_re(), ..Default::default() @@ -1298,6 +1301,7 @@ impl RemoteExecutionClientImpl { ..Default::default() }), respect_file_symlinks: Some(self.respect_file_symlinks), + action_id: Some(action_digest.raw_digest().to_string()), ..use_case.metadata(Some(identity)) }; @@ -1376,13 +1380,17 @@ impl RemoteExecutionClientImpl { return Ok((Vec::new(), TLocalCacheStats::default())); } let expected_blobs = digests.len(); + let mut metadata = use_case.metadata(identity); + metadata.action_id = identity + .and_then(|id| id.action_id.clone()) + .or(metadata.action_id); let response = with_error_handler( "download_typed_blobs", self.get_session_id(), self.client() .get_cas_client() .download( - use_case.metadata(identity), + metadata, DownloadRequest { inlined_digests: Some(digests), ..Default::default() @@ -1420,13 +1428,15 @@ impl RemoteExecutionClientImpl { use_case: RemoteExecutorUseCase, ) -> buck2_error::Result<(Vec, TLocalCacheStats)> { let re_action = format!("download_blob for digest {digest}"); + let mut metadata = use_case.metadata(None); + metadata.action_id = Some(digest.hash.clone()); let response = with_error_handler( re_action.as_str(), self.get_session_id(), self.client() .get_cas_client() .download( - use_case.metadata(None), + metadata, DownloadRequest { inlined_digests: Some(vec![digest.clone()]), ..Default::default() @@ -1601,6 +1611,7 @@ impl RemoteExecutionClientImpl { attributes, ..Default::default() }), + action_id: Some(digest.raw_digest().to_string()), ..use_case.metadata(None) }, WriteActionResultRequest { diff --git a/app/buck2_execute/src/re/metadata.rs b/app/buck2_execute/src/re/metadata.rs index 47d0bfe189d44..c10d441e3b882 100644 --- a/app/buck2_execute/src/re/metadata.rs +++ b/app/buck2_execute/src/re/metadata.rs @@ -30,14 +30,18 @@ impl RemoteExecutionMetadataExt for RemoteExecutorUseCase { RemoteExecutionMetadata { use_case_id: self.as_str().to_owned(), buck_info: Some(BuckInfo { - build_id: trace_id, + build_id: trace_id.clone(), ..Default::default() }), + correlated_invocations_id: Some(trace_id), action_history_info: identity.map(|identity| ActionHistoryInfo { action_key: identity.action_key.clone(), disable_retry_on_oom: false, ..Default::default() }), + action_mnemonic: identity.and_then(|identity| identity.action_mnemonic.clone()), + target_id: identity.and_then(|identity| identity.target_label.clone()), + configuration_id: identity.and_then(|identity| identity.configuration_hash.clone()), ..Default::default() } } diff --git a/app/buck2_execute/src/re/uploader.rs b/app/buck2_execute/src/re/uploader.rs index 8449911e4c0a8..fc6900797f0a5 100644 --- a/app/buck2_execute/src/re/uploader.rs +++ b/app/buck2_execute/src/re/uploader.rs @@ -175,7 +175,10 @@ impl Uploader { }) } else { let client = client.clone(); - let metadata = use_case.metadata(identity); + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } let request = GetDigestsTtlRequest { digests: input_digests.iter().map(|d| d.to_re()).collect(), ..Default::default() @@ -432,12 +435,17 @@ impl Uploader { // Upload if !upload_files.is_empty() || !upload_blobs.is_empty() { + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } with_error_handler( "upload", client.get_session_id(), - client.get_raw_re_client() + client + .get_raw_re_client() .upload( - use_case.metadata(identity), + metadata, UploadRequest { files_with_digest: Some(upload_files), inlined_blobs_with_digest: Some(upload_blobs), @@ -666,7 +674,10 @@ fn query_digest_ttls<'s>( input_digests: Vec, ) -> BoxFuture<'s, buck2_error::Result>> { let client = client.dupe(); - let metadata = use_case.metadata(identity); + let mut metadata = use_case.metadata(identity); + if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + metadata.action_id = Some(id); + } let request = GetDigestsTtlRequest { digests: input_digests.iter().map(|d| d.to_re()).collect(), ..Default::default() diff --git a/app/buck2_execute_impl/src/executors/action_cache.rs b/app/buck2_execute_impl/src/executors/action_cache.rs index 839bc7d9c3887..68c4f9a370bf5 100644 --- a/app/buck2_execute_impl/src/executors/action_cache.rs +++ b/app/buck2_execute_impl/src/executors/action_cache.rs @@ -107,7 +107,12 @@ async fn query_action_cache_and_download_result( ) .await; - let identity = None; // TODO(#503): implement this + let identity = Some(ReActionIdentity::new( + command.target, + re_action_key.as_deref(), + command.request.paths(), + Some(action_digest.raw_digest().to_string()), + )); if upload_all_actions { match re_client .upload( @@ -116,7 +121,7 @@ async fn query_action_cache_and_download_result( action_blobs, ProjectRelativePath::empty(), request.paths().input_directory(), - identity, + identity.as_ref(), digest_config, deduplicate_get_digests_ttl_calls, ) @@ -167,6 +172,7 @@ async fn query_action_cache_and_download_result( command.target, re_action_key.as_deref(), command.request.paths(), + Some(action_digest.raw_digest().to_string()), ); let response = ActionCacheResult(response, cache_type.to_proto()); diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 4e3a51721c0db..358cdd4de3679 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -22,9 +22,9 @@ use buck2_directory::directory::entry::DirectoryEntry; use buck2_error::BuckErrorContext; use buck2_events::dispatch::span_async; use buck2_execute::digest::CasDigestToReExt; -use buck2_execute::digest_config::DigestConfig; use buck2_execute::directory::ActionDirectoryMember; use buck2_execute::directory::directory_to_re_tree; +use buck2_execute::execute::action_digest::ActionDigest; use buck2_execute::execute::action_digest_and_blobs::ActionDigestAndBlobs; use buck2_execute::execute::blobs::ActionBlobs; use buck2_execute::execute::cache_uploader::CacheUploadInfo; @@ -33,6 +33,7 @@ use buck2_execute::execute::cache_uploader::IntoRemoteDepFile; use buck2_execute::execute::cache_uploader::UploadCache; use buck2_execute::execute::result::CommandExecutionResult; use buck2_execute::materialize::materializer::Materializer; +use buck2_execute::re::action_identity::ReActionIdentity; use buck2_execute::re::client::ActionCacheWriteType; use buck2_execute::re::error::RemoteExecutionError; use buck2_execute::re::manager::ManagedRemoteExecutionClient; @@ -119,7 +120,7 @@ impl CacheUploader { name: Some(info.target.as_proto_action_name()), action_digest: digest_str.clone(), }, - async { + async move { let mut file_digests = Vec::new(); let mut tree_digests = Vec::new(); @@ -150,10 +151,11 @@ impl CacheUploader { // upload ActionResult to ActionCache let result: TActionResult2 = match self .upload_files_and_directories( + info, result, &mut file_digests, &mut tree_digests, - info.digest_config, + &digest, ) .await? { @@ -315,15 +317,20 @@ impl CacheUploader { async fn upload_files_and_directories( &self, + info: &CacheUploadInfo<'_>, result: &CommandExecutionResult, file_digests: &mut Vec, tree_digests: &mut Vec, - digest_config: DigestConfig, + action_digest: &ActionDigest, ) -> buck2_error::Result> { + let digest_config = info.digest_config; let mut upload_futs = vec![]; let mut output_files: Vec = Vec::new(); let mut output_directories: Vec = Vec::new(); + // Precompute the action_id string once since it's the same for all directory uploads. + let action_id = action_digest.raw_digest().to_string(); + for output_result in result.resolve_outputs(&self.artifact_fs) { let (output, value) = output_result?; match value.entry().as_ref() { @@ -379,7 +386,16 @@ impl CacheUploader { ..Default::default() }); - let identity = None; // TODO(#503): implement this + // ReActionIdentity contains references so it cannot be moved into the async + // block. Create it inside the closure instead. The action_id is precomputed + // above to avoid repeated string allocations. + let identity = ReActionIdentity::new( + info.target, + None, // re_action_key not available in cache upload context + info.paths, + Some(action_id.clone()), + ); + let fut = async move { self.re_client .upload( @@ -388,7 +404,7 @@ impl CacheUploader { &action_blobs, output.path(), &d.dupe().as_immutable(), - identity, + Some(&identity), digest_config, self.deduplicate_get_digests_ttl_calls, ) diff --git a/app/buck2_execute_impl/src/executors/re.rs b/app/buck2_execute_impl/src/executors/re.rs index 48a4f50fa4eb9..ecee300bf8747 100644 --- a/app/buck2_execute_impl/src/executors/re.rs +++ b/app/buck2_execute_impl/src/executors/re.rs @@ -373,8 +373,12 @@ impl PreparedCommandExecutor for ReExecutor { )?; } - let identity = - ReActionIdentity::new(*target, self.re_action_key.as_deref(), request.paths()); + let identity = ReActionIdentity::new( + *target, + self.re_action_key.as_deref(), + request.paths(), + Some(action_and_blobs.action.raw_digest().to_string()), + ); // TODO(bobyf, torozco): remote execution probably needs to explicitly handle cancellations let manager = self diff --git a/app/buck2_test/src/orchestrator.rs b/app/buck2_test/src/orchestrator.rs index ac51f7ff533e8..621318a015fe4 100644 --- a/app/buck2_test/src/orchestrator.rs +++ b/app/buck2_test/src/orchestrator.rs @@ -1091,6 +1091,7 @@ impl BuckTestOrchestrator<'_> { digest_config, mergebase: &None, re_platform: executor.re_platform(), + paths: request.paths(), }; let _result = match executor .cache_upload( @@ -2200,6 +2201,18 @@ impl CommandExecutionTarget for TestTarget<'_> { identifier: "".to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some("test".to_owned()) + } + + fn target_label(&self) -> Option { + Some(self.target.to_string()) + } + + fn configuration_hash(&self) -> Option { + Some(self.target.cfg().output_hash().as_str().to_owned()) + } } fn create_action_key_suffix(stage: &TestStage) -> String { @@ -2253,6 +2266,18 @@ impl CommandExecutionTarget for LocalResourceTarget<'_> { identifier: "".to_owned(), } } + + fn action_mnemonic(&self) -> Option { + Some("setup_local_resource".to_owned()) + } + + fn target_label(&self) -> Option { + Some(self.target.to_string()) + } + + fn configuration_hash(&self) -> Option { + Some(self.target.cfg().output_hash().as_str().to_owned()) + } } struct TestExecutor { diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index e9011a0c4dff2..6e2cb925059e1 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -94,6 +94,7 @@ use tonic::transport::Channel; use tonic::transport::Identity; use tonic::transport::Uri; use tonic::transport::channel::ClientTlsConfig; +use uuid::Uuid; use crate::error::*; use crate::metadata::*; @@ -1744,20 +1745,43 @@ fn with_re_metadata( .insert_bin("re-metadata-bin", MetadataValue::from_bytes(&encoded)); } else { let mut encoded = Vec::new(); + let RemoteExecutionMetadata { + correlated_invocations_id, + buck_info, + action_id, + action_mnemonic, + target_id, + configuration_id, + .. + } = metadata; + + let correlated_invocations_id = correlated_invocations_id + .or_else(|| buck_info.as_ref().map(|b| b.build_id.clone())) + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let (tool_invocation_id, tool_version) = match buck_info { + Some(buck_info) => { + let tool_version = if buck_info.version.is_empty() { + "dev".to_owned() + } else { + buck_info.version + }; + (buck_info.build_id, tool_version) + } + None => (String::new(), "dev".to_owned()), + }; + RequestMetadata { tool_details: Some(ToolDetails { tool_name: "buck2".to_owned(), - // TODO(#503): Pull the BuckVersion::get_unique_id() from BuckDaemon - tool_version: "0.1.0".to_owned(), + tool_version, }), - action_id: "".to_owned(), - tool_invocation_id: metadata - .buck_info - .map_or(String::new(), |buck_info| buck_info.build_id), - correlated_invocations_id: "".to_owned(), - action_mnemonic: "".to_owned(), - target_id: "".to_owned(), - configuration_id: "".to_owned(), + action_id: action_id.unwrap_or_default(), + tool_invocation_id, + correlated_invocations_id, + action_mnemonic: action_mnemonic.unwrap_or_default(), + target_id: target_id.unwrap_or_default(), + configuration_id: configuration_id.unwrap_or_default(), } .encode(&mut encoded) .expect("Encoding into a Vec cannot not fail"); diff --git a/remote_execution/oss/re_grpc/src/metadata.rs b/remote_execution/oss/re_grpc/src/metadata.rs index f4c30fedd6835..dffa5b8f6c4c2 100644 --- a/remote_execution/oss/re_grpc/src/metadata.rs +++ b/remote_execution/oss/re_grpc/src/metadata.rs @@ -42,5 +42,10 @@ pub struct RemoteExecutionMetadata { pub do_not_cache: bool, pub respect_file_symlinks: Option, pub client_context: Option, + pub action_id: Option, + pub action_mnemonic: Option, + pub target_id: Option, + pub configuration_id: Option, + pub correlated_invocations_id: Option, pub _dot_dot: (), } From 8193bf9e2414bcca796ab6bbd8b75ade6e89a862 Mon Sep 17 00:00:00 2001 From: Son Luong Ngoc Date: Tue, 9 Dec 2025 14:23:16 +0100 Subject: [PATCH 2/2] ReActionIdentity: make target and paths optional Instead of making 'identity' optional in some of the request paths, make the fields inside ReActionIdentity optional instead. This enables us to mandate 'identity' for all call RBE client consumer. We simplify the creation of the identity by introducing a minimal constructor to use in different places with insufficient data to build a full one. This change is based on https://github.com/facebook/buck2/pull/1169 --- app/buck2_build_api/src/materialize.rs | 7 ++++- app/buck2_execute/src/re/action_identity.rs | 24 ++++++++++++--- app/buck2_execute/src/re/client.rs | 16 ++++++++-- app/buck2_execute/src/re/manager.rs | 4 ++- app/buck2_execute/src/re/uploader.rs | 20 ++++++------- .../src/executors/action_cache.rs | 13 ++------- .../action_cache_upload_permission_checker.rs | 14 +++++++-- .../src/executors/caching.rs | 29 ++++++++++++++++++- app/buck2_execute_impl/src/executors/re.rs | 2 +- 9 files changed, 96 insertions(+), 33 deletions(-) diff --git a/app/buck2_build_api/src/materialize.rs b/app/buck2_build_api/src/materialize.rs index b31b20a599414..ccc15d85e231e 100644 --- a/app/buck2_build_api/src/materialize.rs +++ b/app/buck2_build_api/src/materialize.rs @@ -19,6 +19,7 @@ use buck2_common::legacy_configs::key::BuckconfigKeyRef; use buck2_common::legacy_configs::view::LegacyBuckConfigView; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; use buck2_core::fs::project_rel_path::ProjectRelativePath; +use buck2_directory::directory::fingerprinted_directory::FingerprintedDirectory; use buck2_error::BuckErrorContext; use buck2_execute::artifact::artifact_dyn::ArtifactDyn; use buck2_execute::artifact_utils::ArtifactValueBuilder; @@ -27,6 +28,7 @@ use buck2_execute::digest_config::HasDigestConfig; use buck2_execute::directory::ActionDirectoryBuilder; use buck2_execute::execute::blobs::ActionBlobs; use buck2_execute::materialize::materializer::HasMaterializer; +use buck2_execute::re::action_identity::ReActionIdentity; use dashmap::DashSet; use dice::DiceComputations; use dice::UserComputationData; @@ -190,7 +192,10 @@ async fn ensure_uploaded( &ActionBlobs::new(digest_config), ProjectRelativePath::empty(), &dir, - None, + &ReActionIdentity::minimal( + dir.fingerprint().raw_digest().to_string(), + Some(dir.fingerprint().raw_digest().to_string()), + ), digest_config, ctx.per_transaction_data() .get_run_action_knobs() diff --git a/app/buck2_execute/src/re/action_identity.rs b/app/buck2_execute/src/re/action_identity.rs index 837ef5d678767..e92fe7bdc8774 100644 --- a/app/buck2_execute/src/re/action_identity.rs +++ b/app/buck2_execute/src/re/action_identity.rs @@ -17,7 +17,7 @@ use crate::execute::target::CommandExecutionTarget; pub struct ReActionIdentity<'a> { /// This is currently unused, but historically it has been useful to add logging in the RE /// client, so it's worth keeping around. - _target: &'a dyn CommandExecutionTarget, + _target: Option<&'a dyn CommandExecutionTarget>, /// Actions with the same action key share e.g. memory requirements learnt by RE. pub action_key: String, @@ -26,7 +26,7 @@ pub struct ReActionIdentity<'a> { pub affinity_key: String, /// Details about the action collected while uploading - pub paths: &'a CommandExecutionPaths, + pub paths: Option<&'a CommandExecutionPaths>, /// Optional action id (usually the action digest hash) used for request metadata. pub action_id: Option, @@ -58,10 +58,10 @@ impl<'a> ReActionIdentity<'a> { let configuration_hash = target.configuration_hash(); Self { - _target: target, + _target: Some(target), action_key, affinity_key: target.re_affinity_key(), - paths, + paths: Some(paths), action_id, action_mnemonic, target_label, @@ -69,4 +69,20 @@ impl<'a> ReActionIdentity<'a> { trace_id, } } + + /// Create a minimal identity for operations that don't have a full action context, + /// such as permission checks. + pub fn minimal(action_key: String, action_id: Option) -> Self { + Self { + _target: None, + action_key, + affinity_key: String::new(), + paths: None, + action_id, + action_mnemonic: None, + target_label: None, + configuration_hash: None, + trace_id: get_dispatcher().trace_id().to_owned(), + } + } } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index a01fed31a2e66..df1833197c19b 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -238,7 +238,7 @@ impl RemoteExecutionClient { dir_path: &ProjectRelativePath, input_dir: &ActionImmutableDirectory, use_case: RemoteExecutorUseCase, - identity: Option<&ReActionIdentity<'_>>, + identity: &ReActionIdentity<'_>, digest_config: DigestConfig, deduplicate_get_digests_ttl_calls: bool, ) -> buck2_error::Result { @@ -268,6 +268,7 @@ impl RemoteExecutionClient { directories: Vec, inlined_blobs_with_digest: Vec, use_case: RemoteExecutorUseCase, + identity: &ReActionIdentity<'_>, ) -> buck2_error::Result<()> { self.data .uploads @@ -276,6 +277,7 @@ impl RemoteExecutionClient { directories, inlined_blobs_with_digest, use_case, + identity, )) .await } @@ -960,14 +962,19 @@ impl RemoteExecutionClientImpl { directories: Vec, inlined_blobs_with_digest: Vec, use_case: RemoteExecutorUseCase, + identity: &ReActionIdentity<'_>, ) -> buck2_error::Result<()> { + let mut metadata = use_case.metadata(Some(identity)); + if let Some(action_id) = identity.action_id.as_ref() { + metadata.action_id = Some(action_id.clone()); + } with_error_handler( "upload_files_and_directories", self.get_session_id(), self.client() .get_cas_client() .upload( - use_case.metadata(None), + metadata, UploadRequest { files_with_digest: Some(files_with_digest), inlined_blobs_with_digest: Some(inlined_blobs_with_digest), @@ -1329,7 +1336,10 @@ impl RemoteExecutionClientImpl { host_runtime_requirements: THostRuntimeRequirements { platform: re_platform(platform), host_resource_requirements: THostResourceRequirements { - input_files_bytes: identity.paths.input_files_bytes() as i64, + input_files_bytes: identity + .paths + .map(|p| p.input_files_bytes() as i64) + .unwrap_or(0), resource_units: re_resource_units.unwrap_or_default(), ..Default::default() }, diff --git a/app/buck2_execute/src/re/manager.rs b/app/buck2_execute/src/re/manager.rs index 6fa2e8d1dd2cf..16caae8e441ac 100644 --- a/app/buck2_execute/src/re/manager.rs +++ b/app/buck2_execute/src/re/manager.rs @@ -379,7 +379,7 @@ impl ManagedRemoteExecutionClient { blobs: &ActionBlobs, dir_path: &ProjectRelativePath, input_dir: &ActionImmutableDirectory, - identity: Option<&ReActionIdentity<'_>>, + identity: &ReActionIdentity<'_>, digest_config: DigestConfig, deduplicate_get_digests_ttl_calls: bool, ) -> buck2_error::Result { @@ -405,6 +405,7 @@ impl ManagedRemoteExecutionClient { files_with_digest: Vec, directories: Vec, inlined_blobs_with_digest: Vec, + identity: &ReActionIdentity<'_>, ) -> buck2_error::Result<()> { self.lock()? .get() @@ -414,6 +415,7 @@ impl ManagedRemoteExecutionClient { directories, inlined_blobs_with_digest, self.use_case, + identity, ) .await } diff --git a/app/buck2_execute/src/re/uploader.rs b/app/buck2_execute/src/re/uploader.rs index fc6900797f0a5..1635fc1bced7f 100644 --- a/app/buck2_execute/src/re/uploader.rs +++ b/app/buck2_execute/src/re/uploader.rs @@ -82,7 +82,7 @@ impl Uploader { input_dir: &'a ActionImmutableDirectory, blobs: &'a ActionBlobs, use_case: &RemoteExecutorUseCase, - identity: Option<&ReActionIdentity<'_>>, + identity: &ReActionIdentity<'_>, digest_config: DigestConfig, deduplicate_get_digests_ttl_calls: bool, ) -> buck2_error::Result<( @@ -175,8 +175,8 @@ impl Uploader { }) } else { let client = client.clone(); - let mut metadata = use_case.metadata(identity); - if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + let mut metadata = use_case.metadata(Some(identity)); + if let Some(id) = identity.action_id.clone() { metadata.action_id = Some(id); } let request = GetDigestsTtlRequest { @@ -236,7 +236,7 @@ impl Uploader { input_dir: &ActionImmutableDirectory, blobs: &ActionBlobs, use_case: RemoteExecutorUseCase, - identity: Option<&ReActionIdentity<'_>>, + identity: &ReActionIdentity<'_>, digest_config: DigestConfig, deduplicate_get_digests_ttl_calls: bool, ) -> buck2_error::Result { @@ -435,8 +435,8 @@ impl Uploader { // Upload if !upload_files.is_empty() || !upload_blobs.is_empty() { - let mut metadata = use_case.metadata(identity); - if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + let mut metadata = use_case.metadata(Some(identity)); + if let Some(id) = identity.action_id.clone() { metadata.action_id = Some(id); } with_error_handler( @@ -597,7 +597,7 @@ impl<'s> GetDigestsTtlDeduper<'s> { deduper: &'s Mutex, client: &'a RemoteExecutionClient, use_case: RemoteExecutorUseCase, - identity: Option<&'a ReActionIdentity<'a>>, + identity: &'a ReActionIdentity<'a>, digest_config: DigestConfig, digests: impl IntoIterator, ) -> ( @@ -669,13 +669,13 @@ fn query_digest_ttls<'s>( request_id: RequestId, client: &RemoteExecutionClient, use_case: RemoteExecutorUseCase, - identity: Option<&ReActionIdentity<'_>>, + identity: &ReActionIdentity<'_>, digest_config: DigestConfig, input_digests: Vec, ) -> BoxFuture<'s, buck2_error::Result>> { let client = client.dupe(); - let mut metadata = use_case.metadata(identity); - if let Some(id) = identity.and_then(|id| id.action_id.clone()) { + let mut metadata = use_case.metadata(Some(identity)); + if let Some(id) = identity.action_id.clone() { metadata.action_id = Some(id); } let request = GetDigestsTtlRequest { diff --git a/app/buck2_execute_impl/src/executors/action_cache.rs b/app/buck2_execute_impl/src/executors/action_cache.rs index 68c4f9a370bf5..86199351166e3 100644 --- a/app/buck2_execute_impl/src/executors/action_cache.rs +++ b/app/buck2_execute_impl/src/executors/action_cache.rs @@ -107,12 +107,12 @@ async fn query_action_cache_and_download_result( ) .await; - let identity = Some(ReActionIdentity::new( + let identity = ReActionIdentity::new( command.target, re_action_key.as_deref(), command.request.paths(), Some(action_digest.raw_digest().to_string()), - )); + ); if upload_all_actions { match re_client .upload( @@ -121,7 +121,7 @@ async fn query_action_cache_and_download_result( action_blobs, ProjectRelativePath::empty(), request.paths().input_directory(), - identity.as_ref(), + &identity, digest_config, deduplicate_get_digests_ttl_calls, ) @@ -168,13 +168,6 @@ async fn query_action_cache_and_download_result( } }; - let identity = ReActionIdentity::new( - command.target, - re_action_key.as_deref(), - command.request.paths(), - Some(action_digest.raw_digest().to_string()), - ); - let response = ActionCacheResult(response, cache_type.to_proto()); let res = download_action_results( request, diff --git a/app/buck2_execute_impl/src/executors/action_cache_upload_permission_checker.rs b/app/buck2_execute_impl/src/executors/action_cache_upload_permission_checker.rs index bbc3a25be1929..fbce463af7808 100644 --- a/app/buck2_execute_impl/src/executors/action_cache_upload_permission_checker.rs +++ b/app/buck2_execute_impl/src/executors/action_cache_upload_permission_checker.rs @@ -18,6 +18,7 @@ use buck2_error::BuckErrorContext; use buck2_execute::digest_config::DigestConfig; use buck2_execute::re::client::ActionCacheWriteType; use buck2_execute::re::error::RemoteExecutionError; +use buck2_execute::re::action_identity::ReActionIdentity; use buck2_execute::re::manager::ManagedRemoteExecutionClient; use dashmap::DashMap; use dupe::Dupe; @@ -58,9 +59,18 @@ impl ActionCacheUploadPermissionChecker { ) -> buck2_error::Result> { let (action, action_result) = empty_action_result(platform, digest_config)?; - // This is CAS upload, if it fails, something is very broken. + // This is CAS upload for permission check with a synthetic empty action. + let identity = ReActionIdentity::minimal( + "CASPermCheck".to_owned(), + Some("CASPermCheck".to_owned()), + ); re_client - .upload_files_and_directories(Vec::new(), Vec::new(), action.blobs.to_inlined_blobs()) + .upload_files_and_directories( + Vec::new(), + Vec::new(), + action.blobs.to_inlined_blobs(), + &identity, + ) .await?; // This operation requires permission to write. diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 358cdd4de3679..920d04ffe4a3f 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -137,6 +137,13 @@ impl CacheUploader { return Ok(rejected); } + let identity = ReActionIdentity::new( + info.target, + None, // re_action_key not available in cache upload context + info.paths, + Some(digest.raw_digest().to_string()), + ); + // upload Action to CAS. // This is necessary when writing to the ActionCache through CAS, since CAS needs to inspect the Action related to the ActionResult. // Without storing the Action itself to CAS, ActionCache writes would fail. @@ -145,6 +152,7 @@ impl CacheUploader { vec![], vec![], action_digest_and_blobs.blobs.to_inlined_blobs(), + &identity, ) .await?; @@ -254,6 +262,13 @@ impl CacheUploader { }; action_result.execution_metadata.auxiliary_metadata = vec![dep_file_tany]; + let identity = ReActionIdentity::new( + info.target, + None, // re_action_key not available in cache upload context + info.paths, + Some(digest.raw_digest().to_string()), + ); + // upload Action to CAS. // This is necessary when writing to the ActionCache through CAS, since CAS needs to inspect the Action related to the ActionResult. // Without storing the Action itself to CAS, ActionCache writes would fail. @@ -262,6 +277,7 @@ impl CacheUploader { vec![], vec![], remote_dep_file_action.blobs.to_inlined_blobs(), + &identity, ) .await?; @@ -350,6 +366,16 @@ impl CacheUploader { ..Default::default() }); + // ReActionIdentity contains references so it cannot be moved into the async + // block. Create it inside the closure instead. The action_id is precomputed + // above to avoid repeated string allocations. + let identity = ReActionIdentity::new( + info.target, + None, // re_action_key not available in cache upload context + info.paths, + Some(action_id.clone()), + ); + let fut = async move { let name = self .artifact_fs @@ -367,6 +393,7 @@ impl CacheUploader { }], vec![], vec![], + &identity, ) .await }; @@ -404,7 +431,7 @@ impl CacheUploader { &action_blobs, output.path(), &d.dupe().as_immutable(), - Some(&identity), + &identity, digest_config, self.deduplicate_get_digests_ttl_calls, ) diff --git a/app/buck2_execute_impl/src/executors/re.rs b/app/buck2_execute_impl/src/executors/re.rs index ecee300bf8747..ae7d89b183cb6 100644 --- a/app/buck2_execute_impl/src/executors/re.rs +++ b/app/buck2_execute_impl/src/executors/re.rs @@ -110,7 +110,7 @@ impl ReExecutor { blobs, ProjectRelativePath::empty(), paths.input_directory(), - Some(identity), + identity, digest_config, self.deduplicate_get_digests_ttl_calls, )