diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/entrylinks-export.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/entrylinks-export.py new file mode 100644 index 00000000..28e128df --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/entrylinks-export.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +""" +EntryLink Export Utility + +This utility exports EntryLinks from Google Cloud Dataplex glossary terms to Google Sheets. +Given a glossary URL and spreadsheet URL, it fetches all terms, performs lookupEntryLink API calls, +and saves the results to a Google Sheet. + +Usage: + python entrylinks-export.py --glossary-url --spreadsheet-url --user-project +""" + +# Standard library imports +import os +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed + +# Add parent directories to Python path to find modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # For api_layer +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # For utils + +# Local application imports +from utils import api_layer, argument_parser, business_glossary_utils, logging_utils, sheet_utils + +logger = logging_utils.get_logger() + +# Constants +SHEET_HEADERS = ["entry_link_type", "source_entry", "target_entry", "source_path"] +MAX_WORKERS = 10 # Number of threads for parallel API calls + + +def _fetch_entry_links_for_term(term: dict, user_project: str) -> list: + """ + Fetch entry links for a single term. 
+ + Args: + term: The glossary term dictionary + user_project: The project to bill for the export + + Returns: + List of entry link rows, empty if no links found + """ + entry_name = business_glossary_utils.generate_entry_name_from_term_name(term["name"]) + term_links = api_layer.lookup_entry_links_for_term(entry_name, user_project) + if term_links: + return sheet_utils.entry_links_to_rows(term_links) + return [] + + +def export_entry_links( + glossary_resource: str, spreadsheet_url: str, user_project: str +) -> bool: + """ + Export EntryLinks for all terms in a glossary to a Google Sheet. + + Args: + glossary_resource: The full resource name of the glossary + spreadsheet_url: The URL of the Google Sheet to write to + user_project: The project to bill for the export + + Returns: + True if links were exported successfully, False if no links found + """ + dataplex_service = api_layer.authenticate_dataplex() + sheets_service = api_layer.authenticate_sheets() + terms = api_layer.list_glossary_terms(dataplex_service, glossary_resource) + + if not terms: + logger.warning("No terms found in the glossary") + return False + + # Fetch entry links for each term using thread pool + all_links = [] + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # Submit all tasks + future_to_term = { + executor.submit(_fetch_entry_links_for_term, term, user_project): term + for term in terms + } + + # Collect results as they complete + for future in as_completed(future_to_term): + term = future_to_term[future] + try: + term_links = future.result() + if term_links: + all_links.extend(term_links) + except Exception as exc: + logger.error("Failed to fetch entry links for term %s: %s", term.get("name"), exc) + + if not all_links: + logger.info("No entry links found for any terms") + return False + + # Get spreadsheet ID + spreadsheet_id = api_layer.get_spreadsheet_id(spreadsheet_url) + + # Prepare data for sheet (headers + rows) + sheet_data = [SHEET_HEADERS] + 
sheet_data.extend(all_links) + + # Write to Google Sheet + api_layer.write_to_sheet(sheets_service, spreadsheet_id, sheet_data) + logger.info(f"Successfully wrote {len(all_links)} entry links to spreadsheet") + return True + + +def main() -> int: + """Main entry point.""" + try: + logging_utils.setup_file_logging() + args = argument_parser.get_export_entrylinks_arguments() + glossary_resource = business_glossary_utils.extract_glossary_name(args.glossary_url) + + logger.info("Starting EntryLink Export from Glossary: %s", glossary_resource) + logger.info("Exporting to Google Sheet: %s", args.spreadsheet_url) + + success = export_entry_links(glossary_resource, args.spreadsheet_url, args.user_project) + if success: + logger.info("EntryLinks are successfully exported to Google Sheet") + else: + logger.info("No EntryLinks found to export") + return 0 + except Exception as exc: + logger.exception("Failed to export entry links: %s", exc) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/glossary-export.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/glossary-export.py index ddd0ba58..e300e37c 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/glossary-export.py +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/export/glossary-export.py @@ -1,55 +1,31 @@ -from google.auth import default -from googleapiclient.discovery import build -import re import logging -from typing import List, Dict, Optional - -# --- Configure logging --- -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -# --- Custom Exception Classes --- -class InvalidSpreadsheetURLError(Exception): - """Raised when the provided spreadsheet URL 
is invalid.""" - pass - -class InvalidGlossaryNameError(Exception): - """Raised when the provided glossary name is invalid.""" - pass - -class DataplexAPIError(Exception): - """Raised when there is an error interacting with the Dataplex API.""" - pass - -class SheetsAPIError(Exception): - """Raised when there is an error interacting with the Google Sheets API.""" - pass - -class NoCategoriesFoundError(Exception): - """Raised when no categories are found for the given glossary.""" - pass - -class NoTermsFoundError(Exception): - """Raised when no terms are found for the given glossary.""" - pass +import os +import sys +from typing import List -class InvalidTermNameError(Exception): - """Raised when term name is invalid.""" - pass - -class InvalidCategoryNameError(Exception): - """Raised when Category name is invalid.""" - pass +from googleapiclient.discovery import build -class InvalidEntryIdFormatError(Exception): - """Raised when the entry ID format is invalid.""" - pass +# Configure Python path for module imports +curr_dir = os.path.dirname(os.path.abspath(__file__)) +import_dir = os.path.dirname(curr_dir) # import directory +glossary_dir = os.path.dirname(import_dir) # dataplex-glossary +base_dir = os.path.dirname(glossary_dir) # business-glossary-import + +# Add all necessary paths +sys.path.append(import_dir) # For import local modules +sys.path.append(glossary_dir) # For glossary modules +sys.path.append(base_dir) # For migration modules + +from utils import api_layer, error +from utils.constants import ( + CATEGORY_NAME_PATTERN, + GLOSSARY_URL_PATTERN, + SPREADSHEET_URL_PATTERN, + TERM_NAME_PATTERN, +) -GLOSSARY_URL_PATTERN = re.compile(r".*dp-glossaries/projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/?#]+).*") -TERM_NAME_REGEX_PATTERN = re.compile(r"projects/([^/]+)/locations/([^/]+)/glossaries/([^/]+)/terms/([^/]+)") -CATERGORY_NAME_REGEX_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/?#]+)/categories/(?P[^/]+)") 
-GLOSSARY_NAME_REGEX_PATTERN = re.compile(r"projects/([^/]+)/locations/([^/]+)/glossaries/([^/]+)") -ENTRY_NAME_REGEX_PATTERN = re.compile(r"projects/([^/]+)/locations/([^/]+)/entryGroups/@dataplex/entries/.*") -SPREAD_SHEET_REGEX_PATTERN = re.compile(r"https://docs\.google\.com/spreadsheets/d/([^/]+)") +# --- Configure logging --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Expected format for the glossary URL (for error messages) EXPECTED_GLOSSARY_URL_FORMAT = "any_url_containing/dp-glossaries/projects//locations//glossaries/" @@ -86,160 +62,12 @@ def _get_spreadsheet_id(spreadsheet_url: str) -> str: Raises: InvalidSpreadsheetURLError: If the spreadsheet URL is invalid. """ - match = SPREAD_SHEET_REGEX_PATTERN.match(spreadsheet_url) + match = SPREADSHEET_URL_PATTERN.match(spreadsheet_url) if not match: logging.error(f"Invalid spreadsheet URL: {spreadsheet_url}") - raise InvalidSpreadsheetURLError(f"Invalid spreadsheet URL: {spreadsheet_url}") - return match.group(1) + raise error.InvalidSpreadsheetURLError(f"Invalid spreadsheet URL: {spreadsheet_url}") + return match.group('spreadsheet_id') -def _authenticate_dataplex() -> build: - """Authenticates with the Dataplex API using Application Default Credentials. - - Returns: - A Dataplex API service object. - - Raises: - google.auth.exceptions.DefaultCredentialsError: If credentials cannot be found. - DataplexAPIError: If there is an error during authentication. - """ - try: - dataplex_scopes = ['https://www.googleapis.com/auth/cloud-platform'] - dataplex_creds, _ = default(scopes=dataplex_scopes) - return build('dataplex', 'v1', credentials=dataplex_creds) - except Exception as e: - logging.error(f"Error during Dataplex authentication: {e}") - raise DataplexAPIError(f"Error during Dataplex authentication: {e}") - -def _authenticate_sheets() -> build: - """Authenticates with the Google Sheets API using Application Default Credentials. 
- - Returns: - A Google Sheets API service object. - - Raises: - google.auth.exceptions.DefaultCredentialsError: If credentials cannot be found. - SheetsAPIError: If there is an error during authentication. - """ - try: - sheets_scopes = ['https://www.googleapis.com/auth/spreadsheets'] - sheets_creds, _ = default(scopes=sheets_scopes) - return build('sheets', 'v4', credentials=sheets_creds) - except Exception as e: - logging.error(f"Error during Sheets authentication: {e}") - raise SheetsAPIError(f"Error during Sheets authentication: {e}") - -def _list_glossary_categories(dataplex_service: build, glossary_name: str) -> List[Dict]: - """Lists categories from a Dataplex glossary. - - Args: - dataplex_service: The Dataplex API service object. - glossary_name: The full glossary name. - - Returns: - A list of glossary categories. - - Raises: - DataplexAPIError: If there is an error during the API call. - NoCategoriesFoundError: If no categories are found. - """ - try: - request = dataplex_service.projects().locations().glossaries().categories().list( - parent=glossary_name, pageSize=1000) - - categories = [] - while request: - response = request.execute() - if 'categories' in response: - categories.extend(response['categories']) - request = dataplex_service.projects().locations().glossaries().categories().list_next(request, response) - if not categories: - logging.warning(f"No categories found for {glossary_name}") - raise NoCategoriesFoundError(f"No categories found for {glossary_name}") - return categories - except Exception as e: - logging.error(f"Error while listing glossary categories: {e}") - raise DataplexAPIError(f"Error while listing glossary categories: {e}") - - -def _list_glossary_terms(dataplex_service: build, glossary_name: str) -> List[Dict]: - """Lists terms from a Dataplex glossary, handling pagination. - - Args: - dataplex_service: The Dataplex API service object. - glossary_name: The full glossary name. - - Returns: - A list of glossary terms. 
- - Raises: - DataplexAPIError: If there is an error during the API call. - NoTermsFoundError: If no terms are found. - """ - try: - terms = [] - page_token = None - while True: - request = dataplex_service.projects().locations().glossaries().terms().list( - parent=glossary_name, pageSize=1000, pageToken=page_token - ) - response = request.execute() - - if 'terms' in response: - terms.extend(response['terms']) - - page_token = response.get('nextPageToken') - if not page_token: - break - - if not terms: - logging.warning(f"No terms found for {glossary_name}") - raise NoTermsFoundError(f"No terms found for {glossary_name}") - return terms - except Exception as e: - logging.error(f"Error while listing glossary terms for {glossary_name}: {e}", exc_info=True) - raise DataplexAPIError(f"Error while listing glossary terms for {glossary_name}: {e}") - -def _extract_project_and_location_from_entry_id(entry_id: str) -> tuple: - """Extracts project and location from an entry id. - - Args: - entry_id: The entry id. - - Returns: - A tuple with project_id and location_id - - Raises: - InvalidEntryIdFormatError: If the entry id format is invalid - """ - match = ENTRY_NAME_REGEX_PATTERN.match(entry_id) - if not match: - logging.error(f"Invalid entry id format: {entry_id}") - raise InvalidEntryIdFormatError(f"Invalid entry id format: {entry_id}") - project_id, location_id = match.groups() - return project_id, location_id - -def _lookup_entry(dataplex_service: build, entry_id: str) -> Optional[Dict]: - """Looks up an entry using the Dataplex API. - - Args: - dataplex_service: The Dataplex API service object. - entry_id: The entry ID to look up. - - Returns: - The entry response as a dictionary, or None if an error occurs. - - Raises: - DataplexAPIError: If there is an error during the API call. 
- """ - try: - project_id, location_id = _extract_project_and_location_from_entry_id(entry_id) - name = f"projects/{project_id}/locations/{location_id}" - request = dataplex_service.projects().locations().lookupEntry(name=name, entry=entry_id, view="ALL") - response = request.execute() - return response - except Exception as e: - logging.error(f"Error while looking up entry {entry_id}: {e}") - raise DataplexAPIError(f"Error while looking up entry {entry_id}: {e}") def _generate_entry_name_from_category_name(category_name: str) -> str: """Generates an entryId from a category name. @@ -253,11 +81,15 @@ def _generate_entry_name_from_category_name(category_name: str) -> str: Raises: InvalidCategoryNameError: If the category name format is invalid. """ - match = CATERGORY_NAME_REGEX_PATTERN.match(category_name) + match = CATEGORY_NAME_PATTERN.match(category_name) if not match: logging.error(f"Invalid category name format: {category_name}") - raise InvalidCategoryNameError(f"Invalid category name format: {category_name}") - project_id, location_id, glossary_id, category_id = match.groups() + raise error.InvalidCategoryNameError(f"Invalid category name format: {category_name}") + + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + category_id = match.group('category_id') return f"projects/{project_id}/locations/{location_id}/entryGroups/@dataplex/entries/projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}/categories/{category_id}" def _generate_entry_name_from_term_name(term_name: str) -> str: @@ -272,11 +104,16 @@ def _generate_entry_name_from_term_name(term_name: str) -> str: Raises: InvalidTermNameError: If the term name format is invalid. 
""" - match = TERM_NAME_REGEX_PATTERN.match(term_name) + match = TERM_NAME_PATTERN.match(term_name) if not match: logging.error(f"Invalid term name format: {term_name}") - raise InvalidTermNameError(f"Invalid term name format: {term_name}") - project_id, location_id, glossary_id, term_id = match.groups() + raise error.InvalidTermNameError(f"Invalid term name format: {term_name}") + + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + term_id = match.group('term_id') + return f"projects/{project_id}/locations/{location_id}/entryGroups/@dataplex/entries/projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}/terms/{term_id}" def _get_sheet_row_for_category(category, entry): @@ -290,12 +127,16 @@ def _get_sheet_row_for_category(category, entry): an object containing values for the following keys ["id", "parent", "display_name", "description", "overview", "type", "contact1_email", "contact1_name", "contact2_email", "contact2_name", "label1_key", "label1_value", "label2_key", "label2_value"] """ - match = CATERGORY_NAME_REGEX_PATTERN.match(category.get("name")) + match = CATEGORY_NAME_PATTERN.match(category.get("name")) if not match: logging.error(f"Invalid category name format: {category.get('name')}") - raise InvalidCategoryNameError(f"Invalid category name format: {category.get('name')}") + raise error.InvalidCategoryNameError(f"Invalid category name format: {category.get('name')}") - project_id, location_id, glossary_id, category_id = match.groups() + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + category_id = match.group('category_id') + sheet_data = {} # append id @@ -305,7 +146,7 @@ def _get_sheet_row_for_category(category, entry): if category.get("parent") == f"projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}": sheet_data["parent"] = "" else: - parent_match = 
CATERGORY_NAME_REGEX_PATTERN.match(category.get("parent")) + parent_match = CATEGORY_NAME_PATTERN.match(category.get("parent")) if not parent_match: logging.error(f"Invalid parent: {category.get("parent")}") else: @@ -357,11 +198,15 @@ def _get_sheet_row_for_term(term, entry): ["id", "parent", "display_name", "description", "overview", "type", "contact1_email", "contact1_name", "contact2_email", "contact2_name", "label1_key", "label1_value", "label2_key", "label2_value"] """ sheet_data = {} - match = TERM_NAME_REGEX_PATTERN.match(term.get("name")) + match = TERM_NAME_PATTERN.match(term.get("name")) if not match: logging.error(f"Invalid term name format: {term.get('name')}") - raise InvalidTermNameError(f"Invalid term name format: {term.get('name')}") - project_id, location_id, glossary_id, term_id = match.groups() + raise error.InvalidTermNameError(f"Invalid term name format: {term.get('name')}") + + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + term_id = match.group('term_id') # append id sheet_data["id"] = term_id @@ -370,11 +215,11 @@ def _get_sheet_row_for_term(term, entry): if term.get("parent") == f"projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}": sheet_data["parent"] = "" else: - parent_match = CATERGORY_NAME_REGEX_PATTERN.match(term.get("parent")) + parent_match = CATEGORY_NAME_PATTERN.match(term.get("parent")) if not parent_match: logging.error(f"Invalid parent: {term.get("parent")}") else: - project_id, location_id, glossary_id, category_id = parent_match.groups() + category_id = parent_match.group('category_id') sheet_data["parent"] = category_id # append display name @@ -457,7 +302,7 @@ def _write_to_sheet(sheets_service: build, spreadsheet_id: str, data: List[List[ logging.info(f"Data written to spreadsheet: {spreadsheet_id}") except Exception as e: logging.error(f"Error while writing to spreadsheet: {e}") - raise SheetsAPIError(f"Error while 
writing to spreadsheet: {e}") + raise error.SheetsAPIError(f"Error while writing to spreadsheet: {e}") # --- Main Function --- def list_and_write_glossary_taxonomy(spreadsheet_url: str, glossary_name: str) -> None: @@ -477,14 +322,14 @@ def list_and_write_glossary_taxonomy(spreadsheet_url: str, glossary_name: str) - spreadsheet_id = _get_spreadsheet_id(spreadsheet_url) # 2. Authenticate to APIs - dataplex_service = _authenticate_dataplex() - sheets_service = _authenticate_sheets() + dataplex_service = api_layer.authenticate_dataplex() + sheets_service = api_layer.authenticate_sheets() # 3. List glossary categories - categories = _list_glossary_categories(dataplex_service, glossary_name) + categories = api_layer.list_glossary_categories(dataplex_service, glossary_name) # 4. List glossary terms - terms = _list_glossary_terms(dataplex_service, glossary_name) + terms = api_layer.list_glossary_terms(dataplex_service, glossary_name) entry_names_map = {} # 5. Get Category and Terms. @@ -505,10 +350,10 @@ def list_and_write_glossary_taxonomy(spreadsheet_url: str, glossary_name: str) - entry_id_to_entry_map = {} for entry_id, entry_name in entry_names_map.items(): try: - entry_response = _lookup_entry(dataplex_service, entry_name) + entry_response = api_layer.lookup_entry(dataplex_service, entry_name) if entry_response: entry_id_to_entry_map[entry_id] = entry_response - except DataplexAPIError as e: + except error.DataplexAPIError as e: logging.warning(f"Skipping entry {entry_name} due to lookup error: {e}") continue @@ -548,7 +393,7 @@ def list_and_write_glossary_taxonomy(spreadsheet_url: str, glossary_name: str) - logging.info("Process completed successfully.") - except (InvalidSpreadsheetURLError, InvalidGlossaryNameError, DataplexAPIError, SheetsAPIError, NoCategoriesFoundError) as e: + except (error.InvalidSpreadsheetURLError, error.InvalidGlossaryNameError, error.DataplexAPIError, error.SheetsAPIError, error.NoCategoriesFoundError) as e: logging.error(f"Operation 
failed: {e}") raise except Exception as e: diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/__init__.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/__init__.py new file mode 100644 index 00000000..c7747b73 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/__init__.py @@ -0,0 +1,3 @@ +""" +Import package for EntryLink import functionality. +""" \ No newline at end of file diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/entrylinks-import.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/entrylinks-import.py new file mode 100644 index 00000000..6c3ec0f3 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/entrylinks-import.py @@ -0,0 +1,298 @@ +""" +EntryLink Import Utility + +This utility imports EntryLinks into Google Cloud Dataplex from Google Sheets. +It validates the input data, converts entries to the correct format, +and creates metadata import jobs to add the EntryLinks. 
+ +Usage: + python entrylinks-import.py --spreadsheet-url --project-id --gcs-bucket +""" + +# Standard library imports +import os +import sys +from typing import Dict, List + +# Third-party imports +from googleapiclient.errors import HttpError + +# Configure Python path for module imports +curr_dir = os.path.dirname(os.path.abspath(__file__)) +import_dir = os.path.dirname(curr_dir) # import directory +glossary_dir = os.path.dirname(import_dir) # dataplex-glossary + +# Add all necessary paths +sys.path.append(import_dir) +sys.path.append(glossary_dir) + +# Local imports +from utils import api_layer, argument_parser, business_glossary_utils, constants, import_utils, logging_utils, sheet_utils +from utils.constants import ( + BIGQUERY_SYSTEM_ENTRY_GROUP, + DATAPLEX_ENTRY_PATTERN, + DP_LINK_TYPE_DEFINITION, + DP_LINK_TYPE_RELATED, + DP_LINK_TYPE_SYNONYM, + ENTRY_REFERENCE_TYPE_SOURCE, + ENTRY_REFERENCE_TYPE_TARGET, + SOURCE_ENTRY_PATTERN, +) +from utils.models import EntryLink, EntryReference, SpreadsheetRow + +logger = logging_utils.get_logger() + + +def prompt_user_on_missing_entries(not_found_entries): + if len(not_found_entries) > 0: + logger.warning(f"Found {len(not_found_entries)} entries are not found in Dataplex") + user_input = input("Continue with import? 
[y/N]: ") + if not user_input.lower().startswith('y'): + sys.exit(1) + + +def extract_entry_references_from_spreadsheet(spreadsheet_url: str): + """Extract entry references from Google Sheet.""" + sheets_service = api_layer.authenticate_sheets() + spreadsheet_id = api_layer.get_spreadsheet_id(spreadsheet_url) + + # Read data from sheet + data = api_layer.read_from_sheet(sheets_service, spreadsheet_id) + + if not data or len(data) < 2: # Need at least headers and one row + logger.warning("Spreadsheet is empty or has no data rows") + return set() + + # Parse headers to find column indices + headers = [header.lower().strip() for header in data[0]] + try: + source_idx = headers.index('source_entry') + target_idx = headers.index('target_entry') + except ValueError as e: + logger.error(f"Required column not found in spreadsheet headers: {e}") + raise ValueError(f"Spreadsheet must have 'source_entry' and 'target_entry' columns") + + entry_references = set() + for row in data[1:]: # Skip header row + if len(row) > source_idx and row[source_idx].strip(): + entry_references.add(row[source_idx].strip()) + if len(row) > target_idx and row[target_idx].strip(): + entry_references.add(row[target_idx].strip()) + return entry_references + +def check_entry_existence(entrylinks: List[EntryLink], dataplex_service) -> List[str]: + """Check if entries exist by extracting entry references from entrylinks.""" + missing_entries = [] + checked_entries = set() # Avoid duplicate checks + + for entrylink in entrylinks: + try: + # Extract entry references from the EntryLinkData model + entry_refs = entrylink.entryReferences + lookup_entries(dataplex_service, missing_entries, checked_entries, entry_refs) + except HttpError as e: + logger.error(f"Error while checking entry existence: {e}") + except Exception as e: + logger.error(f"Unexpected error while checking entry: {e}") + return missing_entries + + +def lookup_entries(dataplex_service, missing_entries, checked_entries, entry_references: 
List[EntryReference]): + for entry_reference in entry_references: + entry_name = entry_reference.name + if not entry_name or entry_name in checked_entries: + continue + + checked_entries.add(entry_name) + match = DATAPLEX_ENTRY_PATTERN.match(entry_name) + if not match: + logger.warning(f"Invalid entry name format: {entry_name}") + continue + + project_id = match.group('project_id') + location = match.group('location_id') + project_location_name = f"projects/{project_id}/locations/{location}" + + if not api_layer.lookup_entry(dataplex_service, entry_name, project_location_name): + missing_entries.append(entry_name) + logger.warning(f"Entry not found in Dataplex: {entry_name}") + + +def convert_spreadsheet_to_entrylinks(spreadsheet_url: str) -> List[EntryLink]: + """Convert spreadsheet rows to EntryLink entries for import.""" + sheets_service = api_layer.authenticate_sheets() + spreadsheet_id = api_layer.get_spreadsheet_id(spreadsheet_url) + + # Read data from sheet + data = api_layer.read_from_sheet(sheets_service, spreadsheet_id) + + if not data or len(data) < 2: + logger.warning("Spreadsheet is empty or has no data rows") + return [] + + # Parse headers + type_idx, source_idx, target_idx, path_idx = sheet_utils.extract_column_indices(data) + + # Convert rows to entry link dictionaries + row_dicts = sheet_utils.rows_to_entry_link_dicts(data, type_idx, source_idx, target_idx, path_idx) + + # Build EntryLink models from the dictionaries + entries = [] + for row_dict in row_dicts: + row = SpreadsheetRow.from_dict(row_dict) + entry = build_entry_link(row) + entries.append(entry) + + return entries + +def build_entry_link(row: SpreadsheetRow) -> EntryLink: + """Build EntryLink model from row data.""" + match = SOURCE_ENTRY_PATTERN.match(row.source_entry) + if not match: + logger.error(f"Invalid source entry format: {row.source_entry}") + raise ValueError(f"Invalid source entry format: {row.source_entry}") + + project_id = match.group('project_id') + location = 
match.group('location_id') + entry_group = match.group('entry_group') + + link_type = row.entry_link_type.lower() + entry_references = build_entry_references(row, entry_group, link_type) + + # EntryLink name uses the source entry's project/location/entryGroups path + entrylink_base = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}" + + entrylink = EntryLink( + name=f"{entrylink_base}/entryLinks/{business_glossary_utils.get_entry_link_id()}", + entryLinkType=constants.LINK_TYPES[link_type], + entryReferences=entry_references + ) + + logger.debug(f"input row: {row}, output entrylink: {entrylink}") + return entrylink + +def build_entry_references(row: SpreadsheetRow, entry_group: str, link_type: str) -> List[EntryReference]: + """Build list of EntryReference models from row data.""" + entry_references = [] + if link_type == DP_LINK_TYPE_DEFINITION: + source_path = row.source_path.strip() + + # Handle path formatting for BigQuery + if entry_group == BIGQUERY_SYSTEM_ENTRY_GROUP and source_path and not source_path.startswith('Schema.'): + source_path = f"Schema.{source_path}" + + entry_references.append(EntryReference( + name=row.source_entry, + path=source_path, + type=ENTRY_REFERENCE_TYPE_SOURCE + )) + entry_references.append(EntryReference( + name=row.target_entry, + path='', + type=ENTRY_REFERENCE_TYPE_TARGET + )) + else: + entry_references.append(EntryReference(name=row.source_entry)) + entry_references.append(EntryReference(name=row.target_entry)) + return entry_references + + +def extract_entrylink_components(entrylink_name: str) -> tuple[str, str, str]: + """Extract project_id, location, and entry_group from an entrylink name.""" + match = constants.ENTRYLINK_NAME_PATTERN.match(entrylink_name) + if not match: + logger.error(f"Invalid entryLink name format: {entrylink_name}") + raise ValueError(f"Invalid entryLink name format: {entrylink_name}") + + project_id = match.group('project_id') + location = match.group('location_id') + entry_group 
= match.group('entry_group') + + return project_id, location, entry_group + +def group_entrylinks_by_entry_type_and_entry_group(entrylinks: List[EntryLink]) -> Dict[str, Dict[str, List[Dict]]]: + """Group entrylinks by entry link type and project/location from the entrylink name.""" + grouped_entrylinks = {} + for entrylink in entrylinks: + # Get the link type using regex pattern + entry_link_type_name = entrylink.entryLinkType + type_match = constants.ENTRYLINK_TYPE_PATTERN.match(entry_link_type_name) + if not type_match: + logger.warning(f"Invalid entryLinkType format: {entry_link_type_name}") + continue + + entry_link_type = type_match.group('link_type') + # Normalize related/synonym to the same category + if entry_link_type in [DP_LINK_TYPE_RELATED, DP_LINK_TYPE_SYNONYM]: + entry_link_type = 'related-synonym' # Group both types together + + # Extract project/location/entryGroup from the entryLink name + entrylink_name = entrylink.name + project_id, location, entry_group = extract_entrylink_components(entrylink_name) + + # Create group key using project_id, location, and entryGroup + # Convert model back to dict for compatibility with downstream code + group_entrylinks(grouped_entrylinks, entrylink.to_dict(), entry_link_type, project_id, location, entry_group) + + logger.debug(f"input: {entrylinks}, output: {grouped_entrylinks}") + return grouped_entrylinks + +def group_entrylinks(grouped_entrylinks, entrylink, entry_link_type, project_id, location, entry_group): + group_key = f"{project_id}_{location}_{entry_group}" + + if entry_link_type not in grouped_entrylinks: + grouped_entrylinks[entry_link_type] = {} + if group_key not in grouped_entrylinks[entry_link_type]: + grouped_entrylinks[entry_link_type][group_key] = [] + grouped_entrylinks[entry_link_type][group_key].append(entrylink) + + +def main(): + """Main entry point for EntryLink import.""" + logging_utils.setup_file_logging() + args = argument_parser.get_import_entrylinks_arguments() + 
logger.info("Starting EntryLink Import from Google Sheet") + logger.info("Spreadsheet URL: %s", args.spreadsheet_url) + dataplex_service = api_layer.authenticate_dataplex() + + try: + # Convert spreadsheet rows to entries first + entrylinks = convert_spreadsheet_to_entrylinks(args.spreadsheet_url) + if not entrylinks: + logger.warning("Spreadsheet is empty or has no valid entries") + return 1 + + # Step 1: Validate entry existence + not_found_entries = check_entry_existence(entrylinks, dataplex_service) + prompt_user_on_missing_entries(not_found_entries) + + # Step 2: Group entries by type and entry group + grouped_entrylinks = group_entrylinks_by_entry_type_and_entry_group(entrylinks) + + # Step 3: Create JSON files in archive folder + archive_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "archive") + import_files = import_utils.create_import_json_files(grouped_entrylinks, archive_dir) + + if not import_files: + logger.warning("No files to process") + return 1 + + # Step 4: Process each group with bucket cycling + buckets = args.buckets + all_success = import_utils.run_import_files(import_files, buckets) + + if all_success: + logger.info("EntryLink Import Completed Successfully!") + return 0 + else: + logger.error("Some import jobs failed. 
Check logs for details.") + return 1 + + except Exception as e: + logger.error(f"Unexpected error during import: {e}") + logger.debug(f"Error details:", exc_info=True) + return 1 + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/glossary-import.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/glossary-import.py index 3e71d8f5..caf10cb7 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/glossary-import.py +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/import/glossary-import.py @@ -1,40 +1,40 @@ -import gspread +import argparse +import datetime import json -from google.auth import default -from google.cloud import storage import os -import re -import datetime -from typing import List -import argparse import time +from typing import List + +import google.auth +import gspread +from google.auth import default +from google.cloud import storage from googleapiclient.discovery import build from googleapiclient.errors import HttpError -import google.auth -# Regex pattern for the glossary URL, allowing any valid URL -GLOSSARY_URL_PATTERN = re.compile(r".*dp-glossaries/projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/?#]+).*") +# Add paths for local module imports +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from utils.constants import ( + DATAPLEX_SYSTEM_ENTRY_GROUP, + EMAIL_PATTERN, + GLOSSARY_URL_PATTERN, + ID_PATTERN, + LABEL_PATTERN, + PARENT_PATTERN, +) # Expected format for the glossary URL (for error messages) EXPECTED_GLOSSARY_URL_FORMAT = "any_url_containing/dp-glossaries/projects//locations//glossaries/" -EMAIL_PATTERN = 
re.compile(r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$") - +# Type constants TERM_TYPE = "TERM" CATEGORY_TYPE = "CATEGORY" # Allowed types ALLOWED_TYPES = {TERM_TYPE, CATEGORY_TYPE} -# Regex pattern for the name format (term_id or category_id) -ID_PATTERN = re.compile(r"^[a-z][a-z0-9_-]*$") - -# Regex pattern for the parent format (category_id) -PARENT_PATTERN = re.compile(r"^[a-z][a-z0-9_-]*$") - -# Regex pattern for label key and values -LABEL_PATTERN = re.compile(r"^[a-z0-9_-]+$") - ID_COLUMN = "id" # A constant for the name column name DISPLAY_NAME_COLUMN_NAME = "display_name" # A constant for the display name column name DESCRIPTION_COLUMN_NAME = "description" # A constant for the description column name @@ -77,7 +77,6 @@ PARENT_ENTRY_COLUMN_NAME="PARENT_ENTRY_COLUMN_NAME" ANCESTORS = "ANCESTORS" -ENTRY_GROUP_ID = "@dataplex" # Added a constant for the entry group id MAX_DEPTH = 4 # Max depth allowed for a node. 
MAX_NUM_CATEGORIES = 200 # Max number of categories allowed in the glossary MAX_NUM_TERMS = 5000 # Max number of terms allowed in the glossary @@ -163,7 +162,7 @@ def _extract_glossary_ids(self): self.location_id = match.group("location_id") self.glossary_id = match.group("glossary_id") self.project_location_base = f"projects/{self.project_id}/locations/{self.location_id}" - self.entry_group_name = f"{self.project_location_base}/entryGroups/{ENTRY_GROUP_ID}" + self.entry_group_name = f"{self.project_location_base}/entryGroups/{DATAPLEX_SYSTEM_ENTRY_GROUP}" self.base_parent = f"{self.entry_group_name}/entries/{self.project_location_base}/glossaries/{self.glossary_id}" diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_call_utils.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_call_utils.py new file mode 100644 index 00000000..1ecce7d6 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_call_utils.py @@ -0,0 +1,212 @@ +"""Utils for API calls.""" + +import logging +from typing import Any, Callable, Dict +import time +import random +import requests +from utils import logging_utils +from utils.constants import MAX_BACKOFF_SECONDS, INITIAL_BACKOFF_SECONDS +import google.auth +from google.auth.transport.requests import Request + + +logging.getLogger('urllib3').setLevel(logging.WARNING) +logger = logging_utils.get_logger() + + +# Cache +cached_creds = None +cached_token = None +last_refresh_time = 0 + +REFRESH_INTERVAL_SECONDS = 55 * 60 # 55 minutes + +def _refresh_adc_token(): + """Refresh ADC credentials and update cache.""" + global cached_creds, cached_token, last_refresh_time + creds, _ = google.auth.default() + creds.refresh(Request()) + cached_creds = creds + cached_token = creds.token + last_refresh_time = time.time() + logger.debug("ADC token refreshed 
successfully.") + + +def _get_header(project_id: str) -> Dict[str, str]: + """Return headers using ADC, refreshing token every 30 minutes.""" + global last_refresh_time, cached_token + + is_token_expired = (time.time() - last_refresh_time) > REFRESH_INTERVAL_SECONDS + if not cached_token or is_token_expired: + logger.debug("Refreshing ADC token (interval reached)...") + _refresh_adc_token() + + return { + "Content-Type": "application/json", + "Authorization": f"Bearer {cached_token}", + "X-Goog-User-Project": project_id, + } + +def extract_error_details( + response_err: requests.exceptions.RequestException, +) -> list[Any]: + """Extract error details from response error data. + + Args: + response_err: RequestException containing response error data. + + Returns: + List of error details dictionaries. + """ + if response_err.response is None: + return [] + + try: + data = response_err.response.json() + return data.get('error', dict()).get('details', []) + except requests.exceptions.JSONDecodeError: + return [] + + +def extract_error_code( + response_err: requests.exceptions.RequestException, +) -> str | None: + """Extract error code from response error data. + + Return None if no error code found. + + Args: + response_err: RequestException containing response error data. + + Returns: + String representing error code or None if no error code found. + """ + for detail in extract_error_details(response_err): + if ( + detail.get('@type') == 'type.googleapis.com/google.rpc.ErrorInfo' + and detail.get('metadata') is not None + and detail.get('metadata').get('code') is not None + and not str(detail.get('metadata').get('code')).isspace() + ): + return str(detail.get('metadata').get('code')) + + return None + + +def extract_debug_info_detail( + response_err: requests.exceptions.RequestException, +) -> str | None: + """Extract debug info details from response error data. + + Return None if no debug info detail found. 
+ + Args: + response_err: RequestException containing response error data. + + Returns: + String representing debug infor detail or None if no debug info detail + found. + """ + for detail in extract_error_details(response_err): + if ( + detail.get('@type') == 'type.googleapis.com/google.rpc.DebugInfo' + and detail.get('detail') is not None + and not str(detail.get('detail')).isspace() + ): + return str(detail.get('detail')) + return None + + +def create_error_message( + method_name: str, + url: str, + response_err: requests.exceptions.RequestException, +) -> str: + """Create an error message. + + Args: + method_name: String containing a http method name. + url: String containing targeted url. + response_err: RequestException containing response error data. + + Returns: + String containing user friendly error message. + """ + base_err_description = str(response_err) + err_description = ( + extract_debug_info_detail(response_err) + or extract_error_code(response_err) + or base_err_description + ) + return f'{method_name} call to {url} returned: {err_description}' + + +def fetch_api_response( + method: Callable[..., Any], + url: str, + project_id: str, + request_body: dict[str, Any] | None = None, +) -> dict[str, Any]: + """REST API call helper with exponential backoff retry mechanism. + + Args: + method: HTTP method name. + url: URL of the resource. + project_id: Google Cloud Project id. + request_body: Optional body of request. + + Returns: + Dictionary with response and error if any. 
+ """ + method_name = 'GET' if method == requests.get else 'POST' + context = f'[{method_name} {url}]' + + logger.debug(f'{context} Starting API call with project_id={project_id} and request_body={request_body}') + + backoff = INITIAL_BACKOFF_SECONDS + + while True: + try: + res = method(url, headers=_get_header(project_id), json=request_body) + logger.debug(f'{context} Response status: {res.status_code}, Response text: {res.text}') + + try: + data = res.json() + except requests.exceptions.JSONDecodeError: + error_msg = 'Call returned non-valid JSON.' + logger.debug(f'{context} JSON decode error: {error_msg}') + return {'json': None, 'error_msg': error_msg} + + if res.ok: + logger.debug(f'{context} Successful response JSON: {data}') + return {'json': data, 'error_msg': None} + + # If response is error, capture error message + error_msg = data.get('error', {}).get('message') or f'Call returned HTTP {res.status_code}.' + logger.debug(f'{context} Bad response: {error_msg}') + + # Retry only for infra-related HTTP errors (e.g., 500s, 429 rate limit) + if res.status_code >= 500 or res.status_code == 429: + logger.info(f"{context} Retrying in {backoff} seconds due to transient error...") + time.sleep(backoff + random.uniform(0, 0.5)) # Add jitter + backoff = min(backoff * 2, MAX_BACKOFF_SECONDS) + continue + + # Retry if connection is lost (e.g., requests.ConnectionError) + if res.status_code == 0: + logger.info(f"{context} Retrying in {backoff} seconds due to connection lost (status 0)...") + time.sleep(backoff + random.uniform(0, 0.5)) + backoff = min(backoff * 2, MAX_BACKOFF_SECONDS) + continue + + # For client errors (e.g., 400, 401), do not retry + return {'json': data, 'error_msg': error_msg} + + except requests.exceptions.RequestException as err: + error_msg = create_error_message(method_name, url, err) + logger.debug(f'{context} Exception occurred: {error_msg}. 
Retrying in {backoff} seconds...') + + time.sleep(backoff + random.uniform(0, 0.5)) # Add jitter + backoff = min(backoff * 2, MAX_BACKOFF_SECONDS) + continue diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_layer.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_layer.py new file mode 100644 index 00000000..2c9b5783 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/api_layer.py @@ -0,0 +1,304 @@ +""" +API Layer for Dataplex Glossary Operations. + +This module provides a common interface for interacting with the Dataplex API +for glossary-related operations. It's used by both export and import utilities. +""" + +# Standard library imports +import os +import sys +from typing import Dict, List, Optional + +# Third-party imports +import requests +from google.auth import default +from googleapiclient.discovery import build + +# Add parent directory to path for imports +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +# Local imports - use relative imports from utils directory +from . import api_call_utils, business_glossary_utils, logging_utils +from .api_call_utils import fetch_api_response +from .constants import ( + CLOUD_RESOURCE_MANAGER_BASE_URL, + DATAPLEX_BASE_URL, + DATAPLEX_ENTRY_PATTERN, + PROJECT_PATTERN, + SPREADSHEET_URL_PATTERN, +) +from .error import * + +logger = logging_utils.get_logger() + + +def authenticate_dataplex() -> build: + """Authenticates with the Dataplex API using Application Default Credentials. + + Returns: + A Dataplex API service object. + + Raises: + google.auth.exceptions.DefaultCredentialsError: If credentials cannot be found. + DataplexAPIError: If there is an error during authentication. 
+ """ + try: + dataplex_scopes = ['https://www.googleapis.com/auth/cloud-platform'] + dataplex_creds, _ = default(scopes=dataplex_scopes) + return build('dataplex', 'v1', credentials=dataplex_creds, cache_discovery=False) + except Exception as e: + logger.error(f"Error during Dataplex authentication: {e}") + raise DataplexAPIError(f"Error during Dataplex authentication: {e}") + +def authenticate_sheets() -> build: + """Authenticates with the Google Sheets API using Application Default Credentials. + + Returns: + A Google Sheets API service object. + + Raises: + google.auth.exceptions.DefaultCredentialsError: If credentials cannot be found. + SheetsAPIError: If there is an error during authentication. + """ + try: + sheets_scopes = ['https://www.googleapis.com/auth/spreadsheets'] + sheets_creds, _ = default(scopes=sheets_scopes) + return build('sheets', 'v4', credentials=sheets_creds) + except Exception as e: + logger.error(f"Error during Sheets authentication: {e}") + raise SheetsAPIError(f"Error during Sheets authentication: {e}") + +def list_glossary_categories(dataplex_service: build, glossary_name: str) -> List[Dict]: + """Lists categories from a Dataplex glossary. + + Args: + dataplex_service: The Dataplex API service object. + glossary_name: The full glossary name. + + Returns: + A list of glossary categories. + + Raises: + DataplexAPIError: If there is an error during the API call. + NoCategoriesFoundError: If no categories are found. 
+ """ + try: + request = dataplex_service.projects().locations().glossaries().categories().list( + parent=glossary_name, pageSize=1000) + + categories = [] + while request: + response = request.execute() + if 'categories' in response: + categories.extend(response['categories']) + request = dataplex_service.projects().locations().glossaries().categories().list_next(request, response) + if not categories: + logger.warning(f"No categories found for {glossary_name}") + raise NoCategoriesFoundError(f"No categories found for {glossary_name}") + return categories + except Exception as e: + logger.error(f"Error while listing glossary categories: {e}") + raise DataplexAPIError(f"Error while listing glossary categories: {e}") + + +def list_glossary_terms(dataplex_service: build, glossary_name: str) -> List[Dict]: + """Lists terms from a Dataplex glossary, handling pagination. + + Args: + dataplex_service: The Dataplex API service object. + glossary_name: The full glossary name. + + Returns: + A list of glossary terms. + + Raises: + DataplexAPIError: If there is an error during the API call. + """ + try: + terms = [] + page_token = None + while True: + request = dataplex_service.projects().locations().glossaries().terms().list( + parent=glossary_name, pageSize=1000, pageToken=page_token + ) + response = request.execute() + + if 'terms' in response: + terms.extend(response['terms']) + + page_token = response.get('nextPageToken') + if not page_token: + break + + if not terms: + logger.warning(f"No terms found for {glossary_name}") + return terms + except Exception as e: + logger.error(f"Error while listing glossary terms for {glossary_name}: {e}", exc_info=True) + raise DataplexAPIError(f"Error while listing glossary terms for {glossary_name}: {e}") + +def lookup_entry_links_for_term(entry_id: str, user_project: str) -> Optional[List[Dict]]: + """Looks up EntryLinks for a given glossary term. + + Args: + entry_id: The full entry ID for the term. 
+ user_project: The project ID to bill for the API call. + + Returns: + A list of EntryLinks or None if none are found. + + Raises: + DataplexAPIError: If there is an error during the API call. + """ + try: + # Extract project and location from entry ID + match = DATAPLEX_ENTRY_PATTERN.match(entry_id) + if not match: + logger.error(f"Invalid entry ID format: {entry_id}") + raise InvalidEntryIdFormatError(f"Invalid entry ID format: {entry_id}") + + project_id, location_id = match.group('project_id'), match.group('location_id') + + # Construct the API URL + url = build_entry_lookup_url(entry_id, project_id, location_id) + + response = fetch_api_response( + method=requests.get, + url=url, + project_id=user_project + ) + + if response.get('error_msg'): + logger.error(f"Error looking up entry links: {response['error_msg']}") + + data = response.get('json', {}) + entry_links = data.get('entryLinks', []) + return entry_links if entry_links else None + + except Exception as e: + logger.error(f"Error while looking up entry links: {e}") + return None + +def build_entry_lookup_url(entry_id, project_id, location_id): + return f"{DATAPLEX_BASE_URL}/projects/{project_id}/locations/{location_id}:lookupEntryLinks?entry={entry_id}" + +def lookup_entry(dataplex_service: build, entry_id: str, project_location_name: str) -> Optional[Dict]: + """Looks up an entry using the Dataplex API. + + Args: + dataplex_service: The Dataplex API service object. + entry_id: The entry ID to look up. + + Returns: + The entry response as a dictionary, or None if an error occurs. + + Raises: + DataplexAPIError: If there is an error during the API call. 
+ """ + try: + request = dataplex_service.projects().locations().lookupEntry(name=project_location_name, entry=entry_id, view="ALL") + response = request.execute() + return response + except Exception as e: + logger.error(f"Error while looking up entry {entry_id}: {e}") + + +def get_spreadsheet_id(spreadsheet_url: str) -> str: + """Extracts the spreadsheet ID from the URL. + + Args: + spreadsheet_url: The URL of the Google Sheet. + + Returns: + The spreadsheet ID. + + Raises: + InvalidSpreadsheetURLError: If the spreadsheet URL is invalid. + """ + match = SPREADSHEET_URL_PATTERN.match(spreadsheet_url) + if not match: + logger.error(f"Invalid spreadsheet URL: {spreadsheet_url}") + raise InvalidSpreadsheetURLError(f"Invalid spreadsheet URL: {spreadsheet_url}") + return match.group('spreadsheet_id') + +def read_from_sheet(sheets_service: build, spreadsheet_id: str, range_name: str = 'A:Z') -> List[List[str]]: + """Reads data from a Google Sheet. + + Args: + sheets_service: The Google Sheets API service object. + spreadsheet_id: The ID of the spreadsheet. + range_name: The A1 notation of the range to retrieve (default: 'A:Z'). + + Returns: + A list of rows, where each row is a list of values. + + Raises: + SheetsAPIError: If there is an error during the read operation. + """ + try: + result = sheets_service.spreadsheets().values().get( + spreadsheetId=spreadsheet_id, + range=range_name + ).execute() + values = result.get('values', []) + return values + except Exception as e: + logger.error(f"Error while reading from spreadsheet: {e}") + raise SheetsAPIError(f"Error while reading from spreadsheet: {e}") + +def write_to_sheet(sheets_service: build, spreadsheet_id: str, data: List[List[str]], range_name: str = 'A1') -> None: + """Writes data to a Google Sheet. + + Args: + sheets_service: The Google Sheets API service object. + spreadsheet_id: The ID of the spreadsheet. + data: The data to write to the sheet (list of rows). 
+ range_name: The A1 notation of the starting cell (default: 'A1'). + + Raises: + SheetsAPIError: If there is an error during the write operation. + """ + try: + sheets_service.spreadsheets().values().update( + spreadsheetId=spreadsheet_id, + range=range_name, + valueInputOption='USER_ENTERED', + body={'values': data} + ).execute() + logger.info(f"Data written to spreadsheet: {spreadsheet_id}") + except Exception as e: + logger.error(f"Error while writing to spreadsheet: {e}") + raise SheetsAPIError(f"Error while writing to spreadsheet: {e}") + + +def _get_project_url(project_id: str) -> str: + """Builds the Cloud Resource Manager project URL.""" + return f"{CLOUD_RESOURCE_MANAGER_BASE_URL}/projects/{project_id}" + + +def _fetch_project_info(project_id: str, user_project: str) -> dict: + """Calls the Cloud Resource Manager API and returns the project JSON payload.""" + url = _get_project_url(project_id) + response = api_call_utils.fetch_api_response(requests.get, url, user_project) + if response["error_msg"]: + logger.error(f"Failed to fetch project info for '{project_id}': {response['error_msg']}") + sys.exit(1) + return response.get("json", {}) + + +def _extract_project_number_from_info(project_info: dict) -> str: + """Extracts the numeric project number from the project info 'name' field.""" + name = project_info.get("name", "") + match = PROJECT_PATTERN.search(name) + if match: + return match.group('project_number') + logger.error("Project number not found in project info.") + sys.exit(1) + + +def get_project_number(project_id: str, user_project: str) -> str: + """Fetches the project number from the project ID (composed from smaller helpers).""" + project_info = _fetch_project_info(project_id, user_project) + return _extract_project_number_from_info(project_info) + diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/argument_parser.py 
b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/argument_parser.py new file mode 100644 index 00000000..688bdc9b --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/argument_parser.py @@ -0,0 +1,87 @@ +""" +Utility functions for handling command-line arguments in the EntryLink export script. +""" + +import argparse +import re +import logging +from typing import Tuple + +logger = logging.getLogger(__name__) + +def extract_glossary_info_from_url(glossary_url: str) -> Tuple[str, str, str]: + """Extract project ID, location, and glossary ID from Dataplex glossary resource name.""" + logger.debug(f"Extracting glossary info from resource name: {glossary_url}") + + # Pattern to match the glossary resource name + pattern = r'projects/([^/]+)/locations/([^/]+)/glossaries/([^/?&#]+)' + + match = re.search(pattern, glossary_url) + if not match: + raise ValueError( + f"Invalid glossary resource name format. Expected pattern: " + f"'projects/PROJECT_ID/locations/LOCATION/glossaries/GLOSSARY_ID'. 
Got: {glossary_url}" + ) + + project_id = match.group(1) + location = match.group(2) + glossary_id = match.group(3) + + logger.debug(f"Extracted: project_id={project_id}, location={location}, glossary_id={glossary_id}") + return project_id, location, glossary_id + +def configure_export_entrylinks_argument_parser(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: + """Create and return the command line argument parser.""" + parser.add_argument( + "--glossary-url", + required=True, + help="Dataplex Glossary URL to export EntryLinks" + ) + parser.add_argument( + "--spreadsheet-url", + required=True, + help="Google Sheets URL to export EntryLinks data" + ) + parser.add_argument( + "--user-project", + required=True, + help="Google Cloud Project ID to bill for API calls" + ) + return parser + +def get_export_entrylinks_arguments(argv=None) -> argparse.Namespace: + """Gets arguments for the entry links export program.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter + ) + configure_export_entrylinks_argument_parser(parser) + return parser.parse_args(argv) + + +def get_import_entrylinks_arguments(argv=None) -> argparse.Namespace: + """Gets arguments for the entry links import program.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter + ) + configure_import_entrylinks_argument_parser(parser) + return parser.parse_args(argv) + + +def configure_import_entrylinks_argument_parser(parser): + parser.add_argument( + "--spreadsheet-url", + required=True, + help="Google Sheets URL containing EntryLinks to import" + ) + parser.add_argument( + "--buckets", + help="Comma-separated list of GCS bucket ids. 
Example: --buckets=\"bucket-1,bucket-2\"", + type=lambda s: [item.strip() for item in s.split(",") if item.strip()], + required=True, + metavar="[bucket-1,bucket-2,...]" + ) + parser.add_argument( + "--user-project", + required=True, + help="Google Cloud Project ID to bill for API calls" + ) \ No newline at end of file diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/business_glossary_utils.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/business_glossary_utils.py new file mode 100644 index 00000000..ec38c8ab --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/business_glossary_utils.py @@ -0,0 +1,123 @@ +""" +Business Glossary Utility Functions + +Common utility functions for working with Dataplex Glossary resources. +""" + +# Standard library imports +import re +import uuid + +# Local imports +from utils.constants import ( + DATAPLEX_SYSTEM_ENTRY_GROUP, + GLOSSARY_NAME_PATTERN, + GLOSSARY_URL_PATTERN, + TERM_NAME_PATTERN, +) +from utils.error import InvalidTermNameError + + +def extract_glossary_name(url: str) -> str: + """ + Extract the glossary resource name from a Dataplex URL or resource name. 
+ + Args: + url: Either a full Dataplex glossary URL or a glossary resource name + + Returns: + The glossary resource name in format: projects/{project}/locations/{location}/glossaries/{glossary} + + Raises: + ValueError: If the URL/resource name format is invalid + """ + # First try to match as a URL + match = GLOSSARY_URL_PATTERN.match(url) + if match: + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + return f"projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}" + + # Try to match as a direct resource name + glossary_name = url.strip().rstrip('/') + match = GLOSSARY_NAME_PATTERN.match(glossary_name) + if match: + return glossary_name + + raise ValueError( + f"Could not extract glossary resource from URL: {url}. " + f"Expected format: 'projects/{{project}}/locations/{{location}}/glossaries/{{glossary}}' " + f"or a URL containing this pattern." + ) + + +def generate_entry_name_from_term_name(term_name: str) -> str: + """ + Generates a Dataplex entry ID from a glossary term name. 
+ + Args: + term_name: The full term name in format: + projects/{project}/locations/{location}/glossaries/{glossary}/terms/{term} + Returns: + The generated entry ID in format: + projects/{project}/locations/{location}/entryGroups/@dataplex/entries/projects/{project}/locations/{location}/glossaries/{glossary}/terms/{term} + """ + match = TERM_NAME_PATTERN.match(term_name) + if not match: + raise InvalidTermNameError(f"Invalid term name format: {term_name}") + + project_id = match.group('project_id') + location_id = match.group('location_id') + glossary_id = match.group('glossary_id') + term_id = match.group('term_id') + + return ( + f"projects/{project_id}/locations/{location_id}/entryGroups/{DATAPLEX_SYSTEM_ENTRY_GROUP}/entries/" + f"projects/{project_id}/locations/{location_id}/glossaries/{glossary_id}/terms/{term_id}" + ) + + +def normalize_id(name: str) -> str: + """ + Converts a string to a valid Dataplex ID (lowercase, numbers, hyphens), starting with a letter. + + Args: + name: The string to normalize + + Returns: + A normalized ID suitable for Dataplex (lowercase, numbers, hyphens, starts with letter) + + Example: + >>> normalize_id("My Special ID!") + 'my-special-id' + >>> normalize_id("123-start-with-number") + 'g123-start-with-number' + """ + if not name: + return "" + normalized = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-") + # Ensure starts with a letter + if not normalized or not normalized[0].isalpha(): + normalized = "g" + normalized + return normalized + + +def get_entry_link_id() -> str: + """ + Generate a unique entry link ID that starts with a lowercase letter + and contains only lowercase letters and numbers. 
+ + Returns: + A unique entry link ID in format: g{uuid_hex} + + Example: + >>> id = get_entry_link_id() + >>> id.startswith('g') + True + >>> len(id) + 33 + """ + entrylink_id = 'g' + uuid.uuid4().hex + return entrylink_id + diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/constants.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/constants.py new file mode 100644 index 00000000..6b2269d4 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/constants.py @@ -0,0 +1,93 @@ +"""Constants used by the Business Glossary Migration Tool.""" + +import re + +# --- URLs --- +DATACATALOG_BASE_URL = "https://datacatalog.googleapis.com/v2" +DATAPLEX_BASE_URL = "https://dataplex.googleapis.com/v1" +SEARCH_BASE_URL = "https://datacatalog.googleapis.com/v1/catalog:search" +CLOUD_RESOURCE_MANAGER_BASE_URL = "https://cloudresourcemanager.googleapis.com/v3" + +# --- Dataplex Entry Group Constants --- +DATAPLEX_SYSTEM_ENTRY_GROUP = "@dataplex" +BIGQUERY_SYSTEM_ENTRY_GROUP = "@bigquery" + +# --- Regex Patterns --- +# Glossary and Term Patterns +GLOSSARY_URL_PATTERN = re.compile(r".*dp-glossaries/projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/?#]+).*") +GLOSSARY_NAME_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/]+)") +TERM_NAME_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/]+)/terms/(?P[^/]+)") +CATEGORY_NAME_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/glossaries/(?P[^/?#]+)/categories/(?P[^/]+)") + +# Entry Patterns +ENTRY_NAME_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/entryGroups/(?P[^/]+)/entries/(?P.*)") +DATAPLEX_ENTRY_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/entryGroups/@dataplex/entries/.*") + +# EntryLink Patterns +ENTRYLINK_NAME_PATTERN = 
re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/entryGroups/(?P[^/]+)/entryLinks/(?P[^/]+)") +ENTRYLINK_TYPE_PATTERN = re.compile(r"projects/(?:655216118709|dataplex-types)/locations/global/entryLinkTypes/(?P[^/]+)") + +# Source Entry Pattern (for extracting project/location/entryGroup from full entry paths) +SOURCE_ENTRY_PATTERN = re.compile(r"projects/(?P[^/]+)/locations/(?P[^/]+)/entryGroups/(?P[^/]+)/entries/") + +# Google Sheets Pattern +SPREADSHEET_URL_PATTERN = re.compile(r"https://docs\.google\.com/spreadsheets/d/(?P[^/]+)") + +# Validation Patterns +EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$") +ID_PATTERN = re.compile(r"^[a-z][a-z0-9_-]*$") +PARENT_PATTERN = re.compile(r"^[a-z][a-z0-9_-]*$") +LABEL_PATTERN = re.compile(r"^[a-z0-9_-]+$") + +# Project Pattern +PROJECT_PATTERN = re.compile(r"projects/(?P\d+)") + + +# --- Dataplex Constants --- +# Dataplex Aspects +ASPECT_CONTACTS = "contacts" +ASPECT_OVERVIEW = "overview" + +# Dataplex Link Types +DP_LINK_TYPE_DEFINITION = "definition" +DP_LINK_TYPE_RELATED = "related" +DP_LINK_TYPE_SYNONYM = "synonym" + +# Entry Reference Types +ENTRY_REFERENCE_TYPE_SOURCE = "SOURCE" +ENTRY_REFERENCE_TYPE_TARGET = "TARGET" + +# Dataplex Entry Types / Aspect Prefixes +DP_TYPE_GLOSSARY_CATEGORY = "glossary-category" +DP_TYPE_GLOSSARY_TERM = "glossary-term" +DP_TYPE_GLOSSARY = "glossary" +ASPECT_TYPE_CATEGORY = "glossary-category-aspect" +ASPECT_TYPE_TERM = "glossary-term-aspect" + +# --- General Constants --- +CATEGORIES = "categories" +TERMS = "terms" +MAX_DESC_SIZE_BYTES = 120 * 1024 +MAX_WORKERS = 10 +PAGE_SIZE = 1000 +PROJECT_NUMBER = "655216118709" + +# -- BACKOFF Constants --- +MAX_ATTEMPTS = 10 +INITIAL_BACKOFF_SECONDS = 1.0 +MAX_BACKOFF_SECONDS = 300 + +# --- Filesystem Constants --- +LOGS_DIRECTORY = "logs" +SUMMARY_DIRECTORY = "summary" + +MAX_BUCKETS = 20 +MAX_POLLS = 12*12 # 12 hours 
POLL_INTERVAL_MINUTES = 5
QUEUED_TIMEOUT_MINUTES = 10

# Map each short link-type key to its fully-qualified Dataplex entry link type.
LINK_TYPES = {
    DP_LINK_TYPE_DEFINITION: 'projects/dataplex-types/locations/global/entryLinkTypes/definition',
    DP_LINK_TYPE_SYNONYM: 'projects/dataplex-types/locations/global/entryLinkTypes/synonym',
    DP_LINK_TYPE_RELATED: 'projects/dataplex-types/locations/global/entryLinkTypes/related',
}


# ===== utils/dataplex_dao.py =====

# Standard library imports
import re
import time
import uuid

# Third-party imports
import google.auth
import google_auth_httplib2
import httplib2
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Local imports
from utils import logging_utils
from utils.constants import INITIAL_BACKOFF_SECONDS, MAX_ATTEMPTS, MAX_POLLS, POLL_INTERVAL_MINUTES
from utils.error_utils import TRANSIENT_EXCEPTIONS, extract_error_detail, handle_transient_error, is_transient_http_error

logger = logging_utils.get_logger()


def get_dataplex_service():
    """Build and return an authenticated Dataplex v1 API client."""
    logger.debug("Initializing Dataplex service client.")
    credentials, _ = google.auth.default()
    # 300s timeout: metadata-job calls can be slow for large glossaries.
    transport = google_auth_httplib2.AuthorizedHttp(credentials, http=httplib2.Http(timeout=300))
    return build('dataplex', 'v1', http=transport, cache_discovery=False)


def is_job_succeeded(state: str) -> bool:
    """True when the metadata job finished successfully (possibly with partial errors)."""
    return state in ("SUCCEEDED", "SUCCEEDED_WITH_ERRORS")


def is_job_failed(state: str) -> bool:
    """True when the metadata job terminated in failure."""
    return state == "FAILED"


def is_job_queued(state: str) -> bool:
    """True while the metadata job is still waiting to start."""
    return state == "QUEUED"


def log_job_failure(job: dict, job_id: str):
    """Log the failure reason reported by a finished metadata job."""
    error_msg = job.get("status", {}).get("message", "No error message provided.")
    logger.error(f"Job '{job_id}' FAILED. Reason: {error_msg}")


def normalize_job_id(job_prefix: str) -> str:
    """Lower-case the prefix, replace disallowed characters, trim to 50 chars."""
    lowered = job_prefix.lower()
    sanitized = re.sub(r"[^a-z0-9-]", "-", lowered)
    return sanitized.strip("-")[:50]


def generate_job_id(job_prefix: str) -> str:
    """Return a unique, Dataplex-safe job id derived from the prefix."""
    return f"{normalize_job_id(job_prefix)}-{uuid.uuid4().hex[:8]}"


def validate_create_job_params(service, project_id, location, payload, generated_job_id) -> bool:
    """Return False (and log) when any required job-creation argument is missing."""
    required = (service, project_id, location, payload, generated_job_id)
    if all(required):
        return True
    logger.debug(
        f"create_metadata_job input: service={service}, project_id={project_id}, "
        f"location={location}, payload={payload}, job_id={generated_job_id} | output: False (Missing parameters)"
    )
    logger.error("Missing required parameters for metadata job creation.")
    return False


def log_metadata_job_submission(service, project_id, location, payload, generated_job_id, response):
    """Log both the debug trace and the user-facing confirmation for a submitted job."""
    logger.debug(
        f"create_metadata_job input: service={service}, project_id={project_id}, "
        f"location={location}, payload={payload}, job_id={generated_job_id} | output: {response}"
    )
    logger.info(f"Job '{generated_job_id}' submitted successfully.")
def create_metadata_job(service, project_id: str, location: str, payload: dict, job_prefix: str, fake_job: bool = False) -> str:
    """
    Generates a unique job ID and creates a metadata job with exponential backoff retry.

    Retries on transient server errors (5xx), rate-limit (429) and network/transport
    exceptions for at most MAX_ATTEMPTS attempts. (Doc fix: the original docstring
    referenced a non-existent TOTAL_RETRY_TIMEOUT_SECONDS; retries are attempt-bounded.)

    Args:
        service: Authenticated Dataplex service client.
        project_id: Project that owns the metadata job.
        location: Dataplex location for the job.
        payload: Metadata job request body.
        job_prefix: Human-readable prefix used to build the job id.
        fake_job: When True, return the error detail string instead of '' on failure
            (used by the GCS permission probe).

    Returns:
        The generated job id on success; '' on failure (or error detail string
        when fake_job=True).
    """
    generated_job_id = generate_job_id(job_prefix)
    parent = f"projects/{project_id}/locations/{location}"

    if not validate_create_job_params(service, project_id, location, payload, generated_job_id):
        return ""

    backoff = INITIAL_BACKOFF_SECONDS
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            response = service.projects().locations().metadataJobs().create(
                parent=parent, metadataJobId=generated_job_id, body=payload
            ).execute()
            log_metadata_job_submission(service, project_id, location, payload, generated_job_id, response)
            return generated_job_id
        except HttpError as error:
            error_detail = extract_error_detail(error)
            # Retry only if transient http error (5xx or 429) and attempts remain
            if is_transient_http_error(error) and attempt < MAX_ATTEMPTS:
                backoff = handle_transient_error(generated_job_id, backoff, attempt, error_detail)
                continue
            if fake_job:
                return error_detail
            logger.error(f"Failed to create metadata job '{generated_job_id}' with error: {error_detail}")
            return ""
        except Exception as exception:
            error_message = str(exception)
            # Retry only for transient exceptions
            if isinstance(exception, TRANSIENT_EXCEPTIONS) and attempt < MAX_ATTEMPTS:
                backoff = handle_transient_error(generated_job_id, backoff, attempt, error_message)
                continue
            if fake_job:
                return error_message
            logger.error(f"Unexpected error during metadata job creation for job id {generated_job_id} - {error_message}")
            return ""
    return ""  # Defensive: every loop iteration returns; keeps the function total.


def create_and_monitor_job(service, project_id: str, location: str, payload: dict, job_prefix: str) -> bool:
    """Creates a metadata job and monitors it until completion.
    Returns True only if job creation and monitoring succeed."""
    try:
        job_id = create_metadata_job(service, project_id, location, payload, job_prefix)
        if job_id:
            return poll_metadata_job(service, project_id, location, job_id)
        else:
            logger.error(f"Failed to create job '{job_prefix}': job_id is empty")
            return False
    except Exception as e:
        logger.error(f"Failed to create or monitor job '{job_prefix}': {e}")
        logger.debug(f"create_and_monitor_job input: service={service}, project_id={project_id}, location={location}, payload={payload}, job_id={job_prefix} | output: {e}")
        return False


def poll_metadata_job(service, project_id: str, location: str, job_id: str) -> bool:
    """Polls a metadata job until completion or failure.

    NOTE(review): the first status check happens only after one full poll
    interval, so even short jobs take at least POLL_INTERVAL_MINUTES to report.
    """
    logger.info(f"Polling status for job '{job_id}' every {POLL_INTERVAL_MINUTES} minutes...")
    poll_interval = POLL_INTERVAL_MINUTES * 60
    max_polls = MAX_POLLS
    job_path = f"projects/{project_id}/locations/{location}/metadataJobs/{job_id}"

    for i in range(max_polls):
        time.sleep(poll_interval)
        job, state = get_job_and_state(service, job_path, job_id)
        if job is None:
            return False
        if is_job_succeeded(state):
            logger.info(f"Job '{job_id}' SUCCEEDED.")
            return True
        if is_job_failed(state):
            log_job_failure(job, job_id)
            return False
        logger.info(f"Job '{job_id}' is {state}. Continuing to wait... (check {i+1}/{max_polls})")
    logger.warning(f"Polling timed out for job '{job_id}'.")
    return False


def get_job_and_state(service, job_path: str, job_id: str):
    """Fetch a metadata job and its state; returns (None, None) on HTTP errors."""
    try:
        job = service.projects().locations().metadataJobs().get(name=job_path).execute()
        state = job.get("status", {}).get("state")
        logger.debug(f"Job '{job_id}' and entire job: {job}")
        return job, state
    except HttpError as err:
        logger.error(f"Error polling job '{job_id}'")
        logger.debug(f"input: service={service}, job_path={job_path}, job_id={job_id} | output: {err}")
        return None, None


# ===== utils/error.py =====
""" Custom exception classes for handling specific error scenarios
    in the Dataplex Glossary Import utility."""
# --- Custom Exception Classes ---
class InvalidSpreadsheetURLError(Exception):
    """Raised when the provided spreadsheet URL is invalid."""
    pass

class InvalidGlossaryNameError(Exception):
    """Raised when the provided glossary name is invalid."""
    pass

class DataplexAPIError(Exception):
    """Raised when there is an error interacting with the Dataplex API."""
    pass

class SheetsAPIError(Exception):
    """Raised when there is an error interacting with the Google Sheets API."""
    pass

class NoCategoriesFoundError(Exception):
    """Raised when no categories are found for the given glossary."""
    pass

class NoTermsFoundError(Exception):
    """Raised when no terms are found for the given glossary."""
    pass

class InvalidTermNameError(Exception):
    """Raised when term name is invalid."""
    pass
class InvalidCategoryNameError(Exception):
    """Raised when Category name is invalid."""
    pass

class InvalidEntryIdFormatError(Exception):
    """Raised when the entry ID format is invalid."""
    pass


# ===== utils/error_utils.py =====
"""Retry/backoff helpers and HttpError introspection utilities."""
import time
import random
import json
import logging
import socket

from googleapiclient.errors import HttpError
import requests

from utils import logging_utils
from utils.constants import MAX_ATTEMPTS

# Consistency fix: every other module obtains the shared singleton via
# logging_utils.get_logger(); the original used logging.getLogger(__name__),
# which bypassed the tool's console/file handler configuration.
logger = logging_utils.get_logger()

# Exception classes treated as transient (network/transport) and retried.
TRANSIENT_EXCEPTIONS = (
    requests.exceptions.RequestException,
    ConnectionError,
    TimeoutError,
    socket.timeout,
)


def handle_transient_error(generated_job_id, backoff, attempt, error_detail):
    """Log a transient failure, sleep with jitter, and return the doubled backoff."""
    sleep_time = backoff + random.uniform(0, 0.5)  # jitter avoids synchronized retries
    logger.info(f"Transient error creating metadata job '{generated_job_id}' (attempt {attempt}/{MAX_ATTEMPTS}): {error_detail}. "
                f"Retrying in {sleep_time:.1f}s...")
    time.sleep(sleep_time)
    backoff *= 2
    return backoff


def is_transient_http_error(http_error: HttpError) -> bool:
    """Return True if the HttpError is a transient error we should retry (5xx or 429)."""
    try:
        # googleapiclient.errors.HttpError often exposes `.resp.status` (or .status_code)
        if hasattr(http_error, "resp") and getattr(http_error.resp, "status", None) is not None:
            status = int(http_error.resp.status)
        elif hasattr(http_error, "status_code"):
            status = int(http_error.status_code)
        else:
            return False
        return status >= 500 or status == 429
    except Exception:
        # Malformed status info: treat as non-transient rather than crash.
        return False


def extract_error_detail(error: HttpError) -> str:
    """Pull the most specific human-readable detail out of an HttpError body."""
    try:
        # error.content is a byte string containing the HTTP response body
        error_json = json.loads(error.content.decode('utf-8'))
        details = error_json.get("error", {}).get("details", [])
        for detail in details:
            if "detail" in detail:
                return detail["detail"]
        return error_json.get("error", {}).get("message", "Unknown error")
    except Exception:
        # Body was not JSON (e.g. an HTML error page) — fall back to repr.
        return f"HttpError {error}"
"""
Helper functions for writing data to files and reading metadata from files.
"""

# Standard library imports
import json
import os

# Local imports
from utils import logging_utils

logger = logging_utils.get_logger()


def ensure_dir(path: str):
    """Create directory if it doesn't exist."""
    if not os.path.exists(path):
        os.makedirs(path)
        logger.debug(f"Created directory: {path}")


def parse_json_line(line: str) -> dict | None:
    """Parse a JSON line, return None if invalid."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None


def read_first_json_line(file_path: str) -> dict | None:
    """Returns the JSON object from the first line of a file, or None if unreadable."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            head = handle.readline()
        return json.loads(head) if head else None
    except (IOError, json.JSONDecodeError):
        return None


def is_file_empty(file_path: str) -> bool:
    """Check if a file is empty or doesn't exist."""
    return (not os.path.exists(file_path)) or os.path.getsize(file_path) == 0


def move_file_to_imported_folder(file_path: str):
    """Delete the processed file.

    NOTE(review): despite the name, this deletes rather than moves the file —
    presumably a leftover from an earlier design; confirm before renaming.
    """
    if not os.path.exists(file_path):
        logger.warning(f"File not found: {file_path}. Skipping move/delete operation.")
        return
    try:
        os.remove(file_path)
        logger.debug(f"Deleted local file: {file_path}")
    except Exception as e:
        logger.error(f"Failed to delete local file {file_path}: {e}")
        logger.debug(f"Failed to delete local file {file_path}: {e}", exc_info=True)


def get_link_type(file_path: str) -> str | None:
    """Get the entry link type from a JSON file."""
    data = read_first_json_line(file_path)
    if data:
        return data.get("entryLink", {}).get("entryLinkType", "")
    return None


def get_entry_group(file_path: str) -> str | None:
    """Extract entry group from an entrylink file."""
    data = read_first_json_line(file_path)
    if not data:
        return None
    name = data.get("entryLink", {}).get("name", "")
    return name.split('/entryLinks/')[0] if name else None


def write_entrylinks_to_file(entrylinks: list[dict], output_dir: str, filename: str) -> str:
    """Write entrylinks to a JSON file (one entry per line)."""
    ensure_dir(output_dir)
    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'w', encoding='utf-8') as handle:
        handle.writelines(f"{json.dumps(link)}\n" for link in entrylinks)
    logger.debug(f"Wrote {len(entrylinks)} entrylinks to {filepath}")
    return filepath


# ===== utils/gcs_dao.py =====
from google.cloud import storage
from utils import logging_utils, dataplex_dao
logger = logging_utils.get_logger()
def prepare_gcs_bucket(gcs_bucket: str, file_path: str, filename: str) -> bool:
    """Prepares GCS bucket by clearing it and uploading the file.
    Returns True only if both operations succeed."""
    if not clear_bucket(gcs_bucket):
        logger.error(f"Failed to clear bucket '{gcs_bucket}'. Skipping upload.")
        return False

    if not upload_to_gcs(gcs_bucket, file_path, filename):
        # Fix: message previously logged the literal "(unknown)" instead of the
        # file being uploaded.
        logger.error(f"Failed to upload '{filename}' to bucket '{gcs_bucket}'.")
        return False

    return True


def upload_to_gcs(bucket_name: str, file_path: str, file_name: str) -> bool:
    """Upload a local file to the given bucket. Returns True on success."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file_path)
        logger.debug(f"Uploaded {file_path} -> gs://{bucket_name}/{file_name}")
        return True
    except Exception as error:
        logger.error("Failed to upload '%s' to bucket '%s' with error '%s'", file_path, bucket_name, error)
        return False


def clear_bucket(bucket_name: str) -> bool:
    """Deletes all objects in a bucket. Returns True on success, False on failure."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blobs = list(bucket.list_blobs())
        if not blobs:
            logger.debug(f"Bucket '{bucket_name}' is already empty.")
            return True
        bucket.delete_blobs(blobs)
        logger.debug(f"Deleted {len(blobs)} objects from bucket '{bucket_name}'.")
        return True
    except Exception as error:
        logger.error("Failed to clear GCS bucket '%s' with error as '%s'", bucket_name, error)
        return False


def build_dummy_payload(bucket_name):
    """Build a throwaway IMPORT job payload pointing at the given bucket.

    Used only by the permission probe below; the glossary scope is a
    deliberately non-existent dummy resource.
    """
    return {
        "type": "IMPORT",
        "import_spec": {
            "log_level": "DEBUG",
            "source_storage_uri": f"gs://{bucket_name}/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
                # Plain string: the original used an f-string with no placeholders.
                "glossaries": ["projects/dummy-project-id/locations/global/glossaries/dummy-glossary"]
            }
        }
    }


def check_metadata_job_creation_for_bucket(service, project_id: str, bucket_name: str) -> bool:
    """
    Tries to create a dummy metadata job using the specific GCS bucket to check if the Dataplex service account has permissions.

    Returns True if the permission check passes, False if permission is denied for that bucket.
    """
    dummy_payload = build_dummy_payload(bucket_name)
    job_prefix = "permission-check"
    location = "global"
    result = dataplex_dao.create_metadata_job(service, project_id, location, dummy_payload, job_prefix, fake_job=True)

    if "does not have sufficient permission" in result:
        logger.error(result)
        return False
    return True


def check_all_buckets_permissions(buckets: list[str], project_number: str) -> bool:
    """Checks if the Dataplex service account associated with the project number has permissions on all specified GCS buckets."""
    service = dataplex_dao.get_dataplex_service()
    for bucket in buckets:
        if not check_metadata_job_creation_for_bucket(service, project_number, bucket):
            return False
    return True


# ===== utils/import_utils.py =====
# Standard library imports
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import cycle
from typing import Dict, List

# Local imports
from utils import dataplex_dao, file_utils, gcs_dao, import_utils, logging_utils, payloads
from utils.constants import (
    DP_LINK_TYPE_RELATED,
    DP_LINK_TYPE_SYNONYM,
    ENTRYLINK_NAME_PATTERN,
)

logger = logging_utils.get_logger()
def create_import_json_files(grouped_entrylinks: Dict[str, Dict[str, List[Dict]]], archive_dir: str) -> List[str]:
    """
    Create JSON files from grouped entrylinks for import.

    Args:
        grouped_entrylinks: Dictionary grouped by link_type and entry_group
        archive_dir: Directory path where JSON files should be created

    Returns:
        List of file paths to the created JSON files
    """
    import_files = []
    for link_type, entry_groups in grouped_entrylinks.items():
        for group_key, group_entrylinks in entry_groups.items():
            # group_key is already in project_id_location_entryGroup format
            filename = f"entrylinks_{link_type}_{group_key}.json"
            json_file = file_utils.write_entrylinks_to_file(group_entrylinks, archive_dir, filename)
            import_files.append(json_file)
            logger.info(f"Created import file: {json_file} with {len(group_entrylinks)} entrylinks")
    return import_files


def get_referenced_scopes(file_path: str, main_project_id: str) -> list:
    """Build the referenced_entry_scopes list for an entrylink import file.

    Bug fix: the scope helpers live in utils.payloads, not utils.file_utils —
    the original calls raised AttributeError at runtime.
    """
    link_type = file_utils.get_link_type(file_path)
    if link_type and (DP_LINK_TYPE_RELATED in link_type or DP_LINK_TYPE_SYNONYM in link_type):
        scopes = payloads.get_project_scopes_from_all_lines(file_path)
    else:
        scopes = payloads.get_project_scope_from_first_line(file_path)
    scopes.add(f"projects/{main_project_id}")
    return list(scopes)


def extract_project_id_from_entrylink(entry_link_json: Dict) -> str | None:
    """Extract project ID from the EntryLink name itself; None when unavailable."""
    try:
        # Extract from EntryLink name: projects/{project_id}/locations/{location}/entryGroups/{entryGroup}/entryLinks/{id}
        entrylink_name = entry_link_json['entryLink']['name']
        match = ENTRYLINK_NAME_PATTERN.match(entrylink_name)
        if not match:
            logger.error(f"Invalid EntryLink name format: {entrylink_name}")
            return None
        return match.group('project_id')
    except (KeyError, IndexError, TypeError) as e:
        # TypeError added: entry_link_json may be None when the source file
        # had no parseable first line.
        logger.error(f"Failed to extract project ID from EntryLink name: {e}")
        return None


def process_import_file(file_path: str, gcs_bucket: str) -> bool:
    """
    Processes a single glossary or entrylink file:
    - Builds payload
    - Uploads file to GCS (via prepare_gcs_bucket)
    - Creates & monitors Dataplex job
    - Moves local file to imported folder on success (or removes empty files)
    """
    filename = os.path.basename(file_path)

    if file_utils.is_file_empty(file_path):
        # Fix: messages below previously logged the literal "(unknown)".
        logger.info(f"File {filename} is empty. Skipping the import job.")
        file_utils.move_file_to_imported_folder(file_path)
        return True

    logger.debug(f"Processing file: {filename}")

    service = dataplex_dao.get_dataplex_service()
    first_entry = file_utils.read_first_json_line(file_path)
    project_id = import_utils.extract_project_id_from_entrylink(first_entry)
    if not project_id:
        # Guard: without a project id build_payload cannot target a parent.
        logger.error(f"Could not determine project id for file {filename}. Skipping import.")
        return False
    job_id, payload, job_location = payloads.build_payload(file_path, project_id, gcs_bucket)
    if not payload or not job_id or not job_location:
        return False

    try:
        # Upload file to GCS first; only continue if upload succeeded
        if not gcs_dao.prepare_gcs_bucket(gcs_bucket, file_path, filename):
            logger.error(f"Failed to prepare GCS bucket '{gcs_bucket}' for file '{filename}'. Skipping import.")
            return False
        job_success = dataplex_dao.create_and_monitor_job(service, project_id, job_location, payload, job_id)
        if job_success:
            file_utils.move_file_to_imported_folder(file_path)
        return job_success
    except Exception as e:
        logger.error(f"Error processing file {file_path}: {e}")
        logger.debug(f"Error processing file {file_path}: {e}", exc_info=True)
        return False


def _process_files_for_bucket(files_for_bucket: List[str], bucket: str) -> List[bool]:
    """Worker: processes all files assigned to a bucket sequentially."""
    results = []
    for f in files_for_bucket:
        try:
            result = import_utils.process_import_file(f, bucket)
        except Exception as e:
            logger.error(f"Error processing file {f} in bucket {bucket}: {e}")
            logger.debug(f"Error processing file {f} in bucket {bucket}: {e}", exc_info=True)
            result = False
        results.append(result)
    return results


def run_import_files(files: List[str], buckets: List[str]) -> List[bool]:
    """
    Distribute files round-robin to buckets, start one worker per bucket and process sequentially
    to guarantee no two threads operate on the same bucket.
    """
    if not buckets:
        logger.error("No buckets provided to run_import_files.")
        return [False] * len(files)

    bucket_file_map: Dict[str, List[str]] = {b: [] for b in buckets}
    cycler = cycle(buckets)
    for f in files:
        bucket_file_map[next(cycler)].append(f)

    results: List[bool] = []
    with ThreadPoolExecutor(max_workers=len(buckets)) as executor:
        future_map = {
            executor.submit(_process_files_for_bucket, bucket_file_map[bucket], bucket): bucket
            for bucket in buckets
        }
        for future in as_completed(future_map):
            results.extend(future.result() or [])
    return results


# ===== utils/logging_utils.py =====
"""Custom logger for the Business Glossary import tool.
"""
import logging
import sys
import os
from datetime import datetime
from utils.constants import LOGS_DIRECTORY

# Global instance of logger, instantiated only once. The instance is accessed by
# using the public method get_logger()
_logger = None

class ConsoleLogFilter(logging.Filter):
    """
    This filter is for the console. It allows only INFO and WARNING levels through
    to the stdout console handler. DEBUG is blocked and ERROR+ will be routed to stderr.
    """
    def filter(self, record):
        # Allow only INFO and WARNING to stdout handler.
        return record.levelno >= logging.INFO and record.levelno < logging.ERROR
def get_logger() -> logging.Logger:
    """
    Returns a singleton logger instance.

    The logger is ALWAYS configured at the DEBUG level.
    Handlers control what is actually displayed.
    """
    global _logger
    if _logger is not None:
        return _logger

    # The logger itself stays wide open (DEBUG); handlers do the filtering.
    _logger = logging.getLogger("glossary_tool")
    _logger.setLevel(logging.DEBUG)

    # Default console handler: INFO and WARNING go to stdout.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.set_name("default_console")  # Named so setup_file_logging can find it later
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(_LogFormatter())

    # Error console handler: ERROR and above go to stderr.
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.set_name("default_err_console")
    stderr_handler.setLevel(logging.ERROR)
    stderr_handler.setFormatter(_LogFormatter())

    _logger.addHandler(stdout_handler)
    _logger.addHandler(stderr_handler)

    # Prevent messages from propagating to the root logger (avoid duplicates)
    _logger.propagate = False

    return _logger


def setup_file_logging():
    """
    Reconfigures the logging system for debugging mode.
    - Creates `logs/` directory at dataplex-glossary level if missing.
    - Adds a file handler to log all DEBUG messages. If a file handler already exists
      it will be removed and replaced (so the content is rewritten).
    - Adds a filter to the console handler to hide DEBUG messages and to exclude ERRORs from stdout.
    """
    logger = get_logger()

    # logs/ lives beside utils/, i.e. at the dataplex-glossary package root.
    utils_dir = os.path.dirname(os.path.abspath(__file__))
    logs_dir = os.path.join(os.path.dirname(utils_dir), LOGS_DIRECTORY)
    os.makedirs(logs_dir, exist_ok=True)

    # Timestamped log file; mode='w' overwrites any same-named file.
    log_filename = f"logs_{datetime.now().strftime('%b-%d-%Y_%I-%M-%S%p')}.txt"
    log_path = os.path.join(logs_dir, log_filename)

    file_handler = logging.FileHandler(log_path, mode='w', encoding='utf-8')
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s'
    ))
    file_handler.setLevel(logging.DEBUG)

    # Drop any previous FileHandler before attaching the new one
    # (iterate over a copy: we mutate the handler list while looping).
    for existing in list(logger.handlers):
        if isinstance(existing, logging.FileHandler):
            logger.removeHandler(existing)
            try:
                existing.close()
            except Exception:
                pass

    logger.addHandler(file_handler)

    # Restrict stdout to INFO/WARNING only while debug mode is on.
    for handler in logger.handlers:
        if getattr(handler, "name", None) == "default_console":
            handler.addFilter(ConsoleLogFilter())
            break

    logger.info("Debug logging is active. Logs will be saved in %s", log_filename)


class _LogFormatter(logging.Formatter):
    """Format configuration for each logging level."""
    purple = "\x1b[35m"
    green = "\x1b[32m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    reset = "\x1b[0m"
    header = "%(asctime)s : %(levelname)s : "
    debug_header = "%(funcName)s : %(levelname)s : "
    message_format = "%(message)s"

    # One colourised prefix per level; the message body stays uncoloured.
    FORMATS = {
        logging.DEBUG: purple + debug_header + reset + message_format,
        logging.INFO: green + header + reset + message_format,
        logging.WARNING: yellow + header + reset + message_format,
        logging.ERROR: red + header + reset + message_format,
    }

    def format(self, record):
        formatter = logging.Formatter(self.FORMATS.get(record.levelno), "%H:%M:%S")
        return formatter.format(record)
"""
Data models for Dataplex Glossary Import/Export operations.

These dataclasses provide type-safe representations of Dataplex resources
and eliminate the need for fragile dictionary access patterns.
"""

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class EntryReference:
    """Represents a source or target reference within an EntryLink."""
    name: str
    path: str = ""
    type: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'EntryReference':
        """Create an EntryReference from a dictionary."""
        return cls(
            name=data.get('name', ''),
            path=data.get('path', ''),
            type=data.get('type'),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary, excluding None values."""
        serialized: Dict[str, Any] = {'name': self.name}
        if self.path:
            serialized['path'] = self.path
        if self.type:
            serialized['type'] = self.type
        return serialized


@dataclass
class EntryLink:
    """Represents a complete EntryLink for import/export operations."""
    name: str
    entryLinkType: str
    entryReferences: List[EntryReference] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'EntryLink':
        """Create EntryLink from a dictionary (nested 'entryLink' or flat form)."""
        # Handle both nested and flat dictionary formats
        payload = data.get('entryLink', data)
        return cls(
            name=payload.get('name', ''),
            entryLinkType=payload.get('entryLinkType', ''),
            entryReferences=[EntryReference.from_dict(r) for r in payload.get('entryReferences', [])],
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            'entryLink': {
                'name': self.name,
                'entryLinkType': self.entryLinkType,
                'entryReferences': [ref.to_dict() for ref in self.entryReferences],
            }
        }


@dataclass
class SpreadsheetRow:
    """Represents a row from the EntryLink import spreadsheet."""
    entry_link_type: str
    source_entry: str
    target_entry: str
    source_path: str = ""

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> 'SpreadsheetRow':
        """Create a SpreadsheetRow from a dictionary."""
        return cls(
            entry_link_type=data.get('entry_link_type', ''),
            source_entry=data.get('source_entry', ''),
            target_entry=data.get('target_entry', ''),
            source_path=data.get('source_path', ''),
        )


# ===== utils/payloads.py =====
import re
import os
from typing import Tuple, List, Optional
from utils import logging_utils, file_utils, business_glossary_utils
from utils.constants import DP_LINK_TYPE_DEFINITION

logger = logging_utils.get_logger()


def build_import_spec_base(gcs_bucket: str) -> dict:
    """Shared import_spec skeleton for every metadata job payload."""
    return {
        "log_level": "DEBUG",
        "source_storage_uri": f"gs://{gcs_bucket}/",
        "entry_sync_mode": "FULL",
        "aspect_sync_mode": "INCREMENTAL"
    }


def extract_job_location_from_entry_group(entry_group: str) -> str:
    """Pull the location segment out of an entry group name; 'global' as fallback."""
    if not entry_group:
        return "global"
    found = re.search(r'locations/([^/]+)', entry_group)
    return found.group(1) if found else "global"


def extract_glossary_id_from_synonym_related_filename(filename: str) -> str:
    """Recover the glossary id embedded in a related/synonym entrylink filename."""
    found = re.search(r'entrylinks_related_synonyms_(.*?)\.json', filename)
    return found.group(1) if found else "unknown"


def get_link_type(file_path: str) -> str | None:
    """Entry link type from a file's first JSON line (duplicates file_utils.get_link_type)."""
    data = file_utils.read_first_json_line(file_path)
    if data:
        return data.get("entryLink", {}).get("entryLinkType", "")
    return None
re.search(r"(projects/[^/]+)", name)
+        if match:
+            logger.debug(f"Extracted project scope: {match.group(1)}")
+            scopes.add(match.group(1))
+    logger.debug(f"Extracted scopes: {scopes}")
+    return scopes
+
+
+def get_project_scopes_from_all_lines(file_path: str) -> set:
+    scopes = set()
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                if not line.strip():
+                    continue
+                data = file_utils.parse_json_line(line)
+                if data:
+                    scopes.update(extract_scopes_from_entry_references(data))
+    except IOError:
+        pass
+    logger.debug(f"Project scopes from all lines in {file_path}: {scopes}")
+    return scopes
+
+
+def get_project_scope_from_first_line(file_path: str) -> set:
+    scopes = set()
+    data = file_utils.read_first_json_line(file_path)
+    if data:
+        scopes.update(extract_scopes_from_entry_references(data))
+    return scopes
+
+def build_related_synonym_referenced_entry_scopes(file_path: str, main_project_id: str) -> List[str]:
+    logger.debug(f"Building referenced scopes for file: {file_path} and main_project_id: {main_project_id}")
+    scopes = get_project_scopes_from_all_lines(file_path)
+    scopes.add(f"projects/{main_project_id}")
+    logger.debug(f"Built scopes: {scopes}")
+    return list(scopes)
+
+def build_defintion_referenced_entry_scopes(file_path: str, main_project_id: str) -> List[str]:
+    scopes = get_project_scope_from_first_line(file_path)
+    scopes.add(f"projects/{main_project_id}")
+    return list(scopes)
+
+def build_glossary_payload(filename: str, project_id: str, import_spec_base: dict) -> Tuple[str, dict, str]:
+    glossary_id = filename.replace("glossary_", "").replace(".json", "").replace("_", "-")
+    job_id_prefix = f"glossary-{glossary_id}"
+    job_location = "global"
+    payload = {
+        "type": "IMPORT",
+        "import_spec": {
+            **import_spec_base,
+            "scope": {
+                "glossaries": [f"projects/{project_id}/locations/global/glossaries/{glossary_id}"]
+            }
+        }
+    }
+    logger.debug(f"build_glossary_payload input: filename={filename}, project_id={project_id}, 
import_spec_base={import_spec_base} | output: {job_id_prefix}, {payload}, {job_location}")
+    return job_id_prefix, payload, job_location
+
+
+def build_definition_entrylink_payload(file_path: str, project_id: str, import_spec_base: dict) -> Tuple[str, dict, str]:
+    filename = os.path.basename(file_path)
+    entry_group = file_utils.get_entry_group(file_path)
+    job_location = extract_job_location_from_entry_group(entry_group)
+    job_id_prefix = business_glossary_utils.normalize_id(filename)
+    referenced_scopes = build_defintion_referenced_entry_scopes(file_path, project_id)
+    payload = {
+        "type": "IMPORT",
+        "import_spec": {
+            **import_spec_base,
+            "scope": {
+                "entry_groups": [entry_group],
+                "entry_link_types": [f"projects/dataplex-types/locations/global/entryLinkTypes/definition"],
+                "referenced_entry_scopes": referenced_scopes
+            }
+        }
+    }
+    logger.debug(f"input: file_path={file_path}, filename={filename}, import_spec_base={import_spec_base}, referenced_scopes={referenced_scopes} | output: {job_id_prefix}, {payload}, {job_location}")
+    return job_id_prefix, payload, job_location
+
+
+def build_synonym_related_entrylink_payload(file_path: str, project_id: str, import_spec_base: dict) -> Tuple[str, dict, str]:
+    referenced_scopes = build_related_synonym_referenced_entry_scopes(file_path, project_id)
+    job_id_prefix = f"entrylinks-synonym-related-{project_id}"
+    entry_group = file_utils.get_entry_group(file_path)
+    job_location = extract_job_location_from_entry_group(entry_group)
+    payload = {
+        "type": "IMPORT",
+        "import_spec": {
+            **import_spec_base,
+            "scope": {
+                "entry_groups": [f"projects/{project_id}/locations/{job_location}/entryGroups/@dataplex"],
+                "entry_link_types": [
+                    "projects/dataplex-types/locations/global/entryLinkTypes/synonym",
+                    "projects/dataplex-types/locations/global/entryLinkTypes/related"
+                ],
+                "referenced_entry_scopes": referenced_scopes
+            }
+        }
+    }
+    logger.debug(f"input: file_path={file_path}, project_id={project_id}, 
import_spec_base={import_spec_base}, referenced_scopes={referenced_scopes} | output: {job_id_prefix}, {payload}, {job_location}")
+    return job_id_prefix, payload, job_location
+
+
+def build_entrylink_payload(file_path: str, project_id: str, import_spec_base: dict) -> Tuple[Optional[str], Optional[dict], Optional[str]]:
+    """
+    Wrapper for choosing definition vs synonyms/related.
+    """
+    link_type = get_link_type(file_path)
+    if not link_type:
+        logger.warning(f"Cannot determine link type for {file_path}. Skipping.")
+        return None, None, None
+
+    if DP_LINK_TYPE_DEFINITION in link_type:
+        return build_definition_entrylink_payload(file_path, project_id, import_spec_base)
+    else:
+        return build_synonym_related_entrylink_payload(file_path, project_id, import_spec_base)
+
+
+def build_payload(file_path: str, project_id: str, gcs_bucket: str):
+    import_spec_base = build_import_spec_base(gcs_bucket)
+    filename = os.path.basename(file_path)
+    if filename.startswith("glossary_"):
+        return build_glossary_payload(filename, project_id, import_spec_base)
+    elif filename.startswith("entrylinks_"):
+        return build_entrylink_payload(file_path, project_id, import_spec_base)
+    else:
+        logger.warning(f"Unknown file type: {filename}. 
Skipping.") + return None, None, None diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/requirements.txt b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/requirements.txt new file mode 100644 index 00000000..7231b07c --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/requirements.txt @@ -0,0 +1,20 @@ +google-api-python-client==2.172.0 +google-auth==2.40.3 +google-auth-httplib2==0.2.0 +google-auth-oauthlib==1.2.2 +google-cloud-storage==3.1.0 +gspread==6.2.1 +oauth2client==4.1.3 +pandas==2.2.3 +requests==2.32.4 +tqdm==4.67.1 +google==3.0.0 +google-api-core==2.25.1 +google-cloud==0.34.0 +google-cloud-core==2.4.3 +httplib2==0.22.0 +idna==3.10 +numpy==2.0.2 +oauthlib==3.2.2 +requests==2.32.4 +requests-oauthlib==2.0.0 \ No newline at end of file diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/sheet_utils.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/sheet_utils.py new file mode 100644 index 00000000..cc6e4a14 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/dataplex-glossary/utils/sheet_utils.py @@ -0,0 +1,168 @@ +""" +Sheet Utility Functions + +Common utility functions for converting data between Google Sheets and Dataplex EntryLinks. +Handles parsing spreadsheet data and transforming it to/from EntryLink format. 
+""" + +# Standard library imports +from typing import Any, Dict, List, Tuple + +# Local imports +from utils import logging_utils +from utils.constants import ( + ENTRYLINK_TYPE_PATTERN, + ENTRY_REFERENCE_TYPE_SOURCE, + ENTRY_REFERENCE_TYPE_TARGET, +) + +logger = logging_utils.get_logger() + + +def entry_links_to_rows(entry_links: List[Dict[str, Any]]) -> List[List[str]]: + """ + Convert EntryLinks to CSV rows format for export to spreadsheet. + + Args: + entry_links: List of EntryLink dictionaries from Dataplex API + + Returns: + List of rows, where each row is [link_type, source_entry, target_entry, source_path] + + Example: + >>> links = [{ + ... 'entryLinkType': 'projects/.../entryLinkTypes/definition', + ... 'entryReferences': [ + ... {'name': 'entry1', 'type': 'SOURCE', 'path': '/path'}, + ... {'name': 'entry2', 'type': 'TARGET'} + ... ] + ... }] + >>> rows = entry_links_to_rows(links) + >>> rows[0][0] # link_type + 'definition' + """ + rows = [] + for link in entry_links: + # Extract link type using regex (e.g., 'definition', 'synonym', 'related') + link_type_full = link.get('entryLinkType', '') + match = ENTRYLINK_TYPE_PATTERN.match(link_type_full) + if not match: + logger.warning(f"Invalid entryLinkType format: {link_type_full}") + continue + link_type = match.group('link_type') + + # Get entry references + first_ref = None + second_ref = None + entry_refs = link.get('entryReferences', []) + if not entry_refs: + continue + + # Try to find explicit source/target references first + source_ref = next((ref for ref in entry_refs if ref.get('type') == ENTRY_REFERENCE_TYPE_SOURCE), None) + target_ref = next((ref for ref in entry_refs if ref.get('type') == ENTRY_REFERENCE_TYPE_TARGET), None) + + if source_ref and target_ref: + # For directional links with explicit source/target + first_ref = source_ref + second_ref = target_ref + else: + # For non-directional links or when source/target not specified + # Just use the references in order they appear + first_ref = 
entry_refs[0] + second_ref = entry_refs[1] if len(entry_refs) > 1 else None + + if first_ref and second_ref: + _add_entry_link_to_rows(rows, link_type, first_ref, second_ref) + + return rows + + +def _add_entry_link_to_rows(rows: List[List[str]], link_type: str, + first_ref: Dict[str, Any], second_ref: Dict[str, Any]) -> None: + """ + Helper function to add a single entry link as a row. + + Args: + rows: The list of rows to append to + link_type: The type of link (e.g., 'definition', 'synonym') + first_ref: First entry reference dictionary + second_ref: Second entry reference dictionary + """ + row = [ + link_type, + first_ref.get('name', ''), + second_ref.get('name', ''), + first_ref.get('path', '') # Empty string if no path + ] + rows.append(row) + + +def extract_column_indices(data: List[List[str]]) -> Tuple[int, int, int, int]: + """ + Extract column indices from spreadsheet headers. + + Args: + data: Spreadsheet data with headers in first row + + Returns: + Tuple of (type_idx, source_idx, target_idx, path_idx) + + Raises: + ValueError: If required columns are not found + """ + headers = [h.lower().strip() for h in data[0]] + try: + type_idx = headers.index('entry_link_type') + source_idx = headers.index('source_entry') + target_idx = headers.index('target_entry') + path_idx = headers.index('source_path') if 'source_path' in headers else -1 + except ValueError as e: + logger.error(f"Required column not found in spreadsheet: {e}") + raise ValueError(f"Spreadsheet must have required columns: {e}") + return type_idx, source_idx, target_idx, path_idx + + +def rows_to_entry_link_dicts(data: List[List[str]], type_idx: int, source_idx: int, + target_idx: int, path_idx: int) -> List[Dict[str, str]]: + """ + Convert spreadsheet rows to entry link dictionaries. 
+ + Args: + data: Spreadsheet data (first row should be headers, already processed) + type_idx: Column index for entry_link_type + source_idx: Column index for source_entry + target_idx: Column index for target_entry + path_idx: Column index for source_path (-1 if not present) + + Returns: + List of dictionaries with entry link data + + Example: + >>> data = [['type', 'source', 'target'], ['definition', 'entry1', 'entry2']] + >>> dicts = rows_to_entry_link_dicts(data, 0, 1, 2, -1) + >>> dicts[0]['entry_link_type'] + 'definition' + """ + entries = [] + for row_num, row in enumerate(data[1:], start=2): + if len(row) <= max(type_idx, source_idx, target_idx): + logger.warning(f"Row {row_num} has insufficient columns, skipping") + continue + + # Create row dict + row_dict = { + 'entry_link_type': row[type_idx].strip(), + 'source_entry': row[source_idx].strip(), + 'target_entry': row[target_idx].strip(), + 'source_path': row[path_idx].strip() if path_idx >= 0 and len(row) > path_idx else '' + } + + # Validate required fields + if not row_dict['source_entry'] or not row_dict['target_entry']: + logger.warning(f"Row {row_num} missing source or target entry, skipping") + continue + + entries.append(row_dict) + + return entries