Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import cycle
from typing import Dict, List
from typing import List
import logging_utils
from file_utils import *
from gcs_dao import prepare_gcs_bucket
from gcs_dao import prepare_gcs_bucket, ensure_folders_exist
from dataplex_dao import get_dataplex_service, create_and_monitor_job
from payloads import *
from constants import MIGRATION_FOLDER_PREFIX, MAX_FOLDERS
import os
from file_utils import *
logger = logging_utils.get_logger()
Expand All @@ -21,7 +21,7 @@ def get_referenced_scopes(file_path: str, main_project_id: str) -> list:
return list(scopes)


def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> bool:
def process_import_file(file_path: str, project_id: str, gcs_bucket: str, folder_name: str) -> bool:
"""
Processes a single glossary or entrylink file:
- Builds payload
Expand All @@ -40,13 +40,14 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo

service = get_dataplex_service()

job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket)
job_id, payload, job_location = build_payload(file_path, project_id, gcs_bucket, folder_name)
if not payload or not job_id or not job_location:
return False

try:
# Upload file to GCS first; only continue if upload succeeded
upload_status = prepare_gcs_bucket(gcs_bucket, file_path, filename)
upload_status = prepare_gcs_bucket(gcs_bucket, folder_name, file_path, filename)
logger.debug(f"Upload status for file '{filename}': {upload_status} to bucket '{gcs_bucket}' in folder '{folder_name}'")
if not upload_status:
logger.error(f"Failed to prepare GCS bucket '{gcs_bucket}' for file '{filename}'. Skipping import.")
return False
Expand All @@ -60,45 +61,62 @@ def process_import_file(file_path: str, project_id: str, gcs_bucket: str) -> boo
return False


def _process_files_for_bucket(files_for_bucket: List[str], project_id: str, bucket: str) -> List[bool]:
"""Worker: processes all files assigned to a bucket sequentially."""
results = []
for f in files_for_bucket:
try:
result = process_import_file(f, project_id, bucket)
except Exception as e:
logger.error(f"Error processing file {f} in bucket {bucket}: {e}")
logger.debug(f"Error processing file {f} in bucket {bucket}: {e}", exc_info=True)
result = False
results.append(result)
return results


def run_import_files(files: List[str], project_id: str, buckets: List[str]) -> List[bool]:
"""
Distribute files round-robin to buckets, start one worker per bucket and process sequentially
to guarantee no two threads operate on the same bucket.
"""
"""Runs import for multiple files using ThreadPoolExecutor for concurrency."""
if not files:
return []
if not buckets:
logger.error("No buckets provided to run_import_files.")
return [False] * len(files)

bucket_file_map: Dict[str, List[str]] = {b: [] for b in buckets}
cycler = cycle(buckets)
for f in files:
b = next(cycler)
bucket_file_map[b].append(f)
bucket = buckets[0]
if len(buckets) > 1:
logger.warning(f"Multiple buckets provided; using '{bucket}' with folder-based uploads.")

folder_names = [f"{MIGRATION_FOLDER_PREFIX}{(idx % MAX_FOLDERS)+1}" for idx in range(len(files))]
if not ensure_folders_exist(bucket, folder_names):
logger.error(f"Unable to ensure migration folders exist in bucket '{bucket}'.")
return [False] * len(files)

assignments = list(zip(files, folder_names))
logger.info(f"Starting import of {len(files)} files into bucket '{bucket}' using {len(folder_names)} folders.")

results: List[bool] = []
with ThreadPoolExecutor(max_workers=len(buckets)) as executor:
max_workers = min(MAX_FOLDERS, len(assignments))
import_files_with_threads(project_id, bucket, assignments, results, max_workers) # Re-raise to propagate the interrupt

return results

def import_files_with_threads(project_id, bucket, assignments, results, max_workers):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(_process_files_for_bucket, bucket_file_map[bucket], project_id, bucket): bucket
for bucket in buckets
executor.submit(process_import_file, file_path, project_id, bucket, folder_name): (file_path, folder_name)
for file_path, folder_name in assignments
}
for future in as_completed(future_map):
bucket_results = future.result() or []
results.extend(bucket_results)
return results

try:
for future in as_completed(future_map):
file_path, folder_name = future_map[future]
try:
result = future.result()
except Exception as e:
logger.error(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}")
logger.debug(f"Error processing file {file_path} in bucket {bucket} folder {folder_name}: {e}", exc_info=True)
result = False
results.append(result)
except KeyboardInterrupt:
logger.warning("\n*** Import interrupted by user (Ctrl+C) ***")
logger.info("Cancelling pending imports...")

# Cancel all pending futures
for future in future_map.keys():
if not future.done():
future.cancel()

# Shutdown without waiting for threads
executor.shutdown(wait=False)
logger.info("Import process terminated by user")
raise


def filter_files_for_phases(phase_name: str, exported_files: List[str]) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# --- URLs ---
DATACATALOG_BASE_URL = "https://datacatalog.googleapis.com/v2"
DATAPLEX_BASE_URL = "https://dataplex.googleapis.com/v1"
DATAPLEX_BASE_URL = "https://staging-dataplex.sandbox.googleapis.com/v1"
SEARCH_BASE_URL = "https://datacatalog.googleapis.com/v1/catalog:search"
CLOUD_RESOURCE_MANAGER_BASE_URL = "https://cloudresourcemanager.googleapis.com/v3"

Expand Down Expand Up @@ -41,8 +41,9 @@
MAX_DESC_SIZE_BYTES = 120 * 1024
MAX_WORKERS = 10
PAGE_SIZE = 1000
PROJECT_NUMBER = "655216118709"
PROJECT_NUMBER = "418487367933"
ROLE_STEWARD = "steward"
MAX_FOLDERS = 15

# -- BACKOFF Constants ---
MAX_ATTEMPTS = 10
Expand All @@ -58,6 +59,7 @@
UNGROUPED_ENTRYLINKS_DIRECTORY = "ungrouped_entrylinks"
LOGS_DIRECTORY = "logs"
SUMMARY_DIRECTORY = "summary"
MIGRATION_FOLDER_PREFIX = "migration_folder_"

MAX_BUCKETS = 20
MAX_POLLS = 12*12 # 12 hours
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ def build_entry_link_for_entry_to_term(context, dc_glossary_term_entry, dc_entry
def process_entry_to_term_entrylinks(context: Context, dc_glossary_term_entry: GlossaryTaxonomyEntry, search_entry_result: SearchEntryResult) -> List[EntryLink]:
"""Processes asset relationships for a given search result and returns flat list of EntryLink objects."""
dataplex_entry_links: List[EntryLink] = []
if not lookup_dataplex_entry(context, search_entry_result):
return dataplex_entry_links
# if not lookup_dataplex_entry(context, search_entry_result):
# return dataplex_entry_links

dc_asset_entry_name = search_entry_result.relativeResourceName
dc_linked_resource = normalize_linked_resource(search_entry_result.linkedResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ def poll_metadata_job(service, project_id: str, location: str, job_id: str) -> b
job_path = f"projects/{project_id}/locations/{location}/metadataJobs/{job_id}"

for i in range(max_polls):
time.sleep(poll_interval)
try:
time.sleep(poll_interval)
except KeyboardInterrupt:
logger.warning(f"Job '{job_id}' polling interrupted by user.")
raise

job, state = get_job_and_state(service, job_path, job_id)
if job is None:
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,71 @@

from google.cloud import storage
from gcs_dao import *

import httplib2
import google_auth_httplib2
from google.cloud import storage
import logging_utils
from migration_utils import *
from constants import *
from dataplex_dao import *
logger = logging_utils.get_logger()


def prepare_gcs_bucket(gcs_bucket: str, file_path: str, filename: str) -> bool:
clear_bucket(gcs_bucket)
upload_to_gcs(gcs_bucket, file_path, filename)
def create_folders(bucket_name: str, folder_name: str) -> bool:
"""Creates a zero-byte object to represent a folder prefix when missing."""
normalized_folder = folder_name.strip("/")
prefix = f"{normalized_folder}/"
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
existing = list(bucket.list_blobs(prefix=prefix, max_results=1))
if existing:
return True
blob = bucket.blob(prefix)
blob.upload_from_string("")
logger.debug(f"Created folder placeholder gs://{bucket_name}/{prefix}")
return True
except Exception as error:
logger.error("Failed to ensure folder '%s' in bucket '%s': %s", prefix, bucket_name, error)
return False


def ensure_folders_exist(bucket_name: str, folder_names: list[str]) -> bool:
"""Ensures every folder prefix exists before uploads; returns False on first failure."""
logger.info("Creating necessary folders in bucket '%s'...", bucket_name)
for folder_name in folder_names:
if not create_folders(bucket_name, folder_name):
return False
return True


def upload_to_gcs(bucket_name: str, file_path: str, file_name: str) -> bool:
def prepare_gcs_bucket(gcs_bucket: str, folder_name: str, file_path: str, filename: str) -> bool:
destination_path = f"{folder_name.strip('/')}/{filename}"
if not create_folders(gcs_bucket, folder_name):
return False
if not clear_gcs_path_content(gcs_bucket, folder_name):
return False
return upload_to_gcs(gcs_bucket, file_path, destination_path)


def upload_to_gcs(bucket_name: str, file_path: str, destination_blob_name: str) -> bool:
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(file_path)
logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{file_name}")
logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{destination_blob_name}")
return True
except Exception as error:
logger.error("Failed to upload '%s' to bucket '%s' with error '%s'", file_path, bucket_name, error)
return False


def clear_bucket(bucket_name: str) -> bool:
def clear_gcs_path_content(bucket_name: str, folder_prefix: str = "") -> bool:
"""Deletes all objects in a bucket. Returns True on success, False on failure."""
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blobs = list(bucket.list_blobs())
prefix = folder_prefix.strip("/") + "/" if folder_prefix else ""
blobs = list(bucket.list_blobs(prefix=prefix)) if prefix else list(bucket.list_blobs())
if not blobs:
logger.debug(f"Bucket '{bucket_name}' is already empty.")
return True
Expand All @@ -48,11 +77,12 @@ def clear_bucket(bucket_name: str) -> bool:
return False

def build_dummy_payload(bucket_name):
folder_name = "permission-check"
return {
"type": "IMPORT",
"import_spec": {
"log_level": "DEBUG",
"source_storage_uri": f"gs://{bucket_name}/",
"source_storage_uri": f"gs://{bucket_name}/{folder_name}/",
"entry_sync_mode": "FULL",
"aspect_sync_mode": "INCREMENTAL",
"scope": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

logger = logging_utils.get_logger()

def build_import_spec_base(gcs_bucket: str) -> dict:
def build_import_spec_base(gcs_bucket: str, folder_name: str) -> dict:
normalized_folder = folder_name.strip("/")
return {
"log_level": "DEBUG",
"source_storage_uri": f"gs://{gcs_bucket}/",
"source_storage_uri": f"gs://{gcs_bucket}/{normalized_folder}/",
"entry_sync_mode": "FULL",
"aspect_sync_mode": "INCREMENTAL"
}
Expand Down Expand Up @@ -149,8 +150,8 @@ def build_entrylink_payload(file_path: str, project_id: str, import_spec_base: d
return build_synonym_related_entrylink_payload(file_path, project_id, import_spec_base)


def build_payload(file_path: str, project_id: str, gcs_bucket: str):
import_spec_base = build_import_spec_base(gcs_bucket)
def build_payload(file_path: str, project_id: str, gcs_bucket: str, folder_name: str):
import_spec_base = build_import_spec_base(gcs_bucket, folder_name)
filename = os.path.basename(file_path)
if filename.startswith("glossary_"):
return build_glossary_payload(filename, project_id, import_spec_base)
Expand Down
Loading