Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/buck2_build_api/src/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use buck2_common::legacy_configs::key::BuckconfigKeyRef;
use buck2_common::legacy_configs::view::LegacyBuckConfigView;
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
use buck2_core::fs::project_rel_path::ProjectRelativePath;
use buck2_directory::directory::fingerprinted_directory::FingerprintedDirectory;
use buck2_error::BuckErrorContext;
use buck2_execute::artifact::artifact_dyn::ArtifactDyn;
use buck2_execute::artifact_utils::ArtifactValueBuilder;
Expand All @@ -27,6 +28,7 @@ use buck2_execute::digest_config::HasDigestConfig;
use buck2_execute::directory::ActionDirectoryBuilder;
use buck2_execute::execute::blobs::ActionBlobs;
use buck2_execute::materialize::materializer::HasMaterializer;
use buck2_execute::re::action_identity::ReActionIdentity;
use dashmap::DashSet;
use dice::DiceComputations;
use dice::UserComputationData;
Expand Down Expand Up @@ -190,7 +192,10 @@ async fn ensure_uploaded(
&ActionBlobs::new(digest_config),
ProjectRelativePath::empty(),
&dir,
None,
&ReActionIdentity::minimal(
dir.fingerprint().raw_digest().to_string(),
Some(dir.fingerprint().raw_digest().to_string()),
),
digest_config,
ctx.per_transaction_data()
.get_run_action_knobs()
Expand Down
24 changes: 20 additions & 4 deletions app/buck2_execute/src/re/action_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::execute::target::CommandExecutionTarget;
pub struct ReActionIdentity<'a> {
/// This is currently unused, but historically it has been useful to add logging in the RE
/// client, so it's worth keeping around.
_target: &'a dyn CommandExecutionTarget,
_target: Option<&'a dyn CommandExecutionTarget>,

/// Actions with the same action key share e.g. memory requirements learnt by RE.
pub action_key: String,
Expand All @@ -26,7 +26,7 @@ pub struct ReActionIdentity<'a> {
pub affinity_key: String,

/// Details about the action collected while uploading
pub paths: &'a CommandExecutionPaths,
pub paths: Option<&'a CommandExecutionPaths>,

/// Optional action id (usually the action digest hash) used for request metadata.
pub action_id: Option<String>,
Expand Down Expand Up @@ -58,15 +58,31 @@ impl<'a> ReActionIdentity<'a> {
let configuration_hash = target.configuration_hash();

Self {
_target: target,
_target: Some(target),
action_key,
affinity_key: target.re_affinity_key(),
paths,
paths: Some(paths),
action_id,
action_mnemonic,
target_label,
configuration_hash,
trace_id,
}
}

/// Create a minimal identity for operations that don't have a full action context,
/// such as permission checks.
pub fn minimal(action_key: String, action_id: Option<String>) -> Self {
Self {
_target: None,
action_key,
affinity_key: String::new(),
paths: None,
action_id,
action_mnemonic: None,
target_label: None,
configuration_hash: None,
trace_id: get_dispatcher().trace_id().to_owned(),
}
}
}
16 changes: 13 additions & 3 deletions app/buck2_execute/src/re/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl RemoteExecutionClient {
dir_path: &ProjectRelativePath,
input_dir: &ActionImmutableDirectory,
use_case: RemoteExecutorUseCase,
identity: Option<&ReActionIdentity<'_>>,
identity: &ReActionIdentity<'_>,
digest_config: DigestConfig,
deduplicate_get_digests_ttl_calls: bool,
) -> buck2_error::Result<UploadStats> {
Expand Down Expand Up @@ -268,6 +268,7 @@ impl RemoteExecutionClient {
directories: Vec<remote_execution::Path>,
inlined_blobs_with_digest: Vec<InlinedBlobWithDigest>,
use_case: RemoteExecutorUseCase,
identity: &ReActionIdentity<'_>,
) -> buck2_error::Result<()> {
self.data
.uploads
Expand All @@ -276,6 +277,7 @@ impl RemoteExecutionClient {
directories,
inlined_blobs_with_digest,
use_case,
identity,
))
.await
}
Expand Down Expand Up @@ -960,14 +962,19 @@ impl RemoteExecutionClientImpl {
directories: Vec<remote_execution::Path>,
inlined_blobs_with_digest: Vec<InlinedBlobWithDigest>,
use_case: RemoteExecutorUseCase,
identity: &ReActionIdentity<'_>,
) -> buck2_error::Result<()> {
let mut metadata = use_case.metadata(Some(identity));
if let Some(action_id) = identity.action_id.as_ref() {
metadata.action_id = Some(action_id.clone());
}
with_error_handler(
"upload_files_and_directories",
self.get_session_id(),
self.client()
.get_cas_client()
.upload(
use_case.metadata(None),
metadata,
UploadRequest {
files_with_digest: Some(files_with_digest),
inlined_blobs_with_digest: Some(inlined_blobs_with_digest),
Expand Down Expand Up @@ -1329,7 +1336,10 @@ impl RemoteExecutionClientImpl {
host_runtime_requirements: THostRuntimeRequirements {
platform: re_platform(platform),
host_resource_requirements: THostResourceRequirements {
input_files_bytes: identity.paths.input_files_bytes() as i64,
input_files_bytes: identity
.paths
.map(|p| p.input_files_bytes() as i64)
.unwrap_or(0),
resource_units: re_resource_units.unwrap_or_default(),
..Default::default()
},
Expand Down
4 changes: 3 additions & 1 deletion app/buck2_execute/src/re/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl ManagedRemoteExecutionClient {
blobs: &ActionBlobs,
dir_path: &ProjectRelativePath,
input_dir: &ActionImmutableDirectory,
identity: Option<&ReActionIdentity<'_>>,
identity: &ReActionIdentity<'_>,
digest_config: DigestConfig,
deduplicate_get_digests_ttl_calls: bool,
) -> buck2_error::Result<UploadStats> {
Expand All @@ -405,6 +405,7 @@ impl ManagedRemoteExecutionClient {
files_with_digest: Vec<NamedDigest>,
directories: Vec<remote_execution::Path>,
inlined_blobs_with_digest: Vec<InlinedBlobWithDigest>,
identity: &ReActionIdentity<'_>,
) -> buck2_error::Result<()> {
self.lock()?
.get()
Expand All @@ -414,6 +415,7 @@ impl ManagedRemoteExecutionClient {
directories,
inlined_blobs_with_digest,
self.use_case,
identity,
)
.await
}
Expand Down
20 changes: 10 additions & 10 deletions app/buck2_execute/src/re/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Uploader {
input_dir: &'a ActionImmutableDirectory,
blobs: &'a ActionBlobs,
use_case: &RemoteExecutorUseCase,
identity: Option<&ReActionIdentity<'_>>,
identity: &ReActionIdentity<'_>,
digest_config: DigestConfig,
deduplicate_get_digests_ttl_calls: bool,
) -> buck2_error::Result<(
Expand Down Expand Up @@ -175,8 +175,8 @@ impl Uploader {
})
} else {
let client = client.clone();
let mut metadata = use_case.metadata(identity);
if let Some(id) = identity.and_then(|id| id.action_id.clone()) {
let mut metadata = use_case.metadata(Some(identity));
if let Some(id) = identity.action_id.clone() {
metadata.action_id = Some(id);
}
let request = GetDigestsTtlRequest {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl Uploader {
input_dir: &ActionImmutableDirectory,
blobs: &ActionBlobs,
use_case: RemoteExecutorUseCase,
identity: Option<&ReActionIdentity<'_>>,
identity: &ReActionIdentity<'_>,
digest_config: DigestConfig,
deduplicate_get_digests_ttl_calls: bool,
) -> buck2_error::Result<UploadStats> {
Expand Down Expand Up @@ -435,8 +435,8 @@ impl Uploader {

// Upload
if !upload_files.is_empty() || !upload_blobs.is_empty() {
let mut metadata = use_case.metadata(identity);
if let Some(id) = identity.and_then(|id| id.action_id.clone()) {
let mut metadata = use_case.metadata(Some(identity));
if let Some(id) = identity.action_id.clone() {
metadata.action_id = Some(id);
}
with_error_handler(
Expand Down Expand Up @@ -597,7 +597,7 @@ impl<'s> GetDigestsTtlDeduper<'s> {
deduper: &'s Mutex<Self>,
client: &'a RemoteExecutionClient,
use_case: RemoteExecutorUseCase,
identity: Option<&'a ReActionIdentity<'a>>,
identity: &'a ReActionIdentity<'a>,
digest_config: DigestConfig,
digests: impl IntoIterator<Item = &'a TrackedFileDigest>,
) -> (
Expand Down Expand Up @@ -669,13 +669,13 @@ fn query_digest_ttls<'s>(
request_id: RequestId,
client: &RemoteExecutionClient,
use_case: RemoteExecutorUseCase,
identity: Option<&ReActionIdentity<'_>>,
identity: &ReActionIdentity<'_>,
digest_config: DigestConfig,
input_digests: Vec<TrackedFileDigest>,
) -> BoxFuture<'s, buck2_error::Result<HashMap<TrackedFileDigest, i64>>> {
let client = client.dupe();
let mut metadata = use_case.metadata(identity);
if let Some(id) = identity.and_then(|id| id.action_id.clone()) {
let mut metadata = use_case.metadata(Some(identity));
if let Some(id) = identity.action_id.clone() {
metadata.action_id = Some(id);
}
let request = GetDigestsTtlRequest {
Expand Down
13 changes: 3 additions & 10 deletions app/buck2_execute_impl/src/executors/action_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ async fn query_action_cache_and_download_result(
)
.await;

let identity = Some(ReActionIdentity::new(
let identity = ReActionIdentity::new(
command.target,
re_action_key.as_deref(),
command.request.paths(),
Some(action_digest.raw_digest().to_string()),
));
);
if upload_all_actions {
match re_client
.upload(
Expand All @@ -121,7 +121,7 @@ async fn query_action_cache_and_download_result(
action_blobs,
ProjectRelativePath::empty(),
request.paths().input_directory(),
identity.as_ref(),
&identity,
digest_config,
deduplicate_get_digests_ttl_calls,
)
Expand Down Expand Up @@ -168,13 +168,6 @@ async fn query_action_cache_and_download_result(
}
};

let identity = ReActionIdentity::new(
command.target,
re_action_key.as_deref(),
command.request.paths(),
Some(action_digest.raw_digest().to_string()),
);

let response = ActionCacheResult(response, cache_type.to_proto());
let res = download_action_results(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use buck2_error::BuckErrorContext;
use buck2_execute::digest_config::DigestConfig;
use buck2_execute::re::client::ActionCacheWriteType;
use buck2_execute::re::error::RemoteExecutionError;
use buck2_execute::re::action_identity::ReActionIdentity;
use buck2_execute::re::manager::ManagedRemoteExecutionClient;
use dashmap::DashMap;
use dupe::Dupe;
Expand Down Expand Up @@ -58,9 +59,18 @@ impl ActionCacheUploadPermissionChecker {
) -> buck2_error::Result<Result<(), String>> {
let (action, action_result) = empty_action_result(platform, digest_config)?;

// This is CAS upload, if it fails, something is very broken.
// This is CAS upload for permission check with a synthetic empty action.
let identity = ReActionIdentity::minimal(
"CASPermCheck".to_owned(),
Some("CASPermCheck".to_owned()),
);
re_client
.upload_files_and_directories(Vec::new(), Vec::new(), action.blobs.to_inlined_blobs())
.upload_files_and_directories(
Vec::new(),
Vec::new(),
action.blobs.to_inlined_blobs(),
&identity,
)
.await?;

// This operation requires permission to write.
Expand Down
29 changes: 28 additions & 1 deletion app/buck2_execute_impl/src/executors/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ impl CacheUploader {
return Ok(rejected);
}

let identity = ReActionIdentity::new(
info.target,
None, // re_action_key not available in cache upload context
info.paths,
Some(digest.raw_digest().to_string()),
);

// upload Action to CAS.
// This is necessary when writing to the ActionCache through CAS, since CAS needs to inspect the Action related to the ActionResult.
// Without storing the Action itself to CAS, ActionCache writes would fail.
Expand All @@ -145,6 +152,7 @@ impl CacheUploader {
vec![],
vec![],
action_digest_and_blobs.blobs.to_inlined_blobs(),
&identity,
)
.await?;

Expand Down Expand Up @@ -254,6 +262,13 @@ impl CacheUploader {
};
action_result.execution_metadata.auxiliary_metadata = vec![dep_file_tany];

let identity = ReActionIdentity::new(
info.target,
None, // re_action_key not available in cache upload context
info.paths,
Some(digest.raw_digest().to_string()),
);

// upload Action to CAS.
// This is necessary when writing to the ActionCache through CAS, since CAS needs to inspect the Action related to the ActionResult.
// Without storing the Action itself to CAS, ActionCache writes would fail.
Expand All @@ -262,6 +277,7 @@ impl CacheUploader {
vec![],
vec![],
remote_dep_file_action.blobs.to_inlined_blobs(),
&identity,
)
.await?;

Expand Down Expand Up @@ -350,6 +366,16 @@ impl CacheUploader {
..Default::default()
});

// ReActionIdentity contains references so it cannot be moved into the async
// block. Create it inside the closure instead. The action_id is precomputed
// above to avoid repeated string allocations.
let identity = ReActionIdentity::new(
info.target,
None, // re_action_key not available in cache upload context
info.paths,
Some(action_id.clone()),
);

let fut = async move {
let name = self
.artifact_fs
Expand All @@ -367,6 +393,7 @@ impl CacheUploader {
}],
vec![],
vec![],
&identity,
)
.await
};
Expand Down Expand Up @@ -404,7 +431,7 @@ impl CacheUploader {
&action_blobs,
output.path(),
&d.dupe().as_immutable(),
Some(&identity),
&identity,
digest_config,
self.deduplicate_get_digests_ttl_calls,
)
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute_impl/src/executors/re.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl ReExecutor {
blobs,
ProjectRelativePath::empty(),
paths.input_directory(),
Some(identity),
identity,
digest_config,
self.deduplicate_get_digests_ttl_calls,
)
Expand Down