Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def _upload_attachments(
upload_progress_callback: Optional[Callable],
config: Optional[ConfigParser] = None,
from_gui: bool = False,
force_s3_check: Optional[bool] = None,
) -> Dict[str, Any]:
"""
Starts the job attachments upload and handles the progress reporting callback.
Expand All @@ -175,6 +176,7 @@ def _default_update_upload_progress(upload_metadata: ProgressReportMetadata) ->
manifests=manifests,
on_uploading_assets=upload_progress_callback,
s3_check_cache_dir=config_file.get_cache_directory(),
force_s3_check=force_s3_check,
)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary,
Expand Down Expand Up @@ -392,6 +394,7 @@ def create_job_from_job_bundle(
hashing_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
upload_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
create_job_result_callback: Optional[Callable[[], bool]] = None,
force_s3_check: Optional[bool] = None,
) -> Optional[str]:
"""
Creates a [Deadline Cloud job] in the [queue] configured as default for the workstation
Expand Down Expand Up @@ -462,6 +465,9 @@ def create_job_from_job_bundle(
See hashing_progress_callback for more details.
create_job_result_callback (Callable -> bool): Callbacks periodically called while waiting for the deadline.create_job
result. See hashing_progress_callback for more details.
force_s3_check (bool, optional): If True, skip S3CheckCache and always do S3 HEAD
to verify job attachment existence before uploading. Use when S3 bucket contents may be out of sync with local caches.
If None (default), reads from the `settings.force_s3_check` config setting.

Returns:
Returns the submitted job id. If `debug_snapshot_dir` is provided then no job is submitted and it returns None.
Expand Down Expand Up @@ -498,6 +504,10 @@ def create_job_from_job_bundle(
"defaults.job_attachments_file_system", config=config
)

# Read force_s3_check from config if not explicitly set by caller
if force_s3_check is None:
force_s3_check = config_file.str2bool(get_setting("settings.force_s3_check", config=config))

queue = deadline.get_queue(
farmId=farm_id,
queueId=queue_id,
Expand Down Expand Up @@ -718,6 +728,7 @@ def create_job_from_job_bundle(
print_function_callback,
upload_progress_callback,
from_gui=from_gui,
force_s3_check=force_s3_check,
)
else:
attachment_settings = _snapshot_attachments( # type: ignore
Expand Down
15 changes: 15 additions & 0 deletions src/deadline/client/cli/_groups/bundle_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ def _interactive_confirmation_prompt(message: str, default_response: bool) -> bo
" It includes the job attachments and parameters for creating the job."
" You can later run the bash script in the snapshot to submit the job using AWS CLI commands.",
)
@click.option(
"--force-s3-check/--no-force-s3-check",
default=None,
help="Force verification that job attachments exist in S3 before skipping upload. "
"Use when S3 bucket contents may be out of sync with local caches. "
"Overrides the 'settings.force_s3_check' config setting.",
)
@click.argument("job_bundle_dir")
@_handle_error
def bundle_submit(
Expand All @@ -232,6 +239,7 @@ def bundle_submit(
require_paths_exist,
submitter_name,
save_debug_snapshot,
force_s3_check,
**args,
):
"""
Expand All @@ -245,6 +253,12 @@ def bundle_submit(
# Apply the CLI args to the config
config = _apply_cli_options_to_config(required_options={"farm_id", "queue_id"}, **args)

# Resolve force_s3_check: CLI flag takes precedence, otherwise use config setting
if force_s3_check is None:
force_s3_check = config_file.str2bool(
config_file.get_setting("settings.force_s3_check", config=config)
)

hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments")
upload_callback_manager = _ProgressBarCallbackManager(length=100, label="Uploading Attachments")

Expand Down Expand Up @@ -280,6 +294,7 @@ def _check_create_job_wait_canceled() -> bool:
submitter_name=submitter_name or "CLI",
known_asset_paths=known_asset_path,
debug_snapshot_dir=snapshot_tmpdir.name if snapshot_tmpdir else save_debug_snapshot,
force_s3_check=force_s3_check,
)

if snapshot_tmpdir:
Expand Down
9 changes: 9 additions & 0 deletions src/deadline/client/config/config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@
"default": "",
"description": "The locale to use for the UI. If empty, uses the system locale.",
},
"settings.force_s3_check": {
"default": "false",
"description": (
"Controls S3 verification behavior for job attachments. "
"When 'true', always verify files exist in S3 via HEAD request before skipping upload "
"(most reliable but slower, skips cache integrity check since every file is verified). "
"When 'false' or unset, use local cache with periodic integrity sampling against S3 (balanced default)."
),
},
}


Expand Down
3 changes: 3 additions & 0 deletions src/deadline/client/ui/dialogs/deadline_config_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ def _build_general_settings_ui(self, group, layout):
self.telemetry_opt_out = self._init_checkbox_setting(
group, layout, "telemetry.opt_out", tr("Telemetry opt out")
)
self.force_s3_check = self._init_checkbox_setting(
group, layout, "settings.force_s3_check", tr("Always check S3 job attachments")
)

self._conflict_resolution_options = [option.name for option in FileConflictResolution]
self.conflict_resolution_box = self._init_combobox_setting(
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/de_DE.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Hinzufügen...",
"All": "Alle",
"All fields below are optional": "Alle folgenden Felder sind optional",
"Always check S3 job attachments": "S3-Jobanhänge immer prüfen",
"Amount name": "Mengenname",
"Any": "Beliebig",
"Apply": "Anwenden",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/en_US.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Add...",
"All": "All",
"All fields below are optional": "All fields below are optional",
"Always check S3 job attachments": "Always check S3 job attachments",
"Amount name": "Amount name",
"Any": "Any",
"Apply": "Apply",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/es_ES.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Agregar...",
"All": "Todos",
"All fields below are optional": "Todos los campos siguientes son opcionales",
"Always check S3 job attachments": "Comprobar siempre los archivos adjuntos de trabajo en S3",
"Amount name": "Nombre de cantidad",
"Any": "Cualquiera",
"Apply": "Aplicar",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/fr_FR.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Ajouter...",
"All": "Tous",
"All fields below are optional": "Tous les champs ci-dessous sont facultatifs",
"Always check S3 job attachments": "Toujours vérifier les fichiers joints de tâche S3",
"Amount name": "Nom de quantité",
"Any": "Quelconque",
"Apply": "Appliquer",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/id_ID.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Tambah...",
"All": "Semua",
"All fields below are optional": "Semua bidang di bawah ini bersifat opsional",
"Always check S3 job attachments": "Selalu periksa lampiran pekerjaan S3",
"Amount name": "Nama jumlah",
"Any": "Apa saja",
"Apply": "Terapkan",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/it_IT.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Aggiungi...",
"All": "Tutti",
"All fields below are optional": "Tutti i campi seguenti sono facoltativi",
"Always check S3 job attachments": "Controlla sempre gli allegati lavoro S3",
"Amount name": "Nome quantità",
"Any": "Qualsiasi",
"Apply": "Applica",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/ja_JP.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "追加...",
"All": "すべて",
"All fields below are optional": "以下のすべてのフィールドはオプションです",
"Always check S3 job attachments": "S3 ジョブアタッチメントを常にチェック",
"Amount name": "量の名前",
"Any": "任意",
"Apply": "適用",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/ko_KR.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "추가...",
"All": "모두",
"All fields below are optional": "아래의 모든 필드는 선택 사항입니다",
"Always check S3 job attachments": "S3 작업 첨부 파일 항상 확인",
"Amount name": "수량 이름",
"Any": "모두",
"Apply": "적용",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/pt_BR.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Adicionar...",
"All": "Todos",
"All fields below are optional": "Todos os campos abaixo são opcionais",
"Always check S3 job attachments": "Sempre verificar anexos de trabalho do S3",
"Amount name": "Nome da quantidade",
"Any": "Qualquer",
"Apply": "Aplicar",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/tr_TR.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "Ekle...",
"All": "Tümü",
"All fields below are optional": "Aşağıdaki tüm alanlar isteğe bağlıdır",
"Always check S3 job attachments": "S3 iş eklerini her zaman kontrol et",
"Amount name": "Miktar adı",
"Any": "Herhangi",
"Apply": "Uygula",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/zh_CN.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "添加...",
"All": "全部",
"All fields below are optional": "以下所有字段均为可选",
"Always check S3 job attachments": "始终检查 S3 作业附件",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we translate job attachments to 作业附件? Also curious why the simplified is 作业 while traditional is 任務.

"Amount name": "数量名称",
"Any": "任意",
"Apply": "应用",
Expand Down
1 change: 1 addition & 0 deletions src/deadline/client/ui/translations/locales/zh_TW.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"Add...": "新增...",
"All": "全部",
"All fields below are optional": "以下所有欄位均為選用",
"Always check S3 job attachments": "一律檢查 S3 任務附件",
"Amount name": "數量名稱",
"Any": "任何",
"Apply": "套用",
Expand Down
45 changes: 34 additions & 11 deletions src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def upload_assets(
manifest_metadata: dict[str, dict[str, str]] = dict(),
manifest_file_name: Optional[str] = None,
asset_root: Optional[Path] = None,
force_s3_check: Optional[bool] = None,
) -> tuple[str, str]:
"""
Uploads assets based off of an asset manifest, uploads the asset manifest.
Expand All @@ -214,10 +215,12 @@ def upload_assets(
manifest_metadata: File metadata for given manifest to be uploaded.
manifest_file_name: Optional file name for given manifest to be uploaded, otherwise use default name.
asset_root: The root in which asset actually in to facilitate path mapping.
force_s3_check: Controls S3 verification behavior:
- True: Skip the S3 check cache, always check whether uploads are already in S3.
- False/None: Use the S3 check cache, with periodic integrity sampling against S3 (default)

Returns:
A tuple of (the partial key for the manifest on S3, the hash of input manifest).
"""
A tuple of (the partial key for the manifest on S3, the hash of input manifest)."""

# Upload asset manifest
(hash_alg, manifest_bytes, manifest_name) = S3AssetUploader._gather_upload_metadata(
Expand Down Expand Up @@ -253,8 +256,10 @@ def upload_assets(
extra_args=manifest_metadata,
)

# Verify S3 hash cache integrity, and reset cache if cached files are missing
if not self.verify_hash_cache_integrity(
# Verify S3 hash cache integrity, and reset cache if cached files are missing.
# Skip integrity check only when force_s3_check is True - we'll do S3 HEAD on every file anyway.
# When False or None, run the integrity check to catch stale cache entries.
if force_s3_check is not True and not self.verify_hash_cache_integrity(
s3_check_cache_dir,
manifest,
job_attachment_settings.full_cas_prefix(),
Expand All @@ -270,6 +275,7 @@ def upload_assets(
s3_cas_prefix=job_attachment_settings.full_cas_prefix(),
progress_tracker=progress_tracker,
s3_check_cache_dir=s3_check_cache_dir,
force_s3_check=force_s3_check,
)

return (partial_manifest_key, hash_data(manifest_bytes, hash_alg))
Expand Down Expand Up @@ -434,6 +440,7 @@ def upload_input_files(
s3_cas_prefix: str,
progress_tracker: Optional[ProgressTracker] = None,
s3_check_cache_dir: Optional[str] = None,
force_s3_check: Optional[bool] = None,
) -> None:
"""
Uploads all of the files listed in the given manifest to S3 if they don't exist in the
Expand Down Expand Up @@ -466,6 +473,7 @@ def upload_input_files(
s3_cas_prefix,
s3_cache,
progress_tracker,
force_s3_check,
): file
for file in small_file_queue
}
Expand All @@ -485,6 +493,7 @@ def upload_input_files(
s3_cas_prefix,
s3_cache,
progress_tracker,
force_s3_check,
)
if progress_tracker and not is_uploaded:
progress_tracker.increase_skipped(1, file_size)
Expand Down Expand Up @@ -644,23 +653,32 @@ def upload_object_to_cas(
s3_cas_prefix: str,
s3_check_cache: S3CheckCache,
progress_tracker: Optional[ProgressTracker] = None,
force_s3_check: Optional[bool] = None,
) -> Tuple[bool, int]:
"""
Uploads an object to the S3 content-addressable storage (CAS) prefix. Optionally,
does a head-object check and only uploads the file if it doesn't exist in S3 already.
Returns a tuple (whether it has been uploaded, the file size).

Args:
force_s3_check: Controls S3 verification behavior:
- True: Skip the S3 check cache, always check whether uploads are already in S3.
- False/None: Use the S3 check cache, with periodic integrity sampling against S3 (default)
"""
local_path = source_root.joinpath(file.path)
s3_upload_key = self._generate_s3_upload_key(file, hash_algorithm, s3_cas_prefix)
is_uploaded = False

if s3_check_cache.get_connection_entry(
s3_key=f"{s3_bucket}/{s3_upload_key}", connection=s3_check_cache.get_local_connection()
):
logger.debug(
f"skipping {local_path} because {s3_bucket}/{s3_upload_key} exists in the cache"
)
return (is_uploaded, file.size)
# Check cache first unless force_s3_check is True (skip cache entirely)
if force_s3_check is not True:
if s3_check_cache.get_connection_entry(
s3_key=f"{s3_bucket}/{s3_upload_key}",
connection=s3_check_cache.get_local_connection(),
):
logger.debug(
f"skipping {local_path} because {s3_bucket}/{s3_upload_key} exists in the cache"
)
return (is_uploaded, file.size)

if self.file_already_uploaded(s3_bucket, s3_upload_key):
logger.debug(
Expand Down Expand Up @@ -1499,6 +1517,7 @@ def upload_assets(
on_uploading_assets: Optional[Callable[[Any], bool]] = None,
s3_check_cache_dir: Optional[str] = None,
manifest_write_dir: Optional[str] = None,
force_s3_check: Optional[bool] = None,
) -> tuple[SummaryStatistics, Attachments]:
"""
Uploads all the files for provided manifests and manifests themselves to S3.
Expand All @@ -1507,6 +1526,9 @@ def upload_assets(
manifests: a list of manifests that contain assets to be uploaded
on_uploading_assets: a callback to be called to periodically report progress to the caller.
The callback returns True if the operation should continue as normal, or False to cancel.
force_s3_check: Controls S3 verification behavior:
- True: Skip the S3 check cache, always check whether uploads are already in S3.
- False/None: Use the S3 check cache, with periodic integrity sampling against S3 (default)

Returns:
a tuple with (1) the summary statistics of the upload operation, and
Expand Down Expand Up @@ -1555,6 +1577,7 @@ def upload_assets(
progress_tracker=progress_tracker,
s3_check_cache_dir=s3_check_cache_dir,
manifest_write_dir=manifest_write_dir,
force_s3_check=force_s3_check,
)
manifest_properties.inputManifestPath = partial_manifest_key
manifest_properties.inputManifestHash = asset_manifest_hash
Expand Down
Loading
Loading