Skip to content

Commit 5f23652

Browse files
authored
fix(ingestion/iceberg): Improve iceberg source resiliency to server errors (#14731)
1 parent 3fbef4a commit 5f23652

File tree

2 files changed

+314
-35
lines changed

2 files changed

+314
-35
lines changed

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
NoSuchNamespaceError,
1313
NoSuchPropertyException,
1414
NoSuchTableError,
15-
ServerError,
15+
RESTError,
1616
)
1717
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
1818
from pyiceberg.table import Table
@@ -154,6 +154,10 @@ def __init__(self, config: IcebergSourceConfig, ctx: PipelineContext) -> None:
154154
self.report: IcebergSourceReport = IcebergSourceReport()
155155
self.config: IcebergSourceConfig = config
156156
self.ctx: PipelineContext = ctx
157+
self.stamping_processor = AutoSystemMetadata(
158+
self.ctx
159+
) # single instance used only when processing namespaces
160+
self.namespaces: List[Tuple[Identifier, str]] = []
157161

158162
@classmethod
159163
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "IcebergSource":
@@ -246,6 +250,13 @@ def _get_datasets(
246250
context=str(namespace),
247251
exc=e,
248252
)
253+
except RESTError as e:
254+
self.report.warning(
255+
title="Iceberg REST Server Error",
256+
message="Iceberg REST Server returned error status when trying to list tables for a namespace, skipping it.",
257+
context=str(namespace),
258+
exc=e,
259+
)
249260
except Exception as e:
250261
self.report.report_failure(
251262
title="Error when processing a namespace",
@@ -322,10 +333,10 @@ def _try_processing_dataset(
322333
context=dataset_name,
323334
exc=e,
324335
)
325-
except ServerError as e:
336+
except RESTError as e:
326337
self.report.warning(
327338
title="Iceberg REST Server Error",
328-
message="Iceberg returned 500 HTTP status when trying to process a table, skipping it.",
339+
message="Iceberg REST Server returned error status when trying to process a table, skipping it.",
329340
context=dataset_name,
330341
exc=e,
331342
)
@@ -365,7 +376,7 @@ def _process_dataset(
365376
)
366377

367378
try:
368-
catalog = self.config.get_catalog()
379+
self.catalog = self.config.get_catalog()
369380
except Exception as e:
370381
self.report.report_failure(
371382
title="Failed to initialize catalog object",
@@ -375,33 +386,7 @@ def _process_dataset(
375386
return
376387

377388
try:
378-
stamping_processor = AutoSystemMetadata(self.ctx)
379-
namespace_ids = self._get_namespaces(catalog)
380-
namespaces: List[Tuple[Identifier, str]] = []
381-
for namespace in namespace_ids:
382-
namespace_repr = ".".join(namespace)
383-
LOGGER.debug(f"Processing namespace {namespace_repr}")
384-
namespace_urn = make_container_urn(
385-
NamespaceKey(
386-
namespace=namespace_repr,
387-
platform=self.platform,
388-
instance=self.config.platform_instance,
389-
env=self.config.env,
390-
)
391-
)
392-
namespace_properties: Properties = catalog.load_namespace_properties(
393-
namespace
394-
)
395-
namespaces.append((namespace, namespace_urn))
396-
for aspect in self._create_iceberg_namespace_aspects(
397-
namespace, namespace_properties
398-
):
399-
yield stamping_processor.stamp_wu(
400-
MetadataChangeProposalWrapper(
401-
entityUrn=namespace_urn, aspect=aspect
402-
).as_workunit()
403-
)
404-
LOGGER.debug("Namespaces ingestion completed")
389+
yield from self._process_namespaces()
405390
except Exception as e:
406391
self.report.report_failure(
407392
title="Failed to list namespaces",
@@ -415,13 +400,70 @@ def _process_dataset(
415400
args_list=[
416401
(dataset_path, namespace_urn)
417402
for dataset_path, namespace_urn in self._get_datasets(
418-
catalog, namespaces
403+
self.catalog, self.namespaces
419404
)
420405
],
421406
max_workers=self.config.processing_threads,
422407
):
423408
yield wu
424409

410+
def _try_processing_namespace(
411+
self, namespace: Identifier
412+
) -> Iterable[MetadataWorkUnit]:
413+
namespace_repr = ".".join(namespace)
414+
try:
415+
LOGGER.debug(f"Processing namespace {namespace_repr}")
416+
namespace_urn = make_container_urn(
417+
NamespaceKey(
418+
namespace=namespace_repr,
419+
platform=self.platform,
420+
instance=self.config.platform_instance,
421+
env=self.config.env,
422+
)
423+
)
424+
425+
namespace_properties: Properties = self.catalog.load_namespace_properties(
426+
namespace
427+
)
428+
for aspect in self._create_iceberg_namespace_aspects(
429+
namespace, namespace_properties
430+
):
431+
yield self.stamping_processor.stamp_wu(
432+
MetadataChangeProposalWrapper(
433+
entityUrn=namespace_urn, aspect=aspect
434+
).as_workunit()
435+
)
436+
self.namespaces.append((namespace, namespace_urn))
437+
except NoSuchNamespaceError as e:
438+
self.report.report_warning(
439+
title="Failed to retrieve namespace properties",
440+
message="Couldn't find the namespace, was it deleted during the ingestion?",
441+
context=namespace_repr,
442+
exc=e,
443+
)
444+
return
445+
except RESTError as e:
446+
self.report.warning(
447+
title="Iceberg REST Server Error",
448+
message="Iceberg REST Server returned error status when trying to retrieve namespace properties, skipping it.",
449+
context=str(namespace),
450+
exc=e,
451+
)
452+
except Exception as e:
453+
self.report.report_failure(
454+
title="Failed to process namespace",
455+
message="Unhandled exception happened during processing of the namespace",
456+
context=namespace_repr,
457+
exc=e,
458+
)
459+
460+
def _process_namespaces(self) -> Iterable[MetadataWorkUnit]:
461+
namespace_ids = self._get_namespaces(self.catalog)
462+
for namespace in namespace_ids:
463+
yield from self._try_processing_namespace(namespace)
464+
465+
LOGGER.debug("Namespaces ingestion completed")
466+
425467
def _create_iceberg_table_aspects(
426468
self, dataset_name: str, table: Table, namespace_urn: str
427469
) -> Iterable[_Aspect]:

0 commit comments

Comments
 (0)