fix(ingestion): use default generate_browse_path_v2 even if no pipeline_config #13117

Draft
wants to merge 3 commits into
base: master

Conversation

sgomezvillamor
Contributor

The default for generate_browse_path_v2 is True. However, when no pipeline_config is given, browse paths v2 are not generated at all, so the effective behavior contradicts the declared default.

This PR fixes the inconsistency by applying the default even when pipeline_config is not given.
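
For illustration, the inconsistency and the intended behavior can be sketched as below. This is a minimal, self-contained sketch and not the actual DataHub code: `FlagsConfig`, `PipelineConfig`, and both helper functions are hypothetical stand-ins, and only the flag name `generate_browse_path_v2` and its default of True come from the description above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FlagsConfig:
    # Hypothetical stand-in for the pipeline flags; the default of True
    # is the one stated in the PR description.
    generate_browse_path_v2: bool = True


@dataclass
class PipelineConfig:
    # Hypothetical stand-in for the pipeline config object.
    flags: FlagsConfig = field(default_factory=FlagsConfig)


def should_generate_before(pipeline_config: Optional[PipelineConfig]) -> bool:
    # Old behavior as described: a missing config silently disables
    # browse paths v2, regardless of the declared default.
    return bool(pipeline_config and pipeline_config.flags.generate_browse_path_v2)


def should_generate_after(pipeline_config: Optional[PipelineConfig]) -> bool:
    # Intended behavior: fall back to the declared default when no
    # pipeline config is available.
    flags = pipeline_config.flags if pipeline_config else FlagsConfig()
    return flags.generate_browse_path_v2


no_config_before = should_generate_before(None)
no_config_after = should_generate_after(None)
explicit_off = should_generate_after(
    PipelineConfig(flags=FlagsConfig(generate_browse_path_v2=False))
)
```

With this sketch, a missing config yields False before the change and True after it, while an explicit `generate_browse_path_v2=False` is still respected.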

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Apr 8, 2025

codecov bot commented Apr 8, 2025

❌ 13 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 2395 | 13 | 2382 | 46 |
View the top 3 failed test(s) by shortest run time
tests.unit.test_pulsar_source.TestPulsarSource::test_pulsar_source_get_workunits_all_tenant
Stack Traces | 0.007s run time
self = <tests.unit.test_pulsar_source.TestPulsarSource testMethod=test_pulsar_source_get_workunits_all_tenant>
mock_session = <function get at 0x7f2ed55a8940>

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_all_tenant(self, mock_session):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
            },
            ctx,
        )
    
        # Mock fetching Pulsar metadata
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock:
            mock.side_effect = [
                ["t_1"],  # tenant list
                ["t_1/ns_1"],  # namespaces list
                ["persistent:.../t_1/ns_1/topic_1"],  # persistent topic list
                [],  # persistent partitioned topic list
                [],  # none-persistent topic list
                [],  # none-persistent partitioned topic list
                mock_schema_response,
            ]  # schema for persistent:.../t_1/ns_1/topic
    
            work_units = list(pulsar_source.get_workunits())
            first_mcp = work_units[0].metadata
            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
    
            # Expected calls 7
            # http://localhost:.../admin/v2/tenants
            # http://localhost:.../v2/namespaces/t_1
            # http://localhost:.../persistent/t_1/ns_1
            # http://localhost:.../persistent/t_1/ns_1/partitioned
            # http://localhost:.../non-persistent/t_1/ns_1
            # http://localhost:.../non-persistent/t_1/ns_1/partitioned
            # http://localhost:.../ns_1/topic_1/schema
            assert mock.call_count == 7
            # expecting 5 mcp for one topic with default config
>           assert len(work_units) == 5
E           assert 6 == 5
E            +  where 6 = len([MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-status', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='status', aspect=StatusClass({'removed': False}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004925, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-schemaMetadata', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='schemaMetadata', aspect=SchemaMetadataClass({'schemaName': 'foo.bar.FooSchema', 'platform': 'urn:li:dataPlatform:pulsar', 'version': 1, 'created': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'lastModified': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'deleted': None, 'dataset': None, 'cluster': None, 'hash': '6b90be4b680fb5af9d80f05880c57472', 'platformSchema': KafkaSchemaClass({'documentSchema': '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}', 'documentSchemaType': 'AVRO', 'keySchema': None, 'keySchemaType': None}), 'fields': [SchemaFieldClass({'fieldPath': 
'[version=2.0].[type=FooSchema].[type=string].field1', 'jsonPath': None, 'nullable': False, 'description': 'Description of field1', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': StringTypeClass({})}), 'nativeDataType': 'field1', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': '{"avro.java.string": "String"}'}), SchemaFieldClass({'fieldPath': '[version=2.0].[type=FooSchema].[type=long].field2', 'jsonPath': None, 'nullable': False, 'description': 'Some description\nField default value: 0', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': NumberTypeClass({})}), 'nativeDataType': 'field2', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': None})], 'primaryKeys': None, 'foreignKeysSpecs': None, 'foreignKeys': None}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004926, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-datasetProperties', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='datasetProperties', aspect=DatasetPropertiesClass({'customProperties': {'__jsr310ConversionEnabled': 'false', '__alwaysAllowNull': 'true', 'schema_version': '1', 'schema_type': 'AVRO', 'partitioned': 'false'}, 'externalUrl': None, 'name': None, 'qualifiedName': None, 'description': 'Description of FooSchema', 'uri': None, 'created': None, 'lastModified': None, 'tags': []}), 
systemMetadata=SystemMetadataClass({'lastObserved': 1744093004926, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePaths', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='browsePaths', aspect=BrowsePathsClass({'paths': ['.../t_1/ns_1/topic_1']}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004927, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-subTypes', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='subTypes', aspect=SubTypesClass({'typeNames': [<DatasetSubTypes.TOPIC: 'Topic'>]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004927, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePathsV2', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', 
entityKeyAspect=None, auditHeader=None, aspectName='browsePathsV2', aspect=BrowsePathsV2Class({'path': [BrowsePathEntryClass({'id': 't_1', 'urn': None}), BrowsePathEntryClass({'id': 'ns_1', 'urn': None}), BrowsePathEntryClass({'id': 'topic_1', 'urn': None})]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004927, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True)])

tests/unit/test_pulsar_source.py:153: AssertionError
tests.unit.test_pulsar_source.TestPulsarSource::test_pulsar_source_get_workunits_custom_tenant
Stack Traces | 0.007s run time
self = <tests.unit.test_pulsar_source.TestPulsarSource testMethod=test_pulsar_source_get_workunits_custom_tenant>
mock_session = <function get at 0x7f2ed686fd80>

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2"],
            },
            ctx,
        )
    
        # Mock fetching Pulsar metadata
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock:
            mock.side_effect = [
                ["t_1/ns_1"],  # namespaces list
                ["persistent:.../t_1/ns_1/topic_1"],  # topic list
                [],  # empty persistent partitioned topic list
                [],  # empty none-persistent topic list
                [],  # empty none-persistent partitioned topic list
                mock_schema_response,  # schema for persistent:.../t_1/ns_1/topic
                [],  # no namespaces for tenant t_2
            ]
    
            work_units = list(pulsar_source.get_workunits())
            first_mcp = work_units[0].metadata
            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
    
            # Expected calls 7
            # http://localhost:.../v2/namespaces/t_1
            # http://localhost:.../persistent/t_1/ns_1
            # http://localhost:.../persistent/t_1/ns_1/partitioned
            # http://localhost:.../non-persistent/t_1/ns_1
            # http://localhost:.../non-persistent/t_1/ns_1/partitioned
            # http://localhost:.../ns_1/topic_1/schema
            # http://localhost:.../v2/namespaces/t_2
            assert mock.call_count == 7
            # expecting 5 mcp for one topic with default config
>           assert len(work_units) == 5
E           assert 6 == 5
E            +  where 6 = len([MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-status', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='status', aspect=StatusClass({'removed': False}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004952, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-schemaMetadata', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='schemaMetadata', aspect=SchemaMetadataClass({'schemaName': 'foo.bar.FooSchema', 'platform': 'urn:li:dataPlatform:pulsar', 'version': 1, 'created': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'lastModified': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'deleted': None, 'dataset': None, 'cluster': None, 'hash': '6b90be4b680fb5af9d80f05880c57472', 'platformSchema': KafkaSchemaClass({'documentSchema': '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}', 'documentSchemaType': 'AVRO', 'keySchema': None, 'keySchemaType': None}), 'fields': [SchemaFieldClass({'fieldPath': 
'[version=2.0].[type=FooSchema].[type=string].field1', 'jsonPath': None, 'nullable': False, 'description': 'Description of field1', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': StringTypeClass({})}), 'nativeDataType': 'field1', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': '{"avro.java.string": "String"}'}), SchemaFieldClass({'fieldPath': '[version=2.0].[type=FooSchema].[type=long].field2', 'jsonPath': None, 'nullable': False, 'description': 'Some description\nField default value: 0', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': NumberTypeClass({})}), 'nativeDataType': 'field2', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': None})], 'primaryKeys': None, 'foreignKeysSpecs': None, 'foreignKeys': None}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004953, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-datasetProperties', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='datasetProperties', aspect=DatasetPropertiesClass({'customProperties': {'__jsr310ConversionEnabled': 'false', '__alwaysAllowNull': 'true', 'schema_version': '1', 'schema_type': 'AVRO', 'partitioned': 'false'}, 'externalUrl': None, 'name': None, 'qualifiedName': None, 'description': 'Description of FooSchema', 'uri': None, 'created': None, 'lastModified': None, 'tags': []}), 
systemMetadata=SystemMetadataClass({'lastObserved': 1744093004953, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePaths', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='browsePaths', aspect=BrowsePathsClass({'paths': ['.../t_1/ns_1/topic_1']}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004953, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-subTypes', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='subTypes', aspect=SubTypesClass({'typeNames': [<DatasetSubTypes.TOPIC: 'Topic'>]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004953, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePathsV2', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', 
entityKeyAspect=None, auditHeader=None, aspectName='browsePathsV2', aspect=BrowsePathsV2Class({'path': [BrowsePathEntryClass({'id': 't_1', 'urn': None}), BrowsePathEntryClass({'id': 'ns_1', 'urn': None}), BrowsePathEntryClass({'id': 'topic_1', 'urn': None})]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004953, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True)])

tests/unit/test_pulsar_source.py:194: AssertionError
tests.unit.test_pulsar_source.TestPulsarSource::test_pulsar_source_get_workunits_patterns
Stack Traces | 0.009s run time
self = <tests.unit.test_pulsar_source.TestPulsarSource testMethod=test_pulsar_source_get_workunits_patterns>
mock_session = <function get at 0x7f2ed55a85d0>

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_patterns(self, mock_session):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2", "bad_t_3"],
                "tenant_patterns": {"deny": ["bad_t_3"]},
                "namespace_patterns": {"allow": [r"t_1/ns_1"]},
                "topic_patterns": {"allow": [r"persistent:.../t_1/ns_1/topic_1"]},
            },
            ctx,
        )
    
        # Mock fetching Pulsar metadata
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock:
            mock.side_effect = [
                ["t_1/ns_1", "t_2/ns_1"],  # namespaces list
                [
                    "persistent:.../t_1/ns_1/topic_1",  # persistent topic list
                    "non-persistent:.../t_1/ns_1/bad_topic",
                ],  # topic will be filtered out
                [],  # persistent partitioned topic list
                [],  # none-persistent topic list
                [],  # none-persistent partitioned topic list
                mock_schema_response,  # schema for persistent:.../t_1/ns_1/topic
                [],  # no namespaces for tenant t_2
            ]
    
            work_units = list(pulsar_source.get_workunits())
            first_mcp = work_units[0].metadata
            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
    
            # Expected calls 7
            # http://localhost:.../v2/namespaces/t_1
            # http://localhost:.../persistent/t_1/ns_1
            # http://localhost:.../persistent/t_1/ns_1/partitioned
            # http://localhost:.../non-persistent/t_1/ns_1
            # http://localhost:.../non-persistent/t_1/ns_1/partitioned
            # http://localhost:.../ns_1/topic_1/schema
            # http://localhost:.../v2/namespaces/t_2
            assert mock.call_count == 7
            # expecting 5 mcp for one topic with default config
>           assert len(work_units) == 5
E           assert 6 == 5
E            +  where 6 = len([MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-status', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='status', aspect=StatusClass({'removed': False}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004910, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-schemaMetadata', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='schemaMetadata', aspect=SchemaMetadataClass({'schemaName': 'foo.bar.FooSchema', 'platform': 'urn:li:dataPlatform:pulsar', 'version': 1, 'created': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'lastModified': AuditStampClass({'time': 0, 'actor': 'urn:li:corpuser:unknown', 'impersonator': None, 'message': None}), 'deleted': None, 'dataset': None, 'cluster': None, 'hash': '6b90be4b680fb5af9d80f05880c57472', 'platformSchema': KafkaSchemaClass({'documentSchema': '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}', 'documentSchemaType': 'AVRO', 'keySchema': None, 'keySchemaType': None}), 'fields': [SchemaFieldClass({'fieldPath': 
'[version=2.0].[type=FooSchema].[type=string].field1', 'jsonPath': None, 'nullable': False, 'description': 'Description of field1', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': StringTypeClass({})}), 'nativeDataType': 'field1', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': '{"avro.java.string": "String"}'}), SchemaFieldClass({'fieldPath': '[version=2.0].[type=FooSchema].[type=long].field2', 'jsonPath': None, 'nullable': False, 'description': 'Some description\nField default value: 0', 'label': None, 'created': None, 'lastModified': None, 'type': SchemaFieldDataTypeClass({'type': NumberTypeClass({})}), 'nativeDataType': 'field2', 'recursive': False, 'globalTags': None, 'glossaryTerms': None, 'isPartOfKey': False, 'isPartitioningKey': None, 'jsonProps': None})], 'primaryKeys': None, 'foreignKeysSpecs': None, 'foreignKeys': None}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004911, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-datasetProperties', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='datasetProperties', aspect=DatasetPropertiesClass({'customProperties': {'__jsr310ConversionEnabled': 'false', '__alwaysAllowNull': 'true', 'schema_version': '1', 'schema_type': 'AVRO', 'partitioned': 'false'}, 'externalUrl': None, 'name': None, 'qualifiedName': None, 'description': 'Description of FooSchema', 'uri': None, 'created': None, 'lastModified': None, 'tags': []}), 
systemMetadata=SystemMetadataClass({'lastObserved': 1744093004911, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePaths', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='browsePaths', aspect=BrowsePathsClass({'paths': ['.../t_1/ns_1/topic_1']}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004911, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-subTypes', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', entityKeyAspect=None, auditHeader=None, aspectName='subTypes', aspect=SubTypesClass({'typeNames': [<DatasetSubTypes.TOPIC: 'Topic'>]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004911, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True), MetadataWorkUnit(id='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)-browsePathsV2', metadata=MetadataChangeProposalWrapper(entityType='dataset', changeType='UPSERT', entityUrn='urn:li:dataset:(urn:li:dataPlatform:pulsar,persistent:.../t_1/ns_1/topic_1,PROD)', 
entityKeyAspect=None, auditHeader=None, aspectName='browsePathsV2', aspect=BrowsePathsV2Class({'path': [BrowsePathEntryClass({'id': 't_1', 'urn': None}), BrowsePathEntryClass({'id': 'ns_1', 'urn': None}), BrowsePathEntryClass({'id': 'topic_1', 'urn': None})]}), systemMetadata=SystemMetadataClass({'lastObserved': 1744093004911, 'runId': 'test', 'lastRunId': 'no-run-id-provided', 'pipelineName': None, 'registryName': None, 'registryVersion': None, 'properties': None, 'version': None})), treat_errors_as_warnings=False, is_primary_source=True)])

tests/unit/test_pulsar_source.py:241: AssertionError
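
In all three traces above, the sixth work unit is the browsePathsV2 aspect, which is consistent with the default now being applied when `PipelineContext` is built without a pipeline config. The sketch below shows one way to surface the extra aspect by name rather than by count. It is an illustration only: `StubMetadata` and `StubWorkUnit` are hypothetical stand-ins for the real work unit classes, and the five-aspect baseline is an assumption inferred from the "expecting 5 mcp" comment in the tests.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StubMetadata:
    # Hypothetical stand-in for MetadataChangeProposalWrapper.
    aspectName: str


@dataclass
class StubWorkUnit:
    # Hypothetical stand-in for MetadataWorkUnit.
    metadata: StubMetadata


def aspect_names(work_units: List[StubWorkUnit]) -> List[str]:
    """Return the aspect name of each work unit, in emission order."""
    return [wu.metadata.aspectName for wu in work_units]


# Aspect names in the order they appear in the assertion output above.
work_units = [
    StubWorkUnit(StubMetadata(name))
    for name in (
        "status",
        "schemaMetadata",
        "datasetProperties",
        "browsePaths",
        "subTypes",
        "browsePathsV2",
    )
]

# Assumed baseline: the five aspects the tests counted before this change.
baseline = ["status", "schemaMetadata", "datasetProperties", "browsePaths", "subTypes"]

# Anything emitted that is not part of the baseline.
extra = [name for name in aspect_names(work_units) if name not in baseline]
```

Asserting on aspect names instead of `len(work_units)` would make a failure like this one point directly at the new aspect.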

Labels
ingestion PR or Issue related to the ingestion of metadata
2 participants