diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/business_glossary_import_v2.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/business_glossary_import_v2.py
index f44d834..35bf0b5 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/business_glossary_import_v2.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/business_glossary_import_v2.py
@@ -1,11 +1,11 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from itertools import cycle
-from typing import Dict, List
+from typing import List
 import logging_utils
 from file_utils import *
-from gcs_dao import prepare_gcs_bucket
+from gcs_dao import prepare_gcs_bucket, ensure_folders_exist
 from dataplex_dao import get_dataplex_service, create_and_monitor_job
 from payloads import *
+from constants import MIGRATION_FOLDER_PREFIX, MAX_FOLDERS
 import os
 from file_utils import *
 logger = logging_utils.get_logger()
@@ -21,7 +21,7 @@ def get_referenced_scopes(file_path: str, main_project_id: str) -> list:
     return list(scopes)


-def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> bool:
+def process_import_file(file_path: str, project_id: str, gcs_bucket: str, folder_name: str) -> bool:
     """
     Processes a single glossary or entrylink file:
       - Builds payload
@@ -40,13 +40,14 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo
     service = get_dataplex_service()

-    job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket)
+    job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket, folder_name)
     if not payload or not job_id or not job_location:
         return False

     try:
         # Upload file to GCS first; only continue if upload succeeded
-        upload_status = prepare_gcs_bucket(gcs_bucket, file_path, filename)
+        upload_status = prepare_gcs_bucket(gcs_bucket, folder_name, file_path, filename)
+        logger.debug(f"Upload status for file '{filename}': {upload_status} to bucket '{gcs_bucket}' in folder '{folder_name}'")
         if not upload_status:
             logger.error(f"Failed to prepare GCS bucket '{gcs_bucket}' for file '{filename}'. Skipping import.")
             return False
@@ -60,45 +61,62 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo
         return False


-def _process_files_for_bucket(files_for_bucket: List[str], project_id: str, bucket: str) -> List[bool]:
-    """Worker: processes all files assigned to a bucket sequentially."""
-    results = []
-    for f in files_for_bucket:
-        try:
-            result = process_import_file(f, project_id, bucket)
-        except Exception as e:
-            logger.error(f"Error processing file {f} in bucket {bucket}: {e}")
-            logger.debug(f"Error processing file {f} in bucket {bucket}: {e}", exc_info=True)
-            result = False
-        results.append(result)
-    return results
-
-
 def run_import_files(files: List[str], project_id: str, buckets: List[str]) -> List[bool]:
-    """
-    Distribute files round-robin to buckets, start one worker per bucket and process sequentially
-    to guarantee no two threads operate on the same bucket.
-    """
+    """Runs import for multiple files using ThreadPoolExecutor for concurrency."""
+    if not files:
+        return []
     if not buckets:
         logger.error("No buckets provided to run_import_files.")
         return [False] * len(files)

-    bucket_file_map: Dict[str, List[str]] = {b: [] for b in buckets}
-    cycler = cycle(buckets)
-    for f in files:
-        b = next(cycler)
-        bucket_file_map[b].append(f)
+    bucket = buckets[0]
+    if len(buckets) > 1:
+        logger.warning(f"Multiple buckets provided; using '{bucket}' with folder-based uploads.")
+
+    folder_names = [f"{MIGRATION_FOLDER_PREFIX}{(idx % MAX_FOLDERS)+1}" for idx in range(len(files))]
+    if not ensure_folders_exist(bucket, folder_names):
+        logger.error(f"Unable to ensure migration folders exist in bucket '{bucket}'.")
+        return [False] * len(files)
+
+    assignments = list(zip(files, folder_names))
+    logger.info(f"Starting import of {len(files)} files into bucket '{bucket}' using {len(folder_names)} folders.")

     results: List[bool] = []
-    with ThreadPoolExecutor(max_workers=len(buckets)) as executor:
+    max_workers = min(MAX_FOLDERS, len(assignments))
+    import_files_with_threads(project_id, bucket, assignments, results, max_workers)  # a KeyboardInterrupt raised inside is re-raised to this caller
+
+    return results
+
+
+def import_files_with_threads(project_id, bucket, assignments, results, max_workers):
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
         future_map = {
-            executor.submit(_process_files_for_bucket, bucket_file_map[bucket], project_id, bucket): bucket
-            for bucket in buckets
+            executor.submit(process_import_file, file_path, project_id, bucket, folder_name): (file_path, folder_name)
+            for file_path, folder_name in assignments
         }
-        for future in as_completed(future_map):
-            bucket_results = future.result() or []
-            results.extend(bucket_results)
-    return results
+
+        try:
+            for future in as_completed(future_map):
+                file_path, folder_name = future_map[future]
+                try:
+                    result = future.result()
+                except Exception as e:
+                    logger.error(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}")
+                    logger.debug(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}", exc_info=True)
+                    result = False
+                results.append(result)
+        except KeyboardInterrupt:
+            logger.warning("\n*** Import interrupted by user (Ctrl+C) ***")
+            logger.info("Cancelling pending imports...")
+
+            # Cancel all pending futures
+            for future in future_map.keys():
+                if not future.done():
+                    future.cancel()
+
+            # Shutdown without waiting for threads
+            executor.shutdown(wait=False)
+            logger.info("Import process terminated by user")
+            raise


 def filter_files_for_phases(phase_name: str, exported_files: List[str]) -> List[str]:
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/constants.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/constants.py
index 6dd0699..84c7f9c 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/constants.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/constants.py
@@ -2,7 +2,7 @@
 # --- URLs ---
 DATACATALOG_BASE_URL = "https://datacatalog.googleapis.com/v2"
-DATAPLEX_BASE_URL = "https://dataplex.googleapis.com/v1"
+DATAPLEX_BASE_URL = "https://staging-dataplex.sandbox.googleapis.com/v1"
 SEARCH_BASE_URL = "https://datacatalog.googleapis.com/v1/catalog:search"
 CLOUD_RESOURCE_MANAGER_BASE_URL = "https://cloudresourcemanager.googleapis.com/v3"

@@ -41,8 +41,9 @@
 MAX_DESC_SIZE_BYTES = 120 * 1024
 MAX_WORKERS = 10
 PAGE_SIZE = 1000
-PROJECT_NUMBER = "655216118709"
+PROJECT_NUMBER = "418487367933"
 ROLE_STEWARD = "steward"
+MAX_FOLDERS = 15

 # -- BACKOFF Constants ---
 MAX_ATTEMPTS = 10
@@ -58,6 +59,7 @@
 UNGROUPED_ENTRYLINKS_DIRECTORY = "ungrouped_entrylinks"
 LOGS_DIRECTORY = "logs"
 SUMMARY_DIRECTORY = "summary"
+MIGRATION_FOLDER_PREFIX = "migration_folder_"

 MAX_BUCKETS = 20
 MAX_POLLS = 12*12 # 12 hours
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/data_transformer.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/data_transformer.py
index dd0308a..f2b0a49 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/data_transformer.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/data_transformer.py
@@ -340,8 +340,8 @@ def build_entry_link_for_entry_to_term(context, dc_glossary_term_entry, dc_entry
 def process_entry_to_term_entrylinks(context: Context, dc_glossary_term_entry: GlossaryTaxonomyEntry, search_entry_result: SearchEntryResult) -> List[EntryLink]:
     """Processes asset relationships for a given search result and returns flat list of EntryLink objects."""
     dataplex_entry_links: List[EntryLink] = []
-    if not lookup_dataplex_entry(context, search_entry_result):
-        return dataplex_entry_links
+    # if not lookup_dataplex_entry(context, search_entry_result):
+    #     return dataplex_entry_links

     dc_asset_entry_name = search_entry_result.relativeResourceName
     dc_linked_resource = normalize_linked_resource(search_entry_result.linkedResource)
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/dataplex_dao.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/dataplex_dao.py
index 0cd6684..f3f3808 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/dataplex_dao.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/dataplex_dao.py
@@ -123,7 +123,12 @@ def poll_metadata_job(service, project_id: str, location: str, job_id: str) -> b
     job_path = f"projects/{project_id}/locations/{location}/metadataJobs/{job_id}"

     for i in range(max_polls):
-        time.sleep(poll_interval)
+        try:
+            time.sleep(poll_interval)
+        except KeyboardInterrupt:
+            logger.warning(f"Job '{job_id}' polling interrupted by user.")
+            raise
+
         job, state = get_job_and_state(service, job_path, job_id)
         if job is None:
             return False
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/gcs_dao.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/gcs_dao.py
index aa729f1..8288fef 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/gcs_dao.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/gcs_dao.py
@@ -1,10 +1,7 @@
-
 from google.cloud import storage
-from gcs_dao import *
 import httplib2
 import google_auth_httplib2
-from google.cloud import storage
 import logging_utils
 from migration_utils import *
 from constants import *
@@ -12,31 +9,63 @@

 logger = logging_utils.get_logger()

-def prepare_gcs_bucket(gcs_bucket: str, file_path: str, filename: str) -> bool:
-    clear_bucket(gcs_bucket)
-    upload_to_gcs(gcs_bucket, file_path, filename)
+def create_folders(bucket_name: str, folder_name: str) -> bool:
+    """Creates a zero-byte object to represent a folder prefix when missing."""
+    normalized_folder = folder_name.strip("/")
+    prefix = f"{normalized_folder}/"
+    try:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(bucket_name)
+        existing = list(bucket.list_blobs(prefix=prefix, max_results=1))
+        if existing:
+            return True
+        blob = bucket.blob(prefix)
+        blob.upload_from_string("")
+        logger.debug(f"Created folder placeholder gs://{bucket_name}/{prefix}")
+        return True
+    except Exception as error:
+        logger.error("Failed to ensure folder '%s' in bucket '%s': %s", prefix, bucket_name, error)
+        return False
+
+
+def ensure_folders_exist(bucket_name: str, folder_names: list[str]) -> bool:
+    """Ensures every folder prefix exists before uploads; returns False on first failure."""
+    logger.info("Creating necessary folders in bucket '%s'...", bucket_name)
+    for folder_name in folder_names:
+        if not create_folders(bucket_name, folder_name):
+            return False
     return True

-def upload_to_gcs(bucket_name: str, file_path: str, file_name: str) -> bool:
+def prepare_gcs_bucket(gcs_bucket: str, folder_name: str, file_path: str, filename: str) -> bool:
+    destination_path = f"{folder_name.strip('/')}/{filename}"
+    if not create_folders(gcs_bucket, folder_name):
+        return False
+    if not clear_gcs_path_content(gcs_bucket, folder_name):
+        return False
+    return upload_to_gcs(gcs_bucket, file_path, destination_path)
+
+
+def upload_to_gcs(bucket_name: str, file_path: str, destination_blob_name: str) -> bool:
     try:
         storage_client = storage.Client()
         bucket = storage_client.bucket(bucket_name)
-        blob = bucket.blob(file_name)
+        blob = bucket.blob(destination_blob_name)
         blob.upload_from_filename(file_path)
-        logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{file_name}")
+        logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{destination_blob_name}")
         return True
     except Exception as error:
         logger.error("Failed to upload '%s' to bucket '%s' with error '%s'", file_path, bucket_name, error)
         return False

-def clear_bucket(bucket_name: str) -> bool:
+def clear_gcs_path_content(bucket_name: str, folder_prefix: str = "") -> bool:
     """Deletes all objects in a bucket. Returns True on success, False on failure."""
     try:
         storage_client = storage.Client()
         bucket = storage_client.bucket(bucket_name)
-        blobs = list(bucket.list_blobs())
+        prefix = folder_prefix.strip("/") + "/" if folder_prefix else ""
+        blobs = list(bucket.list_blobs(prefix=prefix)) if prefix else list(bucket.list_blobs())
         if not blobs:
             logger.debug(f"Bucket '{bucket_name}' is already empty.")
             return True
@@ -48,11 +77,12 @@ def clear_bucket(bucket_name: str) -> bool:
     return False

 def build_dummy_payload(bucket_name):
+    folder_name = "permission-check"
     return {
         "type": "IMPORT",
         "import_spec": {
             "log_level": "DEBUG",
-            "source_storage_uri": f"gs://{bucket_name}/",
+            "source_storage_uri": f"gs://{bucket_name}/{folder_name}/",
             "entry_sync_mode": "FULL",
             "aspect_sync_mode": "INCREMENTAL",
             "scope": {
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/payloads.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/payloads.py
index 529a484..410fb0b 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/payloads.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/payloads.py
@@ -6,10 +6,11 @@

 logger = logging_utils.get_logger()

-def build_import_spec_base(gcs_bucket: str) -> dict:
+def build_import_spec_base(gcs_bucket: str, folder_name: str) -> dict:
+    normalized_folder = folder_name.strip("/")
     return {
         "log_level": "DEBUG",
-        "source_storage_uri": f"gs://{gcs_bucket}/",
+        "source_storage_uri": f"gs://{gcs_bucket}/{normalized_folder}/",
         "entry_sync_mode": "FULL",
         "aspect_sync_mode": "INCREMENTAL"
     }
@@ -149,8 +150,8 @@ def build_entrylink_payload(file_path: str, project_id: str, import_spec_base: d
         return build_synonym_related_entrylink_payload(file_path, project_id, import_spec_base)


-def build_payload(file_path: str, project_id: str, gcs_bucket: str):
-    import_spec_base = build_import_spec_base(gcs_bucket)
+def build_payload(file_path: str, project_id: str, gcs_bucket: str, folder_name: str):
+    import_spec_base = build_import_spec_base(gcs_bucket, folder_name)
     filename = os.path.basename(file_path)
     if filename.startswith("glossary_"):
         return build_glossary_payload(filename, project_id, import_spec_base)
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_business_glossary_import_v2.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_business_glossary_import_v2.py
index 77fa8b0..43a7c84 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_business_glossary_import_v2.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_business_glossary_import_v2.py
@@ -46,7 +46,7 @@ def test_process_import_file_empty(monkeypatch, is_empty):
     monkeypatch.setattr(business_glossary_import_v2, "prepare_gcs_bucket", lambda *a, **kw: True)
     monkeypatch.setattr(business_glossary_import_v2, "create_and_monitor_job", lambda *a, **kw: True)

-    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket")
+    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket", "migration_folder_1")
     if is_empty:
         assert result is True
     else:
@@ -58,7 +58,7 @@ def test_process_import_file_payload_missing(monkeypatch):
     # Simulate missing payload/job_id/job_location
     monkeypatch.setattr(business_glossary_import_v2, "build_payload", lambda *a, **kw: (None, None, None))

-    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket")
+    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket", "migration_folder_1")
     assert result is False

 def test_process_import_file_gcs_upload_failed(monkeypatch):
@@ -67,7 +67,7 @@ def test_process_import_file_gcs_upload_failed(monkeypatch):
     monkeypatch.setattr(business_glossary_import_v2, "build_payload", lambda *a, **kw: ("jobid", {"payload": True}, "location"))
     monkeypatch.setattr(business_glossary_import_v2, "prepare_gcs_bucket", lambda *a, **kw: False)

-    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket")
+    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket", "migration_folder_1")
     assert result is False

 def test_process_import_file_job_failed(monkeypatch):
@@ -77,7 +77,7 @@ def test_process_import_file_job_failed(monkeypatch):
     monkeypatch.setattr(business_glossary_import_v2, "prepare_gcs_bucket", lambda *a, **kw: True)
     monkeypatch.setattr(business_glossary_import_v2, "create_and_monitor_job", lambda *a, **kw: False)

-    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket")
+    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket", "migration_folder_1")
     assert result is False

 def test_process_import_file_exception(monkeypatch):
@@ -88,50 +88,16 @@ def test_process_import_file_exception(monkeypatch):
     def raise_exc(*a, **kw):
         raise Exception("fail")
     monkeypatch.setattr(business_glossary_import_v2, "create_and_monitor_job", raise_exc)
-    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket")
+    result = business_glossary_import_v2.process_import_file("file.txt", "proj", "bucket", "migration_folder_1")
     assert result is False

-def test__process_files_for_bucket_all_success(monkeypatch):
-    # All files processed successfully
-    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", lambda f, p, b: True)
-    files = ["file1.txt", "file2.txt"]
-    project_id = "proj"
-    bucket = "bucket"
-    results = business_glossary_import_v2._process_files_for_bucket(files, project_id, bucket)
-    assert results == [True, True]
-
-def test__process_files_for_bucket_some_fail(monkeypatch):
-    # Alternate files fail
-    def mock_process_import_file(f, p, b):
-        return f == "file1.txt"
-    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", mock_process_import_file)
-    files = ["file1.txt", "file2.txt"]
-    project_id = "proj"
-    bucket = "bucket"
-    results = business_glossary_import_v2._process_files_for_bucket(files, project_id, bucket)
-    assert results == [True, False]
-
-def test__process_files_for_bucket_exception(monkeypatch):
-    # process_import_file raises exception for one file
-    def mock_process_import_file(f, p, b):
-        if f == "file2.txt":
-            raise Exception("fail")
-        return True
-    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", mock_process_import_file)
-    files = ["file1.txt", "file2.txt"]
-    project_id = "proj"
-    bucket = "bucket"
-    results = business_glossary_import_v2._process_files_for_bucket(files, project_id, bucket)
-    assert results == [True, False]
-
-def test__process_files_for_bucket_empty(monkeypatch):
-    # No files to process
-    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", lambda f, p, b: True)
+def test_run_import_files_no_files(monkeypatch):
     files = []
     project_id = "proj"
-    bucket = "bucket"
-    results = business_glossary_import_v2._process_files_for_bucket(files, project_id, bucket)
-    assert results == []
+    buckets = ["bucket"]
+    result = business_glossary_import_v2.run_import_files(files, project_id, buckets)
+    assert result == []

 def test_run_import_files_no_buckets(monkeypatch):
     files = ["file1.txt", "file2.txt"]
@@ -140,68 +106,101 @@ def test_run_import_files_no_buckets(monkeypatch):
     result = business_glossary_import_v2.run_import_files(files, project_id, buckets)
     assert result == [False, False]

-def test_run_import_files_round_robin_distribution(monkeypatch):
-    # Simulate _process_files_for_bucket returning True for each file
-    def mock_process_files_for_bucket(files_for_bucket, project_id, bucket):
-        # Return True for each file in the bucket
-        return [True for _ in files_for_bucket]
-    monkeypatch.setattr(business_glossary_import_v2, "_process_files_for_bucket", mock_process_files_for_bucket)
+def test_run_import_files_single_bucket_folder_based(monkeypatch):
+    """Test that files are assigned to different folders within a single bucket."""
+    monkeypatch.setattr(business_glossary_import_v2, "ensure_folders_exist", lambda b, f: True)
+    monkeypatch.setattr(business_glossary_import_v2, "import_files_with_threads", lambda pid, b, assignments, results, mw: results.extend([True, True]))
+
+    files = ["file1.txt", "file2.txt"]
+    project_id = "proj"
+    buckets = ["bucketA"]
+
+    result = business_glossary_import_v2.run_import_files(files, project_id, buckets)
+    assert len(result) == 2
+    assert result == [True, True]

-    files = ["file1.txt", "file2.txt", "file3.txt", "file4.txt"]
+def test_run_import_files_multiple_buckets_uses_first(monkeypatch):
+    """Test that only first bucket is used when multiple buckets provided."""
+    monkeypatch.setattr(business_glossary_import_v2, "ensure_folders_exist", lambda b, f: True)
+    monkeypatch.setattr(business_glossary_import_v2, "import_files_with_threads", lambda pid, b, assignments, results, mw: results.extend([True, True]))
+
+    files = ["file1.txt", "file2.txt"]
     project_id = "proj"
     buckets = ["bucketA", "bucketB"]
-
+
     result = business_glossary_import_v2.run_import_files(files, project_id, buckets)
-    # Should have one result per file, all True
-    assert result == [True, True, True, True]
-
-def test_run_import_files_bucket_mapping(monkeypatch):
-    # Track which files go to which bucket
-    bucket_calls = {}
-    def mock_process_files_for_bucket(files_for_bucket, project_id, bucket):
-        bucket_calls[bucket] = list(files_for_bucket)
-        return [True for _ in files_for_bucket]
-    monkeypatch.setattr(business_glossary_import_v2, "_process_files_for_bucket", mock_process_files_for_bucket)
-
-    files = ["f1", "f2", "f3"]
-    buckets = ["b1", "b2"]
-    business_glossary_import_v2.run_import_files(files, "proj", buckets)
-    # Round robin: b1 gets f1, f3; b2 gets f2
-    assert bucket_calls["b1"] == ["f1", "f3"]
-    assert bucket_calls["b2"] == ["f2"]
-
-def test_run_import_files_some_failures(monkeypatch):
-    # Simulate some files failing
-    def mock_process_files_for_bucket(files_for_bucket, project_id, bucket):
-        return [f == "file1.txt" for f in files_for_bucket]
-    monkeypatch.setattr(business_glossary_import_v2, "_process_files_for_bucket", mock_process_files_for_bucket)
+    assert len(result) == 2

+def test_run_import_files_folder_creation_fails(monkeypatch):
+    """Test that import fails if folder creation fails."""
+    monkeypatch.setattr(business_glossary_import_v2, "ensure_folders_exist", lambda b, f: False)
+
     files = ["file1.txt", "file2.txt"]
+    project_id = "proj"
     buckets = ["bucketA"]
-    result = business_glossary_import_v2.run_import_files(files, "proj", buckets)
-    assert result == [True, False]
+
+    result = business_glossary_import_v2.run_import_files(files, project_id, buckets)
+    assert result == [False, False]

-def test_run_import_files_empty_files(monkeypatch):
-    # No files to process
-    def mock_process_files_for_bucket(files_for_bucket, project_id, bucket):
-        return []
-    monkeypatch.setattr(business_glossary_import_v2, "_process_files_for_bucket", mock_process_files_for_bucket)
+def test_import_files_with_threads_success(monkeypatch):
+    """Test import_files_with_threads with successful imports."""
+    results = []
+    call_count = [0]
+
+    def mock_process_import_file(file_path, project_id, bucket, folder_name):
+        call_count[0] += 1
+        return True
+
+    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", mock_process_import_file)
+
+    assignments = [("file1.txt", "migration_folder_1"), ("file2.txt", "migration_folder_2")]
+    project_id = "proj"
+    bucket = "bucketA"
+    max_workers = 2
+
+    business_glossary_import_v2.import_files_with_threads(project_id, bucket, assignments, results, max_workers)
+    assert len(results) == 2
+    assert all(results)

-    files = []
-    buckets = ["bucketA", "bucketB"]
-    result = business_glossary_import_v2.run_import_files(files, "proj", buckets)
-    assert result == []
+def test_import_files_with_threads_partial_failure(monkeypatch):
+    """Test import_files_with_threads with partial failures."""
+    results = []
+
+    def mock_process_import_file(file_path, project_id, bucket, folder_name):
+        return file_path == "file1.txt"
+
+    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", mock_process_import_file)
+
+    assignments = [("file1.txt", "migration_folder_1"), ("file2.txt", "migration_folder_2")]
+    project_id = "proj"
+    bucket = "bucketA"
+    max_workers = 2
+
+    business_glossary_import_v2.import_files_with_threads(project_id, bucket, assignments, results, max_workers)
+    assert len(results) == 2
+    assert results.count(True) == 1
+    assert results.count(False) == 1
+
+def test_import_files_with_threads_keyboard_interrupt(monkeypatch):
+    """Test import_files_with_threads handles keyboard interrupt."""
+    results = []
+
+    def mock_process_import_file(file_path, project_id, bucket, folder_name):
+        if file_path == "file2.txt":
+            raise KeyboardInterrupt()
+        return True
+
+    monkeypatch.setattr(business_glossary_import_v2, "process_import_file", mock_process_import_file)
+
+    assignments = [("file1.txt", "migration_folder_1"), ("file2.txt", "migration_folder_2")]
+    project_id = "proj"
+    bucket = "bucketA"
+    max_workers = 2
+
+    with pytest.raises(KeyboardInterrupt):
+        business_glossary_import_v2.import_files_with_threads(project_id, bucket, assignments, results, max_workers)

-def test_run_import_files_bucket_worker_returns_none(monkeypatch):
-    # Simulate worker returning None (should be treated as empty list)
-    def mock_process_files_for_bucket(files_for_bucket, project_id, bucket):
-        return None
-    monkeypatch.setattr(business_glossary_import_v2, "_process_files_for_bucket", mock_process_files_for_bucket)
-    files = ["file1.txt", "file2.txt"]
-    buckets = ["bucketA", "bucketB"]
-    result = business_glossary_import_v2.run_import_files(files, "proj", buckets)
-    assert result == []

 def test_filter_files_for_phases_entrylinks_all_pass(monkeypatch):
     # All files pass dependency check
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_dataplex_dao.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_dataplex_dao.py
index eaa9088..c30089e 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_dataplex_dao.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_dataplex_dao.py
@@ -231,6 +231,25 @@ def get_job_and_state_side_effect(*args, **kwargs):
     assert result is True
     mock_logger.info.assert_any_call("Job 'testjob-1234' SUCCEEDED.")

+def test_poll_metadata_job_keyboard_interrupt(monkeypatch):
+    """Test that keyboard interrupt is properly raised during polling."""
+    mock_service = MagicMock()
+    mock_logger = MagicMock()
+    # Patch sleep to raise KeyboardInterrupt
+    def mock_sleep(interval):
+        raise KeyboardInterrupt()
+
+    monkeypatch.setattr(dataplex_dao, "get_job_and_state", MagicMock(return_value=({"status": {"state": "QUEUED"}}, "QUEUED")))
+    monkeypatch.setattr(dataplex_dao, "is_job_succeeded", MagicMock(return_value=False))
+    monkeypatch.setattr(dataplex_dao, "is_job_failed", MagicMock(return_value=False))
+    monkeypatch.setattr(dataplex_dao, "logger", mock_logger)
+    monkeypatch.setattr(dataplex_dao.time, "sleep", mock_sleep)
+
+    with pytest.raises(KeyboardInterrupt):
+        dataplex_dao.poll_metadata_job(mock_service, "test-project", "us-central1", "testjob-1234")
+
+    mock_logger.warning.assert_called_with("Job 'testjob-1234' polling interrupted by user.")
+
 def test_get_job_and_state_success(monkeypatch):
     mock_service = MagicMock()
     mock_logger = MagicMock()
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_gcs_dao.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_gcs_dao.py
index 5ca05f4..a8ac4b9 100644
--- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_gcs_dao.py
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_gcs_dao.py
@@ -36,31 +36,50 @@ def test_upload_failure(monkeypatch, mock_storage_client, mock_logger):
     mock_logger.error.assert_called_once()
     assert result is False

-def test_clear_bucket_empty(monkeypatch, mock_storage_client, mock_logger):
+
+# --- clear_gcs_path tests ---
+def test_clear_gcs_path_bucket_empty(monkeypatch, mock_storage_client, mock_logger):
     mock_bucket = MagicMock()
     mock_storage_client.bucket.return_value = mock_bucket
     mock_bucket.list_blobs.return_value = []
-    result = clear_bucket("empty-bucket")
-    mock_logger.debug.assert_called_once_with("Bucket 'empty-bucket' is already empty.")
+    result = clear_gcs_path_content("empty-bucket")
+    mock_logger.debug.assert_called()
     assert result is True

-def test_clear_bucket_with_blobs(monkeypatch, mock_storage_client, mock_logger):
+def test_clear_gcs_path_bucket_with_blobs(monkeypatch, mock_storage_client, mock_logger):
     mock_bucket = MagicMock()
     mock_blob1 = MagicMock()
     mock_blob2 = MagicMock()
     mock_storage_client.bucket.return_value = mock_bucket
     mock_bucket.list_blobs.return_value = [mock_blob1, mock_blob2]
-    result = clear_bucket("non-empty-bucket")
+    result = clear_gcs_path_content("non-empty-bucket")
     mock_bucket.delete_blobs.assert_called_once_with([mock_blob1, mock_blob2])
-    mock_logger.debug.assert_called_with("Deleted 2 objects from bucket 'non-empty-bucket'.")
     assert result is True

-def test_clear_bucket_exception(monkeypatch, mock_storage_client, mock_logger):
+def test_clear_gcs_path_prefix_empty(monkeypatch, mock_storage_client, mock_logger):
+    mock_bucket = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = []
+    result = clear_gcs_path_content("bucket", "some-folder")
+    mock_logger.debug.assert_called()
+    assert result is True
+
+def test_clear_gcs_path_prefix_with_blobs(monkeypatch, mock_storage_client, mock_logger):
+    mock_bucket = MagicMock()
+    mock_blob1 = MagicMock()
+    mock_blob2 = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = [mock_blob1, mock_blob2]
+    result = clear_gcs_path_content("bucket", "some-folder")
+    mock_bucket.delete_blobs.assert_called_once_with([mock_blob1, mock_blob2])
+    assert result is True
+
+def test_clear_gcs_path_exception(monkeypatch, mock_storage_client, mock_logger):
     mock_bucket = MagicMock()
     mock_storage_client.bucket.return_value = mock_bucket
     mock_bucket.list_blobs.side_effect = Exception("List failed")
-    result = clear_bucket("fail-bucket")
-    mock_logger.error.assert_called_once()
+    result = clear_gcs_path_content("fail-bucket")
+    mock_logger.error.assert_called()
     assert result is False

 def test_upload_to_gcs_success(monkeypatch, mock_storage_client, mock_logger):
@@ -86,77 +105,140 @@ def test_upload_to_gcs_failure(monkeypatch, mock_storage_client, mock_logger):
     assert result is False

 def test_prepare_gcs_bucket_success(monkeypatch):
-    clear_bucket_called = []
-    upload_to_gcs_called = []
-
-    def mock_clear_bucket(bucket):
-        clear_bucket_called.append(bucket)
-        return True
+    """Test prepare_gcs_bucket with successful folder creation and upload."""
+    mock_create_folders = MagicMock(return_value=True)
+    mock_clear_path = MagicMock(return_value=True)
+    mock_upload = MagicMock(return_value=True)

-    def mock_upload_to_gcs(bucket, file_path, filename):
-        upload_to_gcs_called.append((bucket, file_path, filename))
-        return True
+    monkeypatch.setattr("gcs_dao.create_folders", mock_create_folders)
+    monkeypatch.setattr("gcs_dao.clear_gcs_path_content", mock_clear_path)
+    monkeypatch.setattr("gcs_dao.upload_to_gcs", mock_upload)

-    monkeypatch.setattr("gcs_dao.clear_bucket", mock_clear_bucket)
-    monkeypatch.setattr("gcs_dao.upload_to_gcs", mock_upload_to_gcs)
-
-    result = prepare_gcs_bucket("bucket1", "/tmp/file.txt", "file.txt")
+    result = prepare_gcs_bucket("bucket1", "migration_folder_1", "/tmp/file.txt", "file.txt")
     assert result is True
-    assert clear_bucket_called == ["bucket1"]
-    assert upload_to_gcs_called == [("bucket1", "/tmp/file.txt", "file.txt")]
-
-def test_prepare_gcs_bucket_clear_bucket_fails(monkeypatch):
-    def mock_clear_bucket(bucket):
+    assert mock_create_folders.call_count == 1
+    assert mock_clear_path.call_count == 1
+    assert mock_upload.call_count == 1
+    # Check that destination path includes the folder name
+    assert mock_upload.call_args[0][2] == "migration_folder_1/file.txt"
+
+def test_prepare_gcs_bucket_folder_creation_fails(monkeypatch):
+    """Test prepare_gcs_bucket when folder creation fails."""
+    def mock_create_folders(bucket, folder):
         return False

-    def mock_upload_to_gcs(bucket, file_path, filename):
+    def mock_upload_to_gcs(bucket, file_path, destination_path):
         return True

-    monkeypatch.setattr("gcs_dao.clear_bucket", mock_clear_bucket)
+    monkeypatch.setattr("gcs_dao.create_folders", mock_create_folders)
     monkeypatch.setattr("gcs_dao.upload_to_gcs", mock_upload_to_gcs)

-    # prepare_gcs_bucket always returns True, even if clear_bucket fails
-    result = prepare_gcs_bucket("bucket2", "/tmp/file2.txt", "file2.txt")
-    assert result is True
+    result = prepare_gcs_bucket("bucket2", "migration_folder_1", "/tmp/file2.txt", "file2.txt")
+    assert result is False

-def test_prepare_gcs_bucket_upload_to_gcs_fails(monkeypatch):
-    def mock_clear_bucket(bucket):
+def test_prepare_gcs_bucket_upload_fails(monkeypatch):
+    """Test prepare_gcs_bucket when upload fails."""
+    def mock_create_folders(bucket, folder):
         return True

-    def mock_upload_to_gcs(bucket, file_path, filename):
+    def mock_upload_to_gcs(bucket, file_path, destination_path):
         return False

-    monkeypatch.setattr("gcs_dao.clear_bucket", mock_clear_bucket)
+    monkeypatch.setattr("gcs_dao.create_folders", mock_create_folders)
+    monkeypatch.setattr("gcs_dao.clear_gcs_path_content", MagicMock(return_value=True))
     monkeypatch.setattr("gcs_dao.upload_to_gcs", mock_upload_to_gcs)

-    # prepare_gcs_bucket always returns True, even if upload_to_gcs fails
-    result = prepare_gcs_bucket("bucket3", "/tmp/file3.txt", "file3.txt")
+    result = prepare_gcs_bucket("bucket3", "migration_folder_1", "/tmp/file3.txt", "file3.txt")
+    assert result is False
+
+def test_ensure_folder_exists_success(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folder_exists when folder doesn't exist."""
+    mock_bucket = MagicMock()
+    mock_blob = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = []  # Folder doesn't exist
+    mock_bucket.blob.return_value = mock_blob
+
+    result = create_folders("test-bucket", "my-folder")
+    mock_bucket.blob.assert_called_once_with("my-folder/")
+    mock_blob.upload_from_string.assert_called_once_with("")
+    mock_logger.debug.assert_called_once()
     assert result is True

-def test_check_all_buckets_permissions_all_success(monkeypatch):
-    called_buckets = []
-    def mock_check_gcs_permissions(bucket):
-        called_buckets.append(bucket)
-        return True
+def test_ensure_folder_exists_already_exists(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folder_exists when folder already exists."""
+    mock_bucket = MagicMock()
+    mock_blob = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = [mock_blob]  # Folder exists

-    monkeypatch.setattr("gcs_dao.check_gcs_permissions", mock_check_gcs_permissions)
-    buckets = ["bucket1", "bucket2", "bucket3"]
-    result = check_all_buckets_permissions(buckets)
+    result = create_folders("test-bucket", "existing-folder")
+    mock_bucket.blob.assert_not_called()
     assert result is True
-    assert called_buckets == buckets

-def test_check_all_buckets_permissions_one_failure(monkeypatch):
-    def mock_check_gcs_permissions(bucket):
-        return bucket != "bucket2"
+def test_ensure_folder_exists_with_trailing_slash(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folder_exists normalizes folder names."""
+    mock_bucket = MagicMock()
+    mock_blob = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = []

-    monkeypatch.setattr("gcs_dao.check_gcs_permissions", mock_check_gcs_permissions)
-    buckets = ["bucket1", "bucket2", "bucket3"]
-    result = check_all_buckets_permissions(buckets)
+    result = create_folders("test-bucket", "my-folder/")
+    mock_bucket.blob.assert_called_once_with("my-folder/")
+    assert result is True
+
+def test_ensure_folder_exists_exception(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folder_exists handles exceptions."""
+    mock_bucket = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.side_effect = Exception("List failed")
+
+    result = create_folders("test-bucket", "fail-folder")
+    mock_logger.error.assert_called_once()
+    assert result is False
+
+def test_ensure_folders_exist_all_success(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folders_exist when all folders are created successfully."""
+    mock_bucket = MagicMock()
+    mock_blob = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+    mock_bucket.list_blobs.return_value = []
+    mock_bucket.blob.return_value = mock_blob
+
+    folder_names = ["migration_folder_1", "migration_folder_2", "migration_folder_3"]
+    result = ensure_folders_exist("test-bucket", folder_names)
+    assert result is True
+    assert mock_bucket.blob.call_count == 3
+
+def test_ensure_folders_exist_one_fails(monkeypatch, mock_storage_client, mock_logger):
+    """Test ensure_folders_exist stops on first failure."""
+    mock_bucket = MagicMock()
+    mock_blob = MagicMock()
+    mock_storage_client.bucket.return_value = mock_bucket
+
+    # First succeeds, second fails
+    call_count = [0]
+    def side_effect(*args, **kwargs):
+        call_count[0] += 1
+        if call_count[0] == 2:
+            raise Exception("Creation failed")
+        return []
+
+    mock_bucket.list_blobs.side_effect = side_effect
+    mock_bucket.blob.return_value = mock_blob
+
+    folder_names = ["folder1", "folder2", "folder3"]
+    result = ensure_folders_exist("test-bucket", folder_names)
     assert result is False

-def test_check_all_buckets_permissions_empty_list(monkeypatch):
+def test_ensure_folders_exist_empty_list(mock_storage_client, mock_logger):
+    """Test ensure_folders_exist with empty list."""
+    result = ensure_folders_exist("test-bucket", [])
+    assert result is True
+
+def test_check_all_buckets_permissions_empty_list_single(monkeypatch):
     # Should return True for empty list
-    result = check_all_buckets_permissions([])
+    result = check_all_buckets_permissions([], "123456789")
     assert result is True

@@ -200,12 +282,6 @@ def test_check_all_buckets_permissions_empty_list(monkeypatch, mock_get_dataplex
     result = check_all_buckets_permissions(buckets, project_number)
     assert result is True

-@pytest.fixture
-def mock_logger(monkeypatch):
-    mock_logger = MagicMock()
-    monkeypatch.setattr("gcs_dao.logger", mock_logger)
-    return mock_logger
-
 @pytest.fixture
 def mock_build_dummy_payload(monkeypatch):
     monkeypatch.setattr("gcs_dao.build_dummy_payload", MagicMock(return_value={"dummy": "payload"}))
@@ -248,7 +324,7 @@ def test_build_dummy_payload_basic():
     assert "import_spec" in payload
     import_spec = payload["import_spec"]
     assert import_spec["log_level"] == "DEBUG"
-    assert import_spec["source_storage_uri"] == f"gs://{bucket_name}/"
+    assert import_spec["source_storage_uri"] == f"gs://{bucket_name}/permission-check/"
     assert import_spec["entry_sync_mode"] == "FULL"
     assert import_spec["aspect_sync_mode"] == "INCREMENTAL"
     assert "scope" in import_spec
@@ -260,7 +336,7 @@ def test_build_dummy_payload_with_different_bucket_names():
     for bucket_name in ["bucket1", "bucket-2", "bucket_3"]:
         payload = build_dummy_payload(bucket_name)
-        assert payload["import_spec"]["source_storage_uri"] == f"gs://{bucket_name}/"
+        assert payload["import_spec"]["source_storage_uri"] == f"gs://{bucket_name}/permission-check/"

 def test_build_dummy_payload_structure():
     bucket_name = "test-bucket"
@@ -283,10 +359,3 @@ def test_build_dummy_payload_structure():
     assert isinstance(scope["glossaries"], list)
     assert len(scope["glossaries"]) == 1
     assert scope["glossaries"][0].startswith("projects/dummy-project-id/locations/global/glossaries/")
-
-
-
-
-
-
-
diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_payloads.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_payloads.py
new file mode 100644
index 0000000..8463c35
--- /dev/null
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/migration/test_payloads.py
@@ -0,0 +1,256 @@
+import pytest
+from unittest.mock import MagicMock
+import payloads
+
+class TestBuildImportSpecBase:
+    def test_build_import_spec_base_success(self):
+        """Test basic payload building with folder name."""
+        gcs_bucket = "my-bucket"
+        folder_name = "migration_folder_1"
+
+        result = payloads.build_import_spec_base(gcs_bucket, folder_name)
+
+        assert result["log_level"] == "DEBUG"
+        assert result["source_storage_uri"] == "gs://my-bucket/migration_folder_1/"
+        assert result["entry_sync_mode"] == "FULL"
+        assert result["aspect_sync_mode"] == "INCREMENTAL"
+
+    def test_build_import_spec_base_strips_trailing_slash(self):
+        """Test that trailing slashes are stripped from folder names."""
+        gcs_bucket = "my-bucket"
+        folder_name = "migration_folder_1/"
+
+        result = payloads.build_import_spec_base(gcs_bucket, folder_name)
+
+        assert result["source_storage_uri"] == "gs://my-bucket/migration_folder_1/"
+
+    def test_build_import_spec_base_complex_bucket_names(self):
+        """Test with various bucket name formats."""
+        test_cases = [
+            ("bucket-1", "folder1"),
+            ("bucket_2", "folder-2"),
+            ("my.bucket", "folder_3"),
+        ]
+
+        for bucket, folder in test_cases:
+            result = payloads.build_import_spec_base(bucket, folder)
+            assert result["source_storage_uri"] == f"gs://{bucket}/{folder}/"
+
+class TestExtractJobLocationFromEntryGroup:
+    def test_extract_location_success(self):
+        """Test extracting location from entry group."""
+        entry_group = "projects/123/locations/us-central1/entryGroups/@dataplex"
+        result = payloads.extract_job_location_from_entry_group(entry_group)
+        assert result == "us-central1"
+
+    def test_extract_location_global(self):
+        """Test extraction with global location."""
+        entry_group = "projects/123/locations/global/entryGroups/@dataplex"
+        result = payloads.extract_job_location_from_entry_group(entry_group)
+        assert result == "global"
+
+    def test_extract_location_default_on_none(self):
+        """Test that default 'global' is returned for None input."""
+        result = payloads.extract_job_location_from_entry_group(None)
+        assert result == "global"
+
+    def test_extract_location_default_on_empty_string(self):
+        """Test that default 'global' is returned for empty string."""
+        result = payloads.extract_job_location_from_entry_group("")
+        assert result == "global"
+
+    def test_extract_location_no_match(self):
+        """Test that default 'global' is returned when pattern doesn't match."""
+        entry_group = "invalid/format"
+        result = payloads.extract_job_location_from_entry_group(entry_group)
+        assert result == "global"
+
+class TestExtractScopes:
+    def test_extract_scopes_from_entry_references_single_scope(self):
+        """Test extracting project scope from entry references."""
+        data = {
+            "entryLink": {
+                "entryReferences": [
+                    {"name": "projects/75037423216/locations/global/glossaries/test/terms/term1"}
+                ]
+            }
+        }
+        result = payloads.extract_scopes_from_entry_references(data)
+        assert "projects/75037423216" in result
+        assert len(result) == 1
+
+    def test_extract_scopes_from_entry_references_multiple_references(self):
+        """Test extracting scope from first reference only."""
+        data = {
+            "entryLink": {
+                "entryReferences": [
+                    {"name": "projects/111/locations/global/glossaries/test/terms/term1"},
+                    {"name": "projects/222/locations/global/glossaries/test/terms/term2"}
+                ]
+            }
+        }
+        result = payloads.extract_scopes_from_entry_references(data)
+        # Should only extract from first reference
+        assert "projects/111" in result
+        assert len(result) == 1
+
+    def test_extract_scopes_empty_references(self):
+        """Test with no entry references."""
+        data = {
+            "entryLink": {
+                "entryReferences": []
+            }
+        }
+        result = payloads.extract_scopes_from_entry_references(data)
+        assert len(result) == 0
+
+    def test_extract_scopes_no_entry_link(self):
+        """Test when entryLink is missing."""
+        data = {}
+        result = payloads.extract_scopes_from_entry_references(data)
+        assert len(result) == 0
+
+class TestBuildPayload:
+    def test_build_glossary_payload(self, monkeypatch):
+        """Test building payload for glossary files."""
+        filename = "glossary_my-glossary.json"
+        project_id = "test-project"
+        gcs_bucket = "test-bucket"
+        folder_name = "migration_folder_1"
+
+        job_id, payload, location = payloads.build_payload(f"/path/{filename}", project_id, gcs_bucket, folder_name)
+
+        assert job_id == "glossary-my-glossary"
+        assert payload["type"] == "IMPORT"
+        assert payload["import_spec"]["source_storage_uri"] == "gs://test-bucket/migration_folder_1/"
+        assert location == "global"
+        assert f"projects/{project_id}/locations/global/glossaries/my-glossary" in str(payload)
+
+    def test_build_entrylink_payload_definition(self, monkeypatch):
+        """Test building payload for definition entrylinks."""
+        # Mock get_link_type to return definition
+        monkeypatch.setattr(payloads, "get_link_type", lambda x: "definition")
+        monkeypatch.setattr(payloads, "get_entry_group", lambda x: "projects/test/locations/us-central1/entryGroups/test")
+        monkeypatch.setattr(payloads, "build_defintion_referenced_entry_scopes", lambda x, y: ["projects/test"])
+
+        filename = "entrylinks_definition.json"
+        project_id = "test-project"
+        gcs_bucket = "test-bucket"
+        folder_name = "migration_folder_1"
+
+        job_id, payload, location = payloads.build_payload(f"/path/{filename}", project_id, gcs_bucket, folder_name)
+
+        assert payload["type"] == "IMPORT"
+        assert "entry_groups" in payload["import_spec"]["scope"]
+        assert "entry_link_types" in payload["import_spec"]["scope"]
+
+    def test_build_unknown_file_type(self, monkeypatch):
+        """Test that unknown file types return None."""
+        filename = "unknown_file.json"
+        project_id = "test-project"
+        gcs_bucket = "test-bucket"
+        folder_name = "migration_folder_1"
+
+        monkeypatch.setattr(payloads, "logger", MagicMock())
+
+        job_id, payload, location = payloads.build_payload(f"/path/{filename}", project_id, gcs_bucket, folder_name)
+
+        assert job_id is None
+        assert payload is None
+        assert location is None
+
+class TestBuildGlossaryPayload:
+    def test_build_glossary_payload_success(self):
+        """Test building glossary payload."""
+        filename = "glossary_my-test-glossary.json"
+        project_id = "my-project"
+        import_spec_base = payloads.build_import_spec_base("bucket", "folder")
+
+        job_id, payload, location = payloads.build_glossary_payload(filename, project_id, import_spec_base)
+
+        assert job_id == "glossary-my-test-glossary"
+        assert location == "global"
+        assert payload["type"] == "IMPORT"
+        assert "scope" in payload["import_spec"]
+        assert f"projects/{project_id}/locations/global/glossaries/my-test-glossary" in payload["import_spec"]["scope"]["glossaries"][0]
+
+    def test_build_glossary_payload_id_conversion(self):
+        """Test that underscores are converted to hyphens in glossary IDs."""
+        filename = "glossary_my_test_glossary.json"
+        project_id = "project"
+        import_spec_base = {}
+
+        job_id, payload, location = payloads.build_glossary_payload(filename, project_id, import_spec_base)
+
+        assert job_id == "glossary-my-test-glossary"
+
+class TestBuildDefinitionEntryLinkPayload:
+    def test_build_definition_payload(self, monkeypatch):
+        """Test building definition entrylink payload."""
+        file_path = "/path/entrylinks_definition.json"
+        project_id = "test-project"
+        import_spec_base = payloads.build_import_spec_base("bucket", "folder")
+
+        # Mock required functions
+        monkeypatch.setattr(payloads, "get_entry_group", lambda x: "projects/test/locations/us-central1/entryGroups/test")
+        monkeypatch.setattr(payloads, "build_defintion_referenced_entry_scopes", lambda x, y: ["projects/test"])
+
+        job_id, payload, location = payloads.build_definition_entrylink_payload(file_path, project_id, import_spec_base)
+
+        assert payload["type"] == "IMPORT"
+        assert "entry_groups" in payload["import_spec"]["scope"]
+        assert location == "us-central1"
+
+class TestBuildSynonymRelatedPayload:
+    def test_build_synonym_related_payload(self, monkeypatch):
+        """Test building synonym/related entrylink payload."""
+        file_path = "/path/entrylinks_related_synonyms.json"
+        project_id = "test-project"
+        import_spec_base = payloads.build_import_spec_base("bucket", "folder")
+
+        # Mock required functions
+        monkeypatch.setattr(payloads, "build_related_synonym_referenced_entry_scopes", lambda x, y: ["projects/ref1", "projects/ref2"])
+
+        job_id, payload, location = payloads.build_synonym_related_entrylink_payload(file_path, project_id, import_spec_base)
+
+        assert payload["type"] == "IMPORT"
+        assert location == "global"
+        assert "entry_link_types" in payload["import_spec"]["scope"]
+        # Should have both synonym and related link types
+        entry_link_types = payload["import_spec"]["scope"]["entry_link_types"]
+        assert any("synonym" in t for t in entry_link_types)
+        assert any("related" in t for t in entry_link_types)
+
+class TestGetLinkType:
+    def test_get_link_type_definition(self, monkeypatch):
+        """Test getting definition link type."""
+        monkeypatch.setattr(payloads, "read_first_json_line", lambda x: {
+            "entryLink": {"entryLinkType": "definition"}
+        })
+
+        result = payloads.get_link_type("test.json")
+        assert result == "definition"
+
+    def test_get_link_type_synonym(self, monkeypatch):
+        """Test getting synonym link type."""
+        monkeypatch.setattr(payloads, "read_first_json_line", lambda x: {
+            "entryLink": {"entryLinkType": "synonym"}
+        })
+
+        result = payloads.get_link_type("test.json")
+        assert result == "synonym"
+
+    def test_get_link_type_missing(self, monkeypatch):
+        """Test when link type is missing."""
+        monkeypatch.setattr(payloads, "read_first_json_line", lambda x: {})
+
+        result = payloads.get_link_type("test.json")
+        # Can return None or empty string based on implementation
+        assert result is None or result == ""
+
+    def test_get_link_type_no_data(self, monkeypatch):
+        """Test when no JSON data is found."""
+        monkeypatch.setattr(payloads, "read_first_json_line", lambda x: None)
+
+        result = payloads.get_link_type("test.json")
+        assert result is None
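Reviewer note (not part of the patch): a minimal standalone sketch of the folder-assignment scheme this change introduces. Files are spread over numbered prefixes inside a single bucket and wrap at MAX_FOLDERS; the constants are copied from constants.py, while the bucket and file names below are hypothetical.

    # Illustration only: mirrors the assignment logic in run_import_files.
    MIGRATION_FOLDER_PREFIX = "migration_folder_"   # from constants.py
    MAX_FOLDERS = 15                                # from constants.py

    def assign_folders(files):
        """File i is uploaded under migration_folder_((i % MAX_FOLDERS) + 1)."""
        folder_names = [f"{MIGRATION_FOLDER_PREFIX}{(idx % MAX_FOLDERS) + 1}" for idx in range(len(files))]
        return list(zip(files, folder_names))

    if __name__ == "__main__":
        # Hypothetical export files; the 16th file wraps back to migration_folder_1.
        files = [f"glossary_example_{i}.json" for i in range(16)]
        for file_name, folder in assign_folders(files):
            # Each import job then reads from gs://<bucket>/<folder>/ via build_import_spec_base.
            print(f"{file_name} -> gs://example-bucket/{folder}/{file_name}")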