diff --git a/src/deadline/client/api/_submit_job_bundle.py b/src/deadline/client/api/_submit_job_bundle.py
index 7d29d8512..08418331e 100644
--- a/src/deadline/client/api/_submit_job_bundle.py
+++ b/src/deadline/client/api/_submit_job_bundle.py
@@ -159,6 +159,7 @@ def _upload_attachments(
     upload_progress_callback: Optional[Callable],
     config: Optional[ConfigParser] = None,
     from_gui: bool = False,
+    force_s3_check: Optional[bool] = None,
 ) -> Dict[str, Any]:
     """
     Starts the job attachments upload and handles the progress reporting callback.
@@ -175,6 +176,7 @@ def _default_update_upload_progress(upload_metadata: ProgressReportMetadata) ->
         manifests=manifests,
         on_uploading_assets=upload_progress_callback,
         s3_check_cache_dir=config_file.get_cache_directory(),
+        force_s3_check=force_s3_check,
     )
     api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
         upload_summary,
@@ -392,6 +394,7 @@ def create_job_from_job_bundle(
     hashing_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
     upload_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
     create_job_result_callback: Optional[Callable[[], bool]] = None,
+    force_s3_check: Optional[bool] = None,
 ) -> Optional[str]:
     """
     Creates a [Deadline Cloud job] in the [queue] configured as default for the workstation
@@ -462,6 +465,9 @@ def create_job_from_job_bundle(
             See hashing_progress_callback for more details.
         create_job_result_callback (Callable -> bool): Callbacks periodically called while waiting for
             the deadline.create_job result. See hashing_progress_callback for more details.
+        force_s3_check (bool, optional): If True, bypass the S3 check cache and always perform an S3 HEAD
+            request to verify that each job attachment exists before skipping its upload. Use when S3 bucket
+            contents may be out of sync with local caches. If None (the default), reads the `settings.force_s3_check` config setting.

     Returns:
         Returns the submitted job id. If `debug_snapshot_dir` is provided then no job is submitted and it returns None.
@@ -498,6 +504,10 @@ def create_job_from_job_bundle(
         "defaults.job_attachments_file_system", config=config
     )

+    # Read force_s3_check from the config if the caller did not set it explicitly
+    if force_s3_check is None:
+        force_s3_check = config_file.str2bool(get_setting("settings.force_s3_check", config=config))
+
     queue = deadline.get_queue(
         farmId=farm_id,
         queueId=queue_id,
@@ -718,6 +728,7 @@ def create_job_from_job_bundle(
             print_function_callback,
             upload_progress_callback,
             from_gui=from_gui,
+            force_s3_check=force_s3_check,
         )
     else:
         attachment_settings = _snapshot_attachments(  # type: ignore
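Reviewer note: a minimal sketch of how a caller exercises the new API parameter, assuming a job bundle on disk and a configured default farm and queue; the bundle path below is hypothetical:

    from deadline.client import api

    # Bypass the local S3 check cache and HEAD every attachment in S3.
    # Passing None (or omitting the argument) defers to the
    # `settings.force_s3_check` config setting instead.
    job_id = api.create_job_from_job_bundle(
        job_bundle_dir="/path/to/job_bundle",  # hypothetical bundle location
        force_s3_check=True,
    )
    print(f"Submitted {job_id}")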
" + "Overrides the 'settings.force_s3_check' config setting.", +) @click.argument("job_bundle_dir") @_handle_error def bundle_submit( @@ -232,6 +239,7 @@ def bundle_submit( require_paths_exist, submitter_name, save_debug_snapshot, + force_s3_check, **args, ): """ @@ -245,6 +253,12 @@ def bundle_submit( # Apply the CLI args to the config config = _apply_cli_options_to_config(required_options={"farm_id", "queue_id"}, **args) + # Resolve force_s3_check: CLI flag takes precedence, otherwise use config setting + if force_s3_check is None: + force_s3_check = config_file.str2bool( + config_file.get_setting("settings.force_s3_check", config=config) + ) + hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments") upload_callback_manager = _ProgressBarCallbackManager(length=100, label="Uploading Attachments") @@ -280,6 +294,7 @@ def _check_create_job_wait_canceled() -> bool: submitter_name=submitter_name or "CLI", known_asset_paths=known_asset_path, debug_snapshot_dir=snapshot_tmpdir.name if snapshot_tmpdir else save_debug_snapshot, + force_s3_check=force_s3_check, ) if snapshot_tmpdir: diff --git a/src/deadline/client/config/config_file.py b/src/deadline/client/config/config_file.py index 1e037be10..318273e73 100644 --- a/src/deadline/client/config/config_file.py +++ b/src/deadline/client/config/config_file.py @@ -145,6 +145,15 @@ "default": "", "description": "The locale to use for the UI. If empty, uses the system locale.", }, + "settings.force_s3_check": { + "default": "false", + "description": ( + "Controls S3 verification behavior for job attachments. " + "When 'true', always verify files exist in S3 via HEAD request before skipping upload " + "(most reliable but slower, skips cache integrity check since every file is verified). " + "When 'false' or unset, use local cache with periodic integrity sampling against S3 (balanced default)." 
diff --git a/src/deadline/client/config/config_file.py b/src/deadline/client/config/config_file.py
index 1e037be10..318273e73 100644
--- a/src/deadline/client/config/config_file.py
+++ b/src/deadline/client/config/config_file.py
@@ -145,6 +145,15 @@
         "default": "",
         "description": "The locale to use for the UI. If empty, uses the system locale.",
     },
+    "settings.force_s3_check": {
+        "default": "false",
+        "description": (
+            "Controls S3 verification behavior for job attachments. "
+            "When 'true', always verify that files exist in S3 via a HEAD request before skipping their upload "
+            "(most reliable but slower; the cache integrity check is skipped because every file is verified). "
+            "When 'false' or unset, use the local cache with periodic integrity sampling against S3 (the balanced default)."
+        ),
+    },
 }
diff --git a/src/deadline/client/ui/dialogs/deadline_config_dialog.py b/src/deadline/client/ui/dialogs/deadline_config_dialog.py
index 4bed4b7b7..468031100 100644
--- a/src/deadline/client/ui/dialogs/deadline_config_dialog.py
+++ b/src/deadline/client/ui/dialogs/deadline_config_dialog.py
@@ -376,6 +376,9 @@ def _build_general_settings_ui(self, group, layout):
         self.telemetry_opt_out = self._init_checkbox_setting(
             group, layout, "telemetry.opt_out", tr("Telemetry opt out")
         )
+        self.force_s3_check = self._init_checkbox_setting(
+            group, layout, "settings.force_s3_check", tr("Always check S3 job attachments")
+        )

         self._conflict_resolution_options = [option.name for option in FileConflictResolution]
         self.conflict_resolution_box = self._init_combobox_setting(
diff --git a/src/deadline/client/ui/translations/locales/de_DE.json b/src/deadline/client/ui/translations/locales/de_DE.json
index cedc8b283..7a5a8b205 100644
--- a/src/deadline/client/ui/translations/locales/de_DE.json
+++ b/src/deadline/client/ui/translations/locales/de_DE.json
@@ -12,6 +12,7 @@
     "Add...": "Hinzufügen...",
     "All": "Alle",
     "All fields below are optional": "Alle folgenden Felder sind optional",
+    "Always check S3 job attachments": "S3-Jobanhänge immer prüfen",
     "Amount name": "Mengenname",
     "Any": "Beliebig",
     "Apply": "Anwenden",
diff --git a/src/deadline/client/ui/translations/locales/en_US.json b/src/deadline/client/ui/translations/locales/en_US.json
index 4d05dda68..59edcf3fa 100644
--- a/src/deadline/client/ui/translations/locales/en_US.json
+++ b/src/deadline/client/ui/translations/locales/en_US.json
@@ -12,6 +12,7 @@
     "Add...": "Add...",
     "All": "All",
     "All fields below are optional": "All fields below are optional",
+    "Always check S3 job attachments": "Always check S3 job attachments",
     "Amount name": "Amount name",
     "Any": "Any",
     "Apply": "Apply",
diff --git a/src/deadline/client/ui/translations/locales/es_ES.json b/src/deadline/client/ui/translations/locales/es_ES.json
index 8f546ab19..dab64dca2 100644
--- a/src/deadline/client/ui/translations/locales/es_ES.json
+++ b/src/deadline/client/ui/translations/locales/es_ES.json
@@ -12,6 +12,7 @@
     "Add...": "Agregar...",
     "All": "Todos",
     "All fields below are optional": "Todos los campos siguientes son opcionales",
+    "Always check S3 job attachments": "Comprobar siempre los archivos adjuntos de trabajo en S3",
     "Amount name": "Nombre de cantidad",
     "Any": "Cualquiera",
     "Apply": "Aplicar",
diff --git a/src/deadline/client/ui/translations/locales/fr_FR.json b/src/deadline/client/ui/translations/locales/fr_FR.json
index da277ec78..ca41db5f1 100644
--- a/src/deadline/client/ui/translations/locales/fr_FR.json
+++ b/src/deadline/client/ui/translations/locales/fr_FR.json
@@ -12,6 +12,7 @@
     "Add...": "Ajouter...",
     "All": "Tous",
     "All fields below are optional": "Tous les champs ci-dessous sont facultatifs",
+    "Always check S3 job attachments": "Toujours vérifier les fichiers joints de tâche S3",
     "Amount name": "Nom de quantité",
     "Any": "Quelconque",
     "Apply": "Appliquer",
jumlah", "Any": "Apa saja", "Apply": "Terapkan", diff --git a/src/deadline/client/ui/translations/locales/it_IT.json b/src/deadline/client/ui/translations/locales/it_IT.json index 94f352c13..dbdbec1f4 100644 --- a/src/deadline/client/ui/translations/locales/it_IT.json +++ b/src/deadline/client/ui/translations/locales/it_IT.json @@ -12,6 +12,7 @@ "Add...": "Aggiungi...", "All": "Tutti", "All fields below are optional": "Tutti i campi seguenti sono facoltativi", + "Always check S3 job attachments": "Controlla sempre gli allegati lavoro S3", "Amount name": "Nome quantità", "Any": "Qualsiasi", "Apply": "Applica", diff --git a/src/deadline/client/ui/translations/locales/ja_JP.json b/src/deadline/client/ui/translations/locales/ja_JP.json index 2ebc9d4fa..1897c3a7c 100644 --- a/src/deadline/client/ui/translations/locales/ja_JP.json +++ b/src/deadline/client/ui/translations/locales/ja_JP.json @@ -12,6 +12,7 @@ "Add...": "追加...", "All": "すべて", "All fields below are optional": "以下のすべてのフィールドはオプションです", + "Always check S3 job attachments": "S3 ジョブアタッチメントを常にチェック", "Amount name": "量の名前", "Any": "任意", "Apply": "適用", diff --git a/src/deadline/client/ui/translations/locales/ko_KR.json b/src/deadline/client/ui/translations/locales/ko_KR.json index 08d534692..e5be44602 100644 --- a/src/deadline/client/ui/translations/locales/ko_KR.json +++ b/src/deadline/client/ui/translations/locales/ko_KR.json @@ -12,6 +12,7 @@ "Add...": "추가...", "All": "모두", "All fields below are optional": "아래의 모든 필드는 선택 사항입니다", + "Always check S3 job attachments": "S3 작업 첨부 파일 항상 확인", "Amount name": "수량 이름", "Any": "모두", "Apply": "적용", diff --git a/src/deadline/client/ui/translations/locales/pt_BR.json b/src/deadline/client/ui/translations/locales/pt_BR.json index bbda297ad..44d9c6962 100644 --- a/src/deadline/client/ui/translations/locales/pt_BR.json +++ b/src/deadline/client/ui/translations/locales/pt_BR.json @@ -12,6 +12,7 @@ "Add...": "Adicionar...", "All": "Todos", "All fields below are optional": "Todos os campos abaixo são opcionais", + "Always check S3 job attachments": "Sempre verificar anexos de trabalho do S3", "Amount name": "Nome da quantidade", "Any": "Qualquer", "Apply": "Aplicar", diff --git a/src/deadline/client/ui/translations/locales/tr_TR.json b/src/deadline/client/ui/translations/locales/tr_TR.json index 7eda93457..29f325083 100644 --- a/src/deadline/client/ui/translations/locales/tr_TR.json +++ b/src/deadline/client/ui/translations/locales/tr_TR.json @@ -12,6 +12,7 @@ "Add...": "Ekle...", "All": "Tümü", "All fields below are optional": "Aşağıdaki tüm alanlar isteğe bağlıdır", + "Always check S3 job attachments": "S3 iş eklerini her zaman kontrol et", "Amount name": "Miktar adı", "Any": "Herhangi", "Apply": "Uygula", diff --git a/src/deadline/client/ui/translations/locales/zh_CN.json b/src/deadline/client/ui/translations/locales/zh_CN.json index 8b926638e..fabd2f52c 100644 --- a/src/deadline/client/ui/translations/locales/zh_CN.json +++ b/src/deadline/client/ui/translations/locales/zh_CN.json @@ -12,6 +12,7 @@ "Add...": "添加...", "All": "全部", "All fields below are optional": "以下所有字段均为可选", + "Always check S3 job attachments": "始终检查 S3 作业附件", "Amount name": "数量名称", "Any": "任意", "Apply": "应用", diff --git a/src/deadline/client/ui/translations/locales/zh_TW.json b/src/deadline/client/ui/translations/locales/zh_TW.json index e60a4def8..fd171c3fb 100644 --- a/src/deadline/client/ui/translations/locales/zh_TW.json +++ b/src/deadline/client/ui/translations/locales/zh_TW.json @@ -12,6 +12,7 @@ "Add...": "新增...", "All": "全部", 
"All fields below are optional": "以下所有欄位均為選用", + "Always check S3 job attachments": "一律檢查 S3 任務附件", "Amount name": "數量名稱", "Any": "任何", "Apply": "套用", diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index 52c4ac985..1183370a7 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -198,6 +198,7 @@ def upload_assets( manifest_metadata: dict[str, dict[str, str]] = dict(), manifest_file_name: Optional[str] = None, asset_root: Optional[Path] = None, + force_s3_check: Optional[bool] = None, ) -> tuple[str, str]: """ Uploads assets based off of an asset manifest, uploads the asset manifest. @@ -214,10 +215,12 @@ def upload_assets( manifest_metadata: File metadata for given manifest to be uploaded. manifest_file_name: Optional file name for given manifest to be uploaded, otherwise use default name. asset_root: The root in which asset actually in to facilitate path mapping. + force_s3_check: Controls S3 verification behavior: + - True: Skip the S3 check cache, always check whether uploads are already in S3. + - False/None: Use the S3 check cache, with periodic integrity sampling against S3 (default) Returns: - A tuple of (the partial key for the manifest on S3, the hash of input manifest). - """ + A tuple of (the partial key for the manifest on S3, the hash of input manifest).""" # Upload asset manifest (hash_alg, manifest_bytes, manifest_name) = S3AssetUploader._gather_upload_metadata( @@ -253,8 +256,10 @@ def upload_assets( extra_args=manifest_metadata, ) - # Verify S3 hash cache integrity, and reset cache if cached files are missing - if not self.verify_hash_cache_integrity( + # Verify S3 hash cache integrity, and reset cache if cached files are missing. + # Skip integrity check only when force_s3_check is True - we'll do S3 HEAD on every file anyway. + # When False or None, run the integrity check to catch stale cache entries. + if force_s3_check is not True and not self.verify_hash_cache_integrity( s3_check_cache_dir, manifest, job_attachment_settings.full_cas_prefix(), @@ -270,6 +275,7 @@ def upload_assets( s3_cas_prefix=job_attachment_settings.full_cas_prefix(), progress_tracker=progress_tracker, s3_check_cache_dir=s3_check_cache_dir, + force_s3_check=force_s3_check, ) return (partial_manifest_key, hash_data(manifest_bytes, hash_alg)) @@ -434,6 +440,7 @@ def upload_input_files( s3_cas_prefix: str, progress_tracker: Optional[ProgressTracker] = None, s3_check_cache_dir: Optional[str] = None, + force_s3_check: Optional[bool] = None, ) -> None: """ Uploads all of the files listed in the given manifest to S3 if they don't exist in the @@ -466,6 +473,7 @@ def upload_input_files( s3_cas_prefix, s3_cache, progress_tracker, + force_s3_check, ): file for file in small_file_queue } @@ -485,6 +493,7 @@ def upload_input_files( s3_cas_prefix, s3_cache, progress_tracker, + force_s3_check, ) if progress_tracker and not is_uploaded: progress_tracker.increase_skipped(1, file_size) @@ -644,23 +653,32 @@ def upload_object_to_cas( s3_cas_prefix: str, s3_check_cache: S3CheckCache, progress_tracker: Optional[ProgressTracker] = None, + force_s3_check: Optional[bool] = None, ) -> Tuple[bool, int]: """ Uploads an object to the S3 content-addressable storage (CAS) prefix. Optionally, does a head-object check and only uploads the file if it doesn't exist in S3 already. Returns a tuple (whether it has been uploaded, the file size). 
+
+        Args:
+            force_s3_check: Controls S3 verification behavior:
+                - True: Skip the S3 check cache and always check whether uploads are already in S3.
+                - False/None: Use the S3 check cache, with periodic integrity sampling against S3 (the default).
         """
         local_path = source_root.joinpath(file.path)
         s3_upload_key = self._generate_s3_upload_key(file, hash_algorithm, s3_cas_prefix)
         is_uploaded = False

-        if s3_check_cache.get_connection_entry(
-            s3_key=f"{s3_bucket}/{s3_upload_key}", connection=s3_check_cache.get_local_connection()
-        ):
-            logger.debug(
-                f"skipping {local_path} because {s3_bucket}/{s3_upload_key} exists in the cache"
-            )
-            return (is_uploaded, file.size)
+        # Check the cache first, unless force_s3_check is True, in which case the cache is skipped entirely
+        if force_s3_check is not True:
+            if s3_check_cache.get_connection_entry(
+                s3_key=f"{s3_bucket}/{s3_upload_key}",
+                connection=s3_check_cache.get_local_connection(),
+            ):
+                logger.debug(
+                    f"skipping {local_path} because {s3_bucket}/{s3_upload_key} exists in the cache"
+                )
+                return (is_uploaded, file.size)

         if self.file_already_uploaded(s3_bucket, s3_upload_key):
             logger.debug(
@@ -1499,6 +1517,7 @@ def upload_assets(
         on_uploading_assets: Optional[Callable[[Any], bool]] = None,
         s3_check_cache_dir: Optional[str] = None,
         manifest_write_dir: Optional[str] = None,
+        force_s3_check: Optional[bool] = None,
     ) -> tuple[SummaryStatistics, Attachments]:
         """
         Uploads all the files for provided manifests and manifests themselves to S3.
@@ -1507,6 +1526,9 @@ def upload_assets(
             manifests: a list of manifests that contain assets to be uploaded
             on_uploading_assets: a callback to be called to periodically report progress to the caller. The
                 callback returns True if the operation should continue as normal, or False to cancel.
+            force_s3_check: Controls S3 verification behavior:
+                - True: Skip the S3 check cache and always check whether uploads are already in S3.
+                - False/None: Use the S3 check cache, with periodic integrity sampling against S3 (the default).

         Returns:
             a tuple with (1) the summary statistics of the upload operation, and
@@ -1555,6 +1577,7 @@ def upload_assets(
                 progress_tracker=progress_tracker,
                 s3_check_cache_dir=s3_check_cache_dir,
                 manifest_write_dir=manifest_write_dir,
+                force_s3_check=force_s3_check,
             )
             manifest_properties.inputManifestPath = partial_manifest_key
             manifest_properties.inputManifestHash = asset_manifest_hash
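Reviewer note: the gating that force_s3_check introduces in upload_object_to_cas, reduced to a standalone sketch; should_query_s3 is an illustrative helper, not part of the library:

    from typing import Optional

    def should_query_s3(force_s3_check: Optional[bool], key_in_cache: bool) -> bool:
        # Mirrors the new control flow: True bypasses the cache entirely,
        # False/None trusts a cache hit and falls back to an S3 HEAD on a miss.
        if force_s3_check is True:
            return True
        return not key_in_cache

    assert should_query_s3(True, key_in_cache=True) is True    # cache bypassed
    assert should_query_s3(None, key_in_cache=True) is False   # cache hit honored
    assert should_query_s3(False, key_in_cache=False) is True  # miss -> HEAD check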
diff --git a/test/unit/deadline_client/api/test_job_bundle_submission.py b/test/unit/deadline_client/api/test_job_bundle_submission.py
index b7781fb2f..384ff2407 100644
--- a/test/unit/deadline_client/api/test_job_bundle_submission.py
+++ b/test/unit/deadline_client/api/test_job_bundle_submission.py
@@ -1063,3 +1063,75 @@ def mock_continue_callback() -> bool:
         deadline_client=deadline_client,
         continue_callback=mock_continue_callback,
     )
+
+
+@pytest.mark.parametrize(
+    "force_s3_check_param, config_value",
+    [
+        pytest.param(True, "false", id="explicit-true-overrides-config-false"),
+        pytest.param(False, "true", id="explicit-false-overrides-config-true"),
+        pytest.param(None, "true", id="none-reads-config-true"),
+        pytest.param(None, "false", id="none-reads-config-false"),
+    ],
+)
+def test_create_job_from_job_bundle_force_s3_check(
+    fresh_deadline_config,
+    temp_job_bundle_dir,
+    force_s3_check_param,
+    config_value,
+):
+    """
+    Test that force_s3_check is resolved correctly:
+    - An explicit True/False argument overrides the config setting.
+    - None falls back to the config setting.
+    """
+    from unittest.mock import patch  # local import: keeps this test self-contained
+
+    config.set_setting("defaults.farm_id", MOCK_FARM_ID)
+    config.set_setting("defaults.queue_id", MOCK_QUEUE_ID)
+    config.set_setting("settings.storage_profile_id", MOCK_STORAGE_PROFILE_ID)
+    config.set_setting("settings.force_s3_check", config_value)
+
+    job_template_type, job_template = MOCK_JOB_TEMPLATE_CASES["MINIMAL_JSON"]
+
+    with patch_calls_for_create_job_from_job_bundle() as mock:
+        mock.get_boto3_client().get_storage_profile_for_queue.return_value = (
+            MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE
+        )
+
+        # Write the template to the job bundle
+        with open(
+            os.path.join(temp_job_bundle_dir, f"template.{job_template_type.lower()}"),
+            "w",
+            encoding="utf8",
+        ) as f:
+            f.write(job_template)
+
+        # Build kwargs; only include force_s3_check when the caller sets it explicitly
+        kwargs: Dict[str, Any] = {
+            "job_bundle_dir": temp_job_bundle_dir,
+            "queue_parameter_definitions": [],
+        }
+        if force_s3_check_param is not None:
+            kwargs["force_s3_check"] = force_s3_check_param
+
+        # Call the function under test, spying on the module's get_setting so we can
+        # observe whether the config fallback was consulted
+        with patch(
+            "deadline.client.api._submit_job_bundle.get_setting", wraps=config.get_setting
+        ) as spy_get_setting:
+            response = api.create_job_from_job_bundle(**kwargs)
+
+        # Verify the job was created
+        assert response == MOCK_JOB_ID
+
+        # The config setting must be read only when no explicit value was passed
+        config_reads = [
+            call
+            for call in spy_get_setting.call_args_list
+            if call.args and call.args[0] == "settings.force_s3_check"
+        ]
+        if force_s3_check_param is not None:
+            assert not config_reads
+        else:
+            assert len(config_reads) == 1
diff --git a/test/unit/deadline_client/cli/test_cli_config.py b/test/unit/deadline_client/cli/test_cli_config.py
index 5e511ca4c..df9bcb70c 100644
--- a/test/unit/deadline_client/cli/test_cli_config.py
+++ b/test/unit/deadline_client/cli/test_cli_config.py
@@ -35,7 +35,7 @@ def test_cli_config_show_defaults(fresh_deadline_config):
     assert fresh_deadline_config in result.output

     # Assert the expected number of settings
-    assert len(settings.keys()) == 17
+    assert len(settings.keys()) == 18

     for setting_name in settings.keys():
         assert setting_name in result.output
@@ -103,6 +103,7 @@ def test_cli_config_show_modified_config(fresh_deadline_config):
     config.set_setting("settings.small_file_threshold_multiplier", "15")
     config.set_setting("settings.known_asset_paths", "/known/asset/path")
     config.set_setting("settings.locale", "ja_JP")
+    config.set_setting("settings.force_s3_check", "true")

     runner = CliRunner()
     result = runner.invoke(main, ["config", "show"])
@@ -114,6 +115,9 @@ def test_cli_config_show_modified_config(fresh_deadline_config):
     assert "~/alternate/job_history" in result.output
     assert result.output.count("False") == 1
     assert result.output.count("True") == 1
+    # "true" appears three times: once as the value of force_s3_check, once in the
+    # telemetry.opt_out description, and once in the force_s3_check description
+    assert result.output.count("true") == 3
     assert "farm-82934h23k4j23kjh" in result.output
     assert "queue-389348u234jhk34" in result.output
     assert "job-239u40234jkl234nkl23" in result.output
diff --git a/test/unit/deadline_client/config/test_config_file.py b/test/unit/deadline_client/config/test_config_file.py
index ab670a7ac..5f3f7f14d 100644
--- a/test/unit/deadline_client/config/test_config_file.py
+++ b/test/unit/deadline_client/config/test_config_file.py
@@ -26,6 +26,7 @@
     ("defaults.farm_id", "", "farm-82934h23k4j23kjh"),
     ("defaults.job_attachments_file_system", "COPIED", "VIRTUAL"),
     ("settings.locale", "", "ja_JP"),
+    ("settings.force_s3_check", "false", "true"),
 ]
diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py
index cbcc65df3..d1148a5e6 100644
--- a/test/unit/deadline_job_attachments/test_upload.py
+++ b/test/unit/deadline_job_attachments/test_upload.py
@@ -2651,6 +2651,91 @@ def test_upload_object_to_cas_skips_upload_with_cache(
         assert file_size == 5
         s3_cache.put_entry.assert_not_called()

+    @mock_aws
+    @pytest.mark.parametrize(
+        "manifest_version",
+        [
+            ManifestVersion.v2023_03_03,
+        ],
+    )
+    @pytest.mark.parametrize(
+        "file_exists_in_s3, expected_upload",
+        [
+            pytest.param(True, False, id="file-exists-skip-upload"),
+            pytest.param(False, True, id="file-missing-do-upload"),
+        ],
+    )
+    def test_upload_object_to_cas_force_s3_check_bypasses_cache(
+        self,
+        tmpdir,
+        farm_id,
+        queue_id,
+        manifest_version,
+        default_job_attachment_s3_settings,
+        file_exists_in_s3,
+        expected_upload,
+    ):
+        """
+        Tests that when force_s3_check=True, the S3CheckCache is bypassed and an S3 HEAD is always performed.
+        Verifies that the upload happens only when the file doesn't already exist in S3.
+ """ + # Given + asset_root = tmpdir.mkdir("test-root") + test_file = asset_root.join("test-file.txt") + test_file.write("stuff") + asset_manager = S3AssetManager( + farm_id=farm_id, + queue_id=queue_id, + job_attachment_settings=self.job_attachment_s3_settings, + asset_manifest_version=manifest_version, + ) + s3_key = f"{default_job_attachment_s3_settings.s3BucketName}/prefix/test-hash.xxh128" + test_entry = S3CheckCacheEntry(s3_key, "123.45") + s3_cache = MagicMock() + # Cache has an entry, but it should be bypassed + s3_cache.get_connection_entry.return_value = test_entry + + # When + with patch.object( + asset_manager.asset_uploader, + "_get_current_timestamp", + side_effect=["345.67"], + ), patch.object( + asset_manager.asset_uploader, + "file_already_uploaded", + return_value=file_exists_in_s3, + ) as mock_file_already_uploaded, patch.object( + asset_manager.asset_uploader, + "upload_file_to_s3", + ) as mock_upload_file_to_s3: + (is_uploaded, file_size) = asset_manager.asset_uploader.upload_object_to_cas( + file=BaseManifestPath(path="test-file.txt", hash="test-hash", size=5, mtime=1), + hash_algorithm=HashAlgorithm.XXH128, + s3_bucket=default_job_attachment_s3_settings.s3BucketName, + source_root=Path(asset_root), + s3_cas_prefix="prefix", + s3_check_cache=s3_cache, + force_s3_check=True, + ) + + # Then + assert is_uploaded == expected_upload + assert file_size == 5 + # Cache lookup should NOT have been called (bypassed due to force_s3_check) + s3_cache.get_connection_entry.assert_not_called() + # S3 HEAD should always be called when force_s3_check=True + mock_file_already_uploaded.assert_called_once_with( + default_job_attachment_s3_settings.s3BucketName, "prefix/test-hash.xxh128" + ) + # Upload should only happen if file doesn't exist in S3 + if expected_upload: + mock_upload_file_to_s3.assert_called_once() + else: + mock_upload_file_to_s3.assert_not_called() + # Cache should always be updated after HEAD/upload + expected_new_entry = S3CheckCacheEntry(s3_key, "345.67") + s3_cache.put_entry.assert_called_once_with(expected_new_entry) + def test_open_non_symlink_file_binary(self, tmp_path: Path): temp_file = tmp_path / "temp_file.txt" temp_file.write_text("this is test file")