Skip to content
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import cycle
from typing import Dict, List
from typing import List
import logging_utils
from file_utils import *
from gcs_dao import prepare_gcs_bucket
from gcs_dao import prepare_gcs_bucket, ensure_folders_exist
from dataplex_dao import get_dataplex_service, create_and_monitor_job
from payloads import *
from constants import MIGRATION_FOLDER_PREFIX
import os
from file_utils import *
logger = logging_utils.get_logger()
Expand All @@ -21,7 +21,7 @@ def get_referenced_scopes(file_path: str, main_project_id: str) -> list:
return list(scopes)


def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> bool:
def process_import_file(file_path: str, project_id: str, gcs_bucket: str, folder_name: str) -> bool:
"""
Processes a single glossary or entrylink file:
- Builds payload
Expand All @@ -40,13 +40,13 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo

service = get_dataplex_service()

job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket)
job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket, folder_name)
if not payload or not job_id or not job_location:
return False

try:
# Upload file to GCS first; only continue if upload succeeded
upload_status = prepare_gcs_bucket(gcs_bucket, file_path, filename)
upload_status = prepare_gcs_bucket(gcs_bucket, folder_name, file_path, filename)
if not upload_status:
logger.error(f"Failed to prepare GCS bucket '{gcs_bucket}' for file '{filename}'. Skipping import.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's log folder_name as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return False
Expand All @@ -60,45 +60,62 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo
return False


def _process_files_for_bucket(files_for_bucket: List[str], project_id: str, bucket: str) -> List[bool]:
    """Import every file assigned to one bucket, one after another.

    A file whose import raises is logged and recorded as ``False``; the
    remaining files are still processed.

    Returns:
        One outcome per input file, in the same order as ``files_for_bucket``.
    """
    outcomes = []
    for path in files_for_bucket:
        try:
            outcome = process_import_file(path, project_id, bucket)
        except Exception as err:
            message = f"Error processing file {path} in bucket {bucket}: {err}"
            logger.error(message)
            # Same message again at debug level, this time with the traceback.
            logger.debug(message, exc_info=True)
            outcome = False
        outcomes.append(outcome)
    return outcomes


def run_import_files(files: List[str], project_id: str, buckets: List[str]) -> List[bool]:
"""
Distribute files round-robin to buckets, start one worker per bucket and process sequentially
to guarantee no two threads operate on the same bucket.
"""
"""Runs import for multiple files using ThreadPoolExecutor for concurrency."""
if not files:
return []
if not buckets:
logger.error("No buckets provided to run_import_files.")
return [False] * len(files)

bucket_file_map: Dict[str, List[str]] = {b: [] for b in buckets}
cycler = cycle(buckets)
for f in files:
b = next(cycler)
bucket_file_map[b].append(f)
bucket = buckets[0]
if len(buckets) > 1:
logger.warning(f"Multiple buckets provided; using '{bucket}' with folder-based uploads.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So from now on we will use only a single bucket no matter how many buckets the user has provided?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for any customers who are using this PR. We are not going to merge the PR, though.


folder_names = [f"{MIGRATION_FOLDER_PREFIX}{idx+1}" for idx in range(len(files))]
if not ensure_folders_exist(bucket, folder_names):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure_folder_exists actually creates the folders idempotently; we should rename it to create_folders.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

logger.error(f"Unable to ensure migration folders exist in bucket '{bucket}'.")
return [False] * len(files)

assignments = list(zip(files, folder_names))
logger.info(f"Starting import of {len(files)} files into bucket '{bucket}' using {len(folder_names)} folders.")

results: List[bool] = []
with ThreadPoolExecutor(max_workers=len(buckets)) as executor:
max_workers = min(15, len(assignments))
import_files_with_threads(project_id, bucket, assignments, results, max_workers) # Re-raise to propagate the interrupt

return results

def import_files_with_threads(project_id, bucket, assignments, results, max_workers):
    """Import each (file, folder) assignment concurrently on a thread pool.

    Args:
        project_id: Project id forwarded to ``process_import_file``.
        bucket: Name of the GCS bucket all files are uploaded to.
        assignments: Iterable of ``(file_path, folder_name)`` pairs.
        results: List that receives one outcome per assignment. Outcomes are
            appended in completion order, which is not necessarily the order
            of ``assignments``.
        max_workers: Upper bound on the number of worker threads.

    Returns:
        The same ``results`` list, for convenience.

    Raises:
        KeyboardInterrupt: Re-raised after not-yet-started imports have been
            cancelled, so the caller can stop the whole run.
    """
    assignments = list(assignments)
    if not assignments:
        # Nothing to do; also avoids ThreadPoolExecutor rejecting max_workers=0.
        return results

    # The executor is managed by hand rather than with a ``with`` block:
    # the context manager always calls shutdown(wait=True) on exit, which
    # would block on running imports even after a Ctrl+C.
    executor = ThreadPoolExecutor(max_workers=max(1, max_workers))
    future_map = {}
    wait_for_workers = True
    try:
        # Register each future as soon as it is submitted so an interrupt
        # during submission can still cancel everything queued so far.
        for file_path, folder_name in assignments:
            future = executor.submit(process_import_file, file_path, project_id, bucket, folder_name)
            future_map[future] = (file_path, folder_name)

        for future in as_completed(future_map):
            file_path, folder_name = future_map[future]
            try:
                result = future.result()
            except Exception as e:
                logger.error(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}")
                logger.debug(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}", exc_info=True)
                result = False
            results.append(result)
    except KeyboardInterrupt:
        wait_for_workers = False
        logger.warning("\n*** Import interrupted by user (Ctrl+C) ***")
        logger.info("Cancelling pending imports...")

        # cancel() only succeeds for futures that have not started; imports
        # that are already running cannot be stopped and finish in the
        # background.
        for future in future_map:
            future.cancel()

        logger.info("Import process terminated by user")
        raise
    finally:
        # Normal completion waits for the workers; on interrupt, return
        # control to the caller immediately.
        executor.shutdown(wait=wait_for_workers)

    return results


def filter_files_for_phases(phase_name: str, exported_files: List[str]) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
UNGROUPED_ENTRYLINKS_DIRECTORY = "ungrouped_entrylinks"
LOGS_DIRECTORY = "logs"
SUMMARY_DIRECTORY = "summary"
MIGRATION_FOLDER_PREFIX = "migration_folder_"

MAX_BUCKETS = 20
MAX_POLLS = 12*12 # 12 hours
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ def poll_metadata_job(service, project_id: str, location: str, job_id: str) -> b
job_path = f"projects/{project_id}/locations/{location}/metadataJobs/{job_id}"

for i in range(max_polls):
time.sleep(poll_interval)
try:
time.sleep(poll_interval)
except KeyboardInterrupt:
logger.warning(f"Job '{job_id}' polling interrupted by user.")
raise

job, state = get_job_and_state(service, job_path, job_id)
if job is None:
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,56 @@

from google.cloud import storage
from gcs_dao import *

import httplib2
import google_auth_httplib2
from google.cloud import storage
import logging_utils
from migration_utils import *
from constants import *
from dataplex_dao import *
logger = logging_utils.get_logger()


def prepare_gcs_bucket(gcs_bucket: str, file_path: str, filename: str) -> bool:
clear_bucket(gcs_bucket)
upload_to_gcs(gcs_bucket, file_path, filename)
def ensure_folder_exists(bucket_name: str, folder_name: str) -> bool:
    """Make sure a folder-like prefix is present in the given GCS bucket.

    If no object is listed under ``<folder_name>/``, an empty object named
    exactly ``<folder_name>/`` is uploaded as a placeholder.

    Returns:
        True when the prefix already has an object or the placeholder was
        uploaded; False when any storage call raised (the error is logged).
    """
    prefix = folder_name.strip("/") + "/"
    try:
        bucket = storage.Client().bucket(bucket_name)
        # A single listed object is enough to prove the prefix is in use.
        found = list(bucket.list_blobs(prefix=prefix, max_results=1))
        if not found:
            bucket.blob(prefix).upload_from_string("")
            logger.debug(f"Created folder placeholder gs://{bucket_name}/{prefix}")
        return True
    except Exception as error:
        logger.error("Failed to ensure folder '%s' in bucket '%s': %s", prefix, bucket_name, error)
        return False


def ensure_folders_exist(bucket_name: str, folder_names: list[str]) -> bool:
    """Check or create each folder prefix in order, stopping at the first failure.

    Returns:
        True if every folder was ensured (or ``folder_names`` is empty);
        False as soon as one folder could not be ensured.
    """
    logger.info("Creating necessary folders in bucket '%s'...", bucket_name)
    # all() short-circuits, so folders after the first failure are not attempted.
    return all(ensure_folder_exists(bucket_name, name) for name in folder_names)


def upload_to_gcs(bucket_name: str, file_path: str, file_name: str) -> bool:
def prepare_gcs_bucket(gcs_bucket: str, folder_name: str, file_path: str, filename: str) -> bool:
    """Upload a local file into a folder of the bucket, creating the folder if needed.

    The object is written to ``<folder_name>/<filename>`` (surrounding slashes
    on ``folder_name`` are ignored).

    Returns:
        False if the folder could not be ensured; otherwise the result of
        the upload.
    """
    target_blob = f"{folder_name.strip('/')}/{filename}"
    folder_ready = ensure_folder_exists(gcs_bucket, folder_name)
    if folder_ready:
        return upload_to_gcs(gcs_bucket, file_path, target_blob)
    return False


def upload_to_gcs(bucket_name: str, file_path: str, destination_blob_name: str) -> bool:
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(file_path)
logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{file_name}")
logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{destination_blob_name}")
return True
except Exception as error:
logger.error("Failed to upload '%s' to bucket '%s' with error '%s'", file_path, bucket_name, error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's log destination_blob_name as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the destination blob name is logged in the generated log file; do we also need it in the main logs (displayed directly to the user while the script is running)?

Expand All @@ -48,11 +74,12 @@ def clear_bucket(bucket_name: str) -> bool:
return False

def build_dummy_payload(bucket_name):
folder_name = "permission-check"
return {
"type": "IMPORT",
"import_spec": {
"log_level": "DEBUG",
"source_storage_uri": f"gs://{bucket_name}/",
"source_storage_uri": f"gs://{bucket_name}/{folder_name}/",
"entry_sync_mode": "FULL",
"aspect_sync_mode": "INCREMENTAL",
"scope": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

logger = logging_utils.get_logger()

def build_import_spec_base(gcs_bucket: str, folder_name: str = "") -> dict:
    """Build the import-spec fields shared by every import payload.

    Args:
        gcs_bucket: Name of the bucket holding the file to import.
        folder_name: Folder inside the bucket to import from. Leading and
            trailing slashes are ignored. When empty (the default), the
            bucket root is used, which keeps one-argument callers working.

    Returns:
        A dict with ``log_level``, ``source_storage_uri``,
        ``entry_sync_mode`` and ``aspect_sync_mode``.
    """
    normalized_folder = folder_name.strip("/")
    if normalized_folder:
        source_storage_uri = f"gs://{gcs_bucket}/{normalized_folder}/"
    else:
        # Avoid emitting "gs://bucket//" when no folder is given.
        source_storage_uri = f"gs://{gcs_bucket}/"
    return {
        "log_level": "DEBUG",
        "source_storage_uri": source_storage_uri,
        "entry_sync_mode": "FULL",
        "aspect_sync_mode": "INCREMENTAL",
    }
Expand Down Expand Up @@ -149,8 +150,8 @@ def build_entrylink_payload(file_path: str, project_id: str, import_spec_base: d
return build_synonym_related_entrylink_payload(file_path, project_id, import_spec_base)


def build_payload(file_path: str, project_id: str, gcs_bucket: str):
import_spec_base = build_import_spec_base(gcs_bucket)
def build_payload(file_path: str, project_id: str, gcs_bucket: str, folder_name: str):
import_spec_base = build_import_spec_base(gcs_bucket, folder_name)
filename = os.path.basename(file_path)
if filename.startswith("glossary_"):
return build_glossary_payload(filename, project_id, import_spec_base)
Expand Down
Loading
Loading