-
Notifications
You must be signed in to change notification settings - Fork 40
feat(cdk): Add cursor age validation to StateDelegatingStream #890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
cf01a75
67bc5c8
45772f4
1edeedd
61d8d5d
21da112
0e33418
324344f
da8a5a5
37e046e
dceb70d
c14f963
86d5ea6
567ca7a
be72c5c
2b54cc5
f199583
fbda39f
a2d4b56
1defe9e
a017dff
d3e76d4
d31c26b
653022b
43dc47e
1531b39
b4c24c6
67f9e60
8608b5f
8faa0ae
ea7a757
714c667
16a895e
bddc671
8828eea
6b65b7a
17f857a
1163395
acd7156
8afe8e1
2a4f385
e4f71ff
9340d3c
e021f58
1dcc8ab
6d95923
a3a2073
020d2f5
21bb2a9
2a2459d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,7 @@ | |
| DynamicStreamCheckConfig, | ||
| ) | ||
| from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel | ||
| from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser | ||
| from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime | ||
| from airbyte_cdk.sources.declarative.decoders import ( | ||
| Decoder, | ||
|
|
@@ -3568,11 +3569,106 @@ def create_state_delegating_stream( | |
def _get_state_delegating_stream_model(
    self, has_parent_state: bool, model: StateDelegatingStreamModel
) -> DeclarativeStreamModel:
    """Select which underlying stream model a StateDelegatingStream should run.

    Chooses the full-refresh variant when there is no usable saved state (and
    no parent state), or when a retention period is configured and the saved
    cursor is older than it; otherwise chooses the incremental variant.
    """
    saved_state = self._connector_state_manager.get_stream_state(model.name, None)

    # Nothing to resume from: neither our own state nor a parent's.
    if not (saved_state or has_parent_state):
        return model.full_refresh_stream

    # With a configured retention period, a cursor that predates it means the
    # incremental API may no longer serve that window — fall back to full refresh.
    if saved_state and model.api_retention_period:
        sync_component = model.incremental_stream.incremental_sync
        cursor_is_stale = bool(sync_component) and self._is_cursor_older_than_retention_period(
            saved_state, sync_component, model.api_retention_period, model.name
        )
        if cursor_is_stale:
            return model.full_refresh_stream

    return model.incremental_stream
|
|
||
def _is_cursor_older_than_retention_period(
    self,
    stream_state: Mapping[str, Any],
    incremental_sync: Any,
    api_retention_period: str,
    stream_name: str,
) -> bool:
    """Check if the cursor value in the state is older than the API's retention period.

    If the cursor is too old, the incremental API may not have data going back
    that far, so we should fall back to a full refresh to avoid data loss.

    Args:
        stream_state: Persisted state mapping for the stream.
        incremental_sync: Incremental sync component declaring ``cursor_field``
            and datetime formats (duck-typed; attributes are read via getattr).
        api_retention_period: Duration string describing how far back the API
            retains data (parsed by ``parse_duration``).
        stream_name: Stream name, used only for log messages.

    Returns:
        True if the cursor is older than the retention period (should use full
        refresh). False if it is within the period, or if it is missing,
        non-scalar, or unparseable (validation is skipped as a safe default).
    """
    cursor_field = getattr(incremental_sync, "cursor_field", None)
    if not cursor_field:
        return False

    cursor_value = stream_state.get(cursor_field)
    if not cursor_value:
        return False

    # Only scalar cursors are supported; anything else (e.g. nested dicts from
    # per-partition state) cannot be interpreted here, so skip validation.
    if not isinstance(cursor_value, (str, int)):
        return False

    cursor_value_str = str(cursor_value)

    retention_duration = parse_duration(api_retention_period)
    retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration

    cursor_datetime = self._parse_cursor_datetime(
        cursor_value_str, incremental_sync, stream_name
    )
    if cursor_datetime is None:
        return False

    # Bug fix: a cursor format without timezone info yields a *naive* datetime,
    # and comparing naive vs aware datetimes raises TypeError, crashing the sync
    # instead of gracefully skipping validation. Treat naive cursors as UTC
    # (Airbyte cursors are conventionally UTC — TODO confirm) so the comparison
    # below is always well-defined.
    if cursor_datetime.tzinfo is None:
        cursor_datetime = cursor_datetime.replace(tzinfo=datetime.timezone.utc)

    if cursor_datetime < retention_cutoff:
        self._emit_warning_for_stale_cursor(
            stream_name, cursor_value_str, api_retention_period, retention_cutoff
        )
        return True

    return False
|
|
||
def _parse_cursor_datetime(
    self,
    cursor_value: str,
    incremental_sync: Any,
    stream_name: str,
) -> Optional[datetime.datetime]:
    """Parse the cursor value into a datetime using the cursor's declared formats.

    Tries each format in ``cursor_datetime_formats`` (the declared input
    formats) first, then the output ``datetime_format`` as a fallback.

    Returns:
        The parsed datetime, or None (after logging a warning) when no format
        matches — callers skip cursor-age validation in that case rather than
        failing the sync.
    """
    parser = DatetimeParser()

    datetime_format = getattr(incremental_sync, "datetime_format", None)
    cursor_datetime_formats = getattr(incremental_sync, "cursor_datetime_formats", None) or []

    # De-duplicate while preserving priority order: the output datetime_format
    # is frequently also listed in cursor_datetime_formats, and retrying the
    # same format is wasted work.
    formats_to_try = list(
        dict.fromkeys(cursor_datetime_formats + ([datetime_format] if datetime_format else []))
    )

    for fmt in formats_to_try:
        try:
            return parser.parse(cursor_value, fmt)
        except (ValueError, TypeError):
            # This format didn't match; try the next one.
            continue

    logging.warning(
        f"Could not parse cursor value '{cursor_value}' for stream '{stream_name}' "
        f"using formats {formats_to_try}. Skipping cursor age validation."
    )
    return None
|
|
||
| def _emit_warning_for_stale_cursor( | ||
| self, | ||
| stream_name: str, | ||
| cursor_value: str, | ||
| api_retention_period: str, | ||
| retention_cutoff: datetime.datetime, | ||
| ) -> None: | ||
| """Emit a warning message when the cursor is older than the API's retention period.""" | ||
| warning_message = ( | ||
| f"Stream '{stream_name}' has a cursor value '{cursor_value}' that is older than " | ||
| f"the API's retention period of {api_retention_period} (cutoff: {retention_cutoff.isoformat()}). " | ||
| f"Falling back to full refresh to avoid data loss. " | ||
| f"This may happen if a previous sync failed mid-way and the state was checkpointed." | ||
| ) | ||
| logging.warning(warning_message) | ||
|
|
||
| def _create_async_job_status_mapping( | ||
| self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.