Skip to content

Commit 137ffb7

Browse files
authored
fix(ingest): only add to samples where platform match (#14722)
1 parent 5f23652 commit 137ffb7

File tree

11 files changed

+113
-11
lines changed

11 files changed

+113
-11
lines changed

metadata-ingestion/scripts/avro_codegen.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,10 @@ def create_from_ids(cls, data_flow_urn: str, job_id: str) -> "DataJobUrn":
469469
def get_data_flow_urn(self) -> "DataFlowUrn":
470470
return DataFlowUrn.from_string(self.flow)
471471
472+
@property
473+
def orchestrator(self) -> str:
474+
return self.get_data_flow_urn().orchestrator
475+
472476
@deprecated(reason="Use .job_id instead")
473477
def get_job_id(self) -> str:
474478
return self.job_id

metadata-ingestion/src/datahub/ingestion/api/report.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from datahub.utilities.file_backed_collections import FileBackedDict
3131
from datahub.utilities.lossy_collections import LossyList
32+
from datahub.utilities.urns.urn import guess_platform_name
3233

3334
logger = logging.getLogger(__name__)
3435
LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"]
@@ -41,6 +42,15 @@ def as_obj(self) -> dict: ...
4142

4243
@dataclass
4344
class Report(SupportsAsObj):
45+
def __post_init__(self) -> None:
46+
self.platform: Optional[str] = None
47+
48+
def set_platform(self, platform: str) -> None:
49+
self.platform = platform
50+
51+
def get_platform(self) -> Optional[str]:
52+
return self.platform
53+
4454
@staticmethod
4555
def to_str(some_val: Any) -> str:
4656
if isinstance(some_val, Enum):
@@ -213,6 +223,7 @@ class ExamplesReport(Report, Closeable):
213223
_lineage_aspects_seen: Set[str] = field(default_factory=set)
214224

215225
def __post_init__(self) -> None:
226+
super().__post_init__()
216227
self._file_based_dict = FileBackedDict(
217228
tablename="urn_aspects",
218229
extra_columns={
@@ -347,6 +358,9 @@ def _update_file_based_dict(
347358
aspectName: str,
348359
mcp: Union[MetadataChangeProposalClass, MetadataChangeProposalWrapper],
349360
) -> None:
361+
platform_name = guess_platform_name(urn)
362+
if platform_name != self.get_platform():
363+
return
350364
if is_lineage_aspect(entityType, aspectName):
351365
self._lineage_aspects_seen.add(aspectName)
352366
has_fine_grained_lineage = self._has_fine_grained_lineage(mcp)

metadata-ingestion/src/datahub/ingestion/api/source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
531531
auto_status_aspect,
532532
auto_materialize_referenced_tags_terms,
533533
partial(
534-
auto_fix_duplicate_schema_field_paths, platform=self._infer_platform()
534+
auto_fix_duplicate_schema_field_paths, platform=self.infer_platform()
535535
),
536-
partial(auto_fix_empty_field_paths, platform=self._infer_platform()),
536+
partial(auto_fix_empty_field_paths, platform=self.infer_platform()),
537537
browse_path_processor,
538538
partial(auto_workunit_reporter, self.get_report()),
539539
auto_patch_last_modified,
@@ -583,7 +583,7 @@ def get_report(self) -> SourceReport:
583583
def close(self) -> None:
584584
self.get_report().close()
585585

586-
def _infer_platform(self) -> Optional[str]:
586+
def infer_platform(self) -> Optional[str]:
587587
config = self.get_config()
588588
platform = (
589589
getattr(config, "platform_name", None)
@@ -598,7 +598,7 @@ def _infer_platform(self) -> Optional[str]:
598598
def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
599599
config = self.get_config()
600600

601-
platform = self._infer_platform()
601+
platform = self.infer_platform()
602602
env = getattr(config, "env", None)
603603
browse_path_drop_dirs = [
604604
platform,

metadata-ingestion/src/datahub/ingestion/run/pipeline.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,19 @@ def _time_to_print(self) -> bool:
440440
return True
441441
return False
442442

443+
def _set_platform(self) -> None:
444+
platform = self.source.infer_platform()
445+
if platform:
446+
self.source.get_report().set_platform(platform)
447+
else:
448+
self.source.get_report().warning(
449+
message="Platform not found",
450+
title="Platform not found",
451+
context="Platform not found",
452+
)
453+
443454
def run(self) -> None:
455+
self._set_platform()
444456
self._warn_old_cli_version()
445457
with self.exit_stack, self.inner_exit_stack:
446458
if self.config.flags.generate_memory_profiles:

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
200200
auto_lowercase_dataset_urns,
201201
auto_materialize_referenced_tags_terms,
202202
partial(
203-
auto_fix_duplicate_schema_field_paths, platform=self._infer_platform()
203+
auto_fix_duplicate_schema_field_paths, platform=self.infer_platform()
204204
),
205-
partial(auto_fix_empty_field_paths, platform=self._infer_platform()),
205+
partial(auto_fix_empty_field_paths, platform=self.infer_platform()),
206206
partial(auto_workunit_reporter, self.get_report()),
207207
auto_patch_last_modified,
208208
EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,47 @@
1-
from datahub.metadata.urns import Urn
1+
from typing import Optional
22

3-
__all__ = ["Urn", "guess_entity_type"]
3+
from datahub.metadata.urns import (
4+
DataPlatformUrn,
5+
Urn,
6+
)
7+
8+
__all__ = ["Urn", "guess_entity_type", "guess_platform_name"]
49

510

611
def guess_entity_type(urn: str) -> str:
712
assert urn.startswith("urn:li:"), "urns must start with urn:li:"
813
return urn.split(":")[2]
14+
15+
16+
def guess_platform_name(urn: str) -> Optional[str]:
17+
"""Extract platform from URN using a mapping dictionary."""
18+
urn_obj = Urn.from_string(urn)
19+
20+
try:
21+
platform = None
22+
try:
23+
platform = urn_obj.platform # type: ignore[attr-defined]
24+
platform_name = DataPlatformUrn.from_string(
25+
platform
26+
).get_entity_id_as_string()
27+
return platform_name
28+
except AttributeError:
29+
pass
30+
try:
31+
return urn_obj.orchestrator # type: ignore[attr-defined]
32+
except AttributeError:
33+
pass
34+
try:
35+
return urn_obj.dashboard_tool # type: ignore[attr-defined]
36+
except AttributeError:
37+
pass
38+
try:
39+
return urn_obj.ml_model_tool # type: ignore[attr-defined]
40+
except AttributeError:
41+
pass
42+
43+
if platform is None:
44+
return None
45+
except AttributeError:
46+
pass
47+
return None

metadata-ingestion/tests/unit/api/test_pipeline.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from datahub.configuration.common import DynamicTypedConfig
1010
from datahub.ingestion.api.committable import CommitPolicy, Committable
1111
from datahub.ingestion.api.common import RecordEnvelope
12+
from datahub.ingestion.api.decorators import platform_name
1213
from datahub.ingestion.api.source import Source, SourceReport
1314
from datahub.ingestion.api.transform import Transformer
1415
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -514,6 +515,7 @@ def transform(
514515
yield record_envelope
515516

516517

518+
@platform_name("fake")
517519
class FakeSource(Source):
518520
def __init__(self, ctx: PipelineContext):
519521
super().__init__(ctx)
@@ -537,6 +539,7 @@ def close(self):
537539
pass
538540

539541

542+
@platform_name("fake")
540543
class FakeSourceWithWarnings(FakeSource):
541544
def __init__(self, ctx: PipelineContext):
542545
super().__init__(ctx)
@@ -546,6 +549,7 @@ def get_report(self) -> SourceReport:
546549
return self.source_report
547550

548551

552+
@platform_name("fake")
549553
class FakeSourceWithFailures(FakeSource):
550554
def __init__(self, ctx: PipelineContext):
551555
super().__init__(ctx)

metadata-ingestion/tests/unit/datahub/utilities/__init__.py

Whitespace-only changes.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from datahub.utilities.urns.urn import guess_platform_name
2+
3+
4+
def test_guess_platform_name():
5+
assert guess_platform_name("urn:li:corpuser:jdoe") is None
6+
assert (
7+
guess_platform_name(
8+
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"
9+
)
10+
== "kafka"
11+
)
12+
assert guess_platform_name("urn:li:dataFlow:(airflow,dag_abc,PROD)") == "airflow"
13+
assert (
14+
guess_platform_name(
15+
"urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)"
16+
)
17+
== "airflow"
18+
)
19+
assert guess_platform_name("urn:li:chart:(looker,baz1)") == "looker"
20+
assert guess_platform_name("urn:li:dashboard:(looker,baz)") == "looker"
21+
assert (
22+
guess_platform_name(
23+
"urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)"
24+
)
25+
== "science"
26+
)

metadata-ingestion/tests/unit/test_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from datahub.emitter.mcp import MetadataChangeProposalWrapper
44
from datahub.ingestion.api.common import PipelineContext
5+
from datahub.ingestion.api.decorators import platform_name
56
from datahub.ingestion.api.source import Source, SourceReport
67
from datahub.ingestion.api.workunit import MetadataWorkUnit
78
from datahub.metadata.schema_classes import (
@@ -24,13 +25,14 @@
2425
def _get_urn(table_name: str = "fooIndex") -> str:
2526
return str(
2627
DatasetUrn.create_from_ids(
27-
platform_id="elasticsearch",
28+
platform_id="fake",
2829
table_name=table_name,
2930
env="PROD",
3031
)
3132
)
3233

3334

35+
@platform_name("fake")
3436
class FakeSource(Source):
3537
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
3638
return [
@@ -46,6 +48,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
4648
def __init__(self, ctx: PipelineContext):
4749
super().__init__(ctx)
4850
self.source_report = SourceReport()
51+
self.source_report.set_platform("fake")
4952

5053
@classmethod
5154
def create(cls, config_dict: dict, ctx: PipelineContext) -> "FakeSource":

0 commit comments

Comments
 (0)