Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,10 @@ definitions:
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
record_expander:
title: Record Expander
description: Optional component to expand records by extracting items from nested array fields.
"$ref": "#/definitions/RecordExpander"
$parameters:
type: object
additionalProperties: true
Expand All @@ -1943,6 +1947,86 @@ definitions:
$parameters:
type: object
additionalProperties: true
RecordExpander:
title: Record Expander
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
type: object
required:
- type
- expand_records_from_field
properties:
type:
type: string
enum: [RecordExpander]
expand_records_from_field:
title: Expand Records From Field
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["lines", "data"]
- ["items"]
- ["nested", "array"]
- ["sections", "*", "items"]
remain_original_record:
title: Remain Original Record
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
type: boolean
default: false
on_no_records:
title: On No Records
description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
type: string
enum:
- skip
- emit_parent
default: skip
parent_fields_to_copy:
title: Parent Fields To Copy
description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.
type: array
items:
"$ref": "#/definitions/ParentFieldMapping"
$parameters:
type: object
additionalProperties: true
ParentFieldMapping:
title: Parent Field Mapping
description: Defines a mapping from a parent record field to a child record field.
type: object
required:
- type
- source_field_path
- target_field
properties:
type:
type: string
enum: [ParentFieldMapping]
source_field_path:
title: Source Field Path
description: Path to the field in the parent record to copy.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["id"]
- ["created"]
- ["metadata", "timestamp"]
target_field:
title: Target Field
description: Name of the field in the child record where the value will be copied.
type: string
examples:
- "parent_id"
- "subscription_updated"
$parameters:
type: object
additionalProperties: true
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
Expand Down
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.expanders.record_expander import (
ParentFieldMapping,
RecordExpander,
)

__all__ = ["ParentFieldMapping", "RecordExpander"]
153 changes: 153 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/record_expander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, MutableMapping

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class ParentFieldMapping:
    """Copies a single value from a parent record onto an expanded child record.

    Attributes:
        source_field_path: Path segments locating the value inside the parent record.
            Every segment is wrapped in an InterpolatedString at construction time and
            evaluated against ``config`` each time a field is copied.
        target_field: Key under which the value is written on the child record.
        config: The user-provided configuration used when evaluating the path segments.
    """

    source_field_path: list[str | InterpolatedString]
    target_field: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap the segments once here; they are only evaluated later, per record.
        interpolated_segments = []
        for segment in self.source_field_path:
            interpolated_segments.append(
                InterpolatedString.create(segment, parameters=parameters)
            )
        self._source_path = interpolated_segments

    def copy_field(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Write the parent's value at the source path into ``child_record[target_field]``.

        The child record is mutated in place. When the source path does not exist in
        the parent record, the child record is left untouched.
        """
        resolved_path = [segment.eval(self.config) for segment in self._source_path]
        try:
            # dpath is given a plain dict copy of the parent rather than the Mapping itself.
            child_record[self.target_field] = dpath.get(dict(parent_record), resolved_path)
        except KeyError:
            # A missing source field is an expected situation and is skipped on purpose.
            return


@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Optionally, the original
    parent record can be embedded in each expanded item for context preservation.

    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
    When wildcards are used, items from all matched arrays are extracted and emitted.

    Only dict items receive parent context (``original_record`` and copied parent
    fields); items of any other type are emitted exactly as found in the array.

    Examples of instantiating this component:
    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "lines"
          - "data"
        remain_original_record: true
    ```

    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "sections"
          - "*"
          - "items"
        on_no_records: emit_parent
        parent_fields_to_copy:
          - type: ParentFieldMapping
            source_field_path: ["id"]
            target_field: "parent_id"
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*).
        remain_original_record: If True, each expanded dict record will reference the
            original parent record in an "original_record" field. Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
        parent_fields_to_copy: List of field mappings to copy from parent to each
            expanded child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: list[str | InterpolatedString]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: str = "skip"
    parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Always a list (possibly empty), never None; segments are evaluated per record.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field.

        Args:
            record: The parent record to expand.

        Yields:
            The record itself when no expansion path is configured. Otherwise one
            entry per item of every matched array: dict items are shallow-copied and
            enriched with parent context, other items are yielded unchanged. When
            nothing was emitted and ``on_no_records`` is "emit_parent", the parent
            record is yielded instead.
        """
        if not self._expand_path:
            yield record
            return

        expand_path = [segment.eval(self.config) for segment in self._expand_path]
        expanded_any = False

        for matched_array in self._find_arrays(record, expand_path):
            for item in matched_array:
                yield self._build_child(record, item)
                expanded_any = True

        if not expanded_any and self.on_no_records == "emit_parent":
            yield record

    def _find_arrays(self, record: MutableMapping[Any, Any], expand_path: list[Any]) -> list[Any]:
        """Return every list found at ``expand_path``; non-list matches are dropped."""
        if "*" in expand_path:
            matches: Any = dpath.values(record, expand_path)
        else:
            try:
                matches = [dpath.get(record, expand_path)]
            except KeyError:
                # A missing path simply means there is nothing to expand.
                matches = []
        return [match for match in matches if isinstance(match, list)]

    def _build_child(self, parent_record: MutableMapping[Any, Any], item: Any) -> Any:
        """Return the record to emit for one array item."""
        if not isinstance(item, dict):
            return item
        # Shallow copy so that adding parent context never mutates the parent's array items.
        expanded_record = dict(item)
        self._apply_parent_context(parent_record, expanded_record)
        return expanded_record

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context to a child record (mutates ``child_record`` in place)."""
        if self.remain_original_record:
            # Stores a reference to the parent record, not a copy.
            child_record["original_record"] = parent_record

        for field_mapping in self.parent_fields_to_copy:
            field_mapping.copy_field(parent_record, child_record)
35 changes: 32 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
Expand All @@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
If the field path points to an empty object, an empty array is returned.
If the field path points to a non-existing path, an empty array is returned.

Optionally, records can be expanded by providing a RecordExpander component.
When record_expander is configured, each extracted record is passed through the
expander which extracts items from nested array fields and emits each item as a
separate record.

Examples of instantiating this transform:
```
extractor:
Expand All @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
field_path: []
```

```
extractor:
type: DpathExtractor
field_path:
- "data"
- "object"
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

Attributes:
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
config (Config): The user-provided configuration as specified by the source's spec
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
"""

field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
record_expander: Optional[RecordExpander] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
if not self.record_expander:
yield from extracted
else:
for record in extracted:
yield from self.record_expander.expand_record(record)
elif extracted:
yield extracted
if self.record_expander:
yield from self.record_expander.expand_record(extracted)
else:
yield extracted
else:
yield from []
Loading