50 commits
cf01a75
feat(cdk): Add cursor age validation to StateDelegatingStream
devin-ai-integration[bot] Feb 2, 2026
67bc5c8
chore: re-trigger CI
devin-ai-integration[bot] Feb 2, 2026
45772f4
Merge branch 'main' into devin/1770066385-state-delegating-stream-cur…
agarctfi Feb 3, 2026
1edeedd
Auto-fix lint and format issues
Feb 3, 2026
61d8d5d
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
21da112
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
0e33418
fix: Address Copilot review comments
devin-ai-integration[bot] Feb 3, 2026
324344f
fix: Correct ruff format for assert statement
devin-ai-integration[bot] Feb 3, 2026
da8a5a5
fix: Convert cursor_value to str for type safety
devin-ai-integration[bot] Feb 3, 2026
37e046e
fix: Format long line for ruff compliance
devin-ai-integration[bot] Feb 3, 2026
dceb70d
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
c14f963
refactor: Move incremental_sync check to _get_state_delegating_stream…
devin-ai-integration[bot] Feb 3, 2026
86d5ea6
fix: Return True (full refresh) when cursor is invalid/unparseable
devin-ai-integration[bot] Feb 3, 2026
567ca7a
fix: Parse cursor from both full_refresh_stream and incremental_stream
devin-ai-integration[bot] Feb 3, 2026
be72c5c
feat: Add support for per-partition state and IncrementingCountCursor…
devin-ai-integration[bot] Feb 4, 2026
2b54cc5
feat: Add get_cursor_datetime_from_state method to cursor classes
devin-ai-integration[bot] Feb 5, 2026
f199583
feat: Add get_cursor_datetime_from_state to concurrent cursor classes
devin-ai-integration[bot] Feb 9, 2026
fbda39f
fix: Fix MyPy type errors in ConcurrentCursor.get_cursor_datetime_fro…
devin-ai-integration[bot] Feb 9, 2026
a2d4b56
refactor: Wire factory to use cursor class get_cursor_datetime_from_s…
devin-ai-integration[bot] Feb 18, 2026
1defe9e
fix: Fix ruff format and mypy errors in model_to_component_factory
devin-ai-integration[bot] Feb 18, 2026
a017dff
fix: Skip retention check for concurrent state format
devin-ai-integration[bot] Feb 18, 2026
d3e76d4
fix: Skip retention check for IncrementingCountCursor instead of rais…
devin-ai-integration[bot] Feb 18, 2026
d31c26b
fix: Return False (skip) when no datetime-based cursors found for ret…
devin-ai-integration[bot] Feb 18, 2026
653022b
fix: Remove unused pytest import
devin-ai-integration[bot] Feb 18, 2026
43dc47e
fix: Raise ValueError for unparseable cursor datetime when api_retent…
devin-ai-integration[bot] Feb 18, 2026
1531b39
refactor: Use stream cursor for retention period check, remove legacy…
devin-ai-integration[bot] Feb 18, 2026
b4c24c6
fix: Try both full_refresh and incremental cursors for state parsing
devin-ai-integration[bot] Feb 18, 2026
67f9e60
fix: Remove per-partition state fallback, let cursor classes handle s…
devin-ai-integration[bot] Feb 18, 2026
8608b5f
fix: Re-add _get_state_delegating_stream_model and fix ruff format
devin-ai-integration[bot] Feb 18, 2026
8faa0ae
Revert "fix: Re-add _get_state_delegating_stream_model and fix ruff f…
devin-ai-integration[bot] Feb 18, 2026
ea7a757
fix: ruff format long lines in create_state_delegating_stream
devin-ai-integration[bot] Feb 18, 2026
714c667
fix: Restore _get_state_delegating_stream_model and fix MyPy errors
devin-ai-integration[bot] Feb 18, 2026
16a895e
fix: Handle FinalStateCursor gracefully and detect final-state for re…
devin-ai-integration[bot] Feb 19, 2026
bddc671
refactor: Move FinalStateCursor handling to cursor classes, replace h…
devin-ai-integration[bot] Feb 19, 2026
8828eea
refactor: Clean NO_CURSOR_STATE_KEY from ConcurrentCursor, add tests …
devin-ai-integration[bot] Feb 19, 2026
6b65b7a
style: Fix ruff format issues in factory and test files
devin-ai-integration[bot] Feb 19, 2026
17f857a
fix: Raise error for incompatible cursor types with api_retention_period
devin-ai-integration[bot] Feb 19, 2026
1163395
refactor: Simplify cursor age validation per brianjlai's review
devin-ai-integration[bot] Feb 19, 2026
acd7156
fix: Use Cursor type instead of Any for cursor parameter
devin-ai-integration[bot] Feb 19, 2026
8afe8e1
fix: Clear state when falling back to full refresh due to stale cursor
devin-ai-integration[bot] Feb 20, 2026
2a4f385
style: Fix ruff format issues in state clearing code
devin-ai-integration[bot] Feb 20, 2026
e4f71ff
fix: Implement tolik0's FinalStateCursor feedback with NO_CURSOR_STAT…
devin-ai-integration[bot] Feb 23, 2026
9340d3c
fix: Update FinalStateCursor test to match new behavior per tolik0's …
devin-ai-integration[bot] Feb 23, 2026
e021f58
style: Fix ruff format issues in test file
devin-ai-integration[bot] Feb 23, 2026
1dcc8ab
refactor: Remove early return for NO_CURSOR_STATE_KEY per tolik0's re…
devin-ai-integration[bot] Feb 23, 2026
6d95923
fix: Remove unused NO_CURSOR_STATE_KEY import
devin-ai-integration[bot] Feb 23, 2026
a3a2073
fix: Update FinalStateCursor test to match actual ConcurrentCursor be…
devin-ai-integration[bot] Feb 23, 2026
020d2f5
fix: Skip state emission for streams not in configured catalog
devin-ai-integration[bot] Feb 25, 2026
21bb2a9
refactor: Move catalog check to skip entire retention validation for …
devin-ai-integration[bot] Feb 25, 2026
2a2459d
style: Fix ruff format issue in create_state_delegating_stream
devin-ai-integration[bot] Feb 25, 2026
16 changes: 16 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -3752,6 +3752,22 @@ definitions:
title: Incremental Stream
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
"$ref": "#/definitions/DeclarativeStream"
api_retention_period:
title: API Retention Period
description: |
The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.
This is useful for APIs like Stripe Events API which only retain data for 30 days.
* **PT1H**: 1 hour
* **P1D**: 1 day
* **P1W**: 1 week
* **P1M**: 1 month
* **P1Y**: 1 year
* **P30D**: 30 days
type: string
examples:
- "P30D"
- "P90D"
- "P1Y"
$parameters:
type: object
additionalProperties: true
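To make the `api_retention_period` values above concrete, here is a minimal sketch of how an ISO 8601 duration becomes a retention cutoff. The diff itself relies on `isodate.parse_duration`; the `parse_fixed_duration` and `retention_cutoff` helpers below are hypothetical stand-ins that use only the standard library and deliberately reject month and year designators, whose length depends on the calendar.

```python
import datetime
import re

# Hypothetical helper, not part of the CDK: accepts only fixed-length
# designators (weeks, days, hours, minutes, seconds).
_DURATION_RE = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_fixed_duration(value: str) -> datetime.timedelta:
    """Parse a fixed-length ISO 8601 duration such as P30D or PT1H."""
    match = _DURATION_RE.match(value)
    parts = (
        {key: int(val) for key, val in match.groupdict().items() if val is not None}
        if match
        else {}
    )
    if not parts:
        raise ValueError(f"Unsupported or empty duration: {value!r}")
    return datetime.timedelta(**parts)


def retention_cutoff(value: str, now: datetime.datetime) -> datetime.datetime:
    """Return the oldest cursor datetime the API is assumed to still serve."""
    return now - parse_fixed_duration(value)


now = datetime.datetime(2026, 2, 25, tzinfo=datetime.timezone.utc)
print(retention_cutoff("P30D", now).isoformat())
```

A cursor value earlier than the returned cutoff is the condition that triggers the fall back to full refresh.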
@@ -3,6 +3,7 @@
#

import copy
import datetime
import logging
import threading
import time
@@ -658,3 +659,21 @@ def get_global_state(
if stream_state and "state" in stream_state
else None
)

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the global cursor in per-partition state.

        For per-partition cursors, the global cursor is stored under the "state" key.
        This method delegates to the underlying cursor factory to parse the datetime.

        Returns None if the global cursor is not present or cannot be parsed.
        """
        global_state = stream_state.get(self._GLOBAL_STATE_KEY)
        if not global_state or not isinstance(global_state, dict):
            return None

        # Create a cursor to delegate the parsing
        cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
        return cursor.get_cursor_datetime_from_state(global_state)
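
The delegation in the method above can be illustrated in isolation. This is a minimal sketch, not the CDK implementation: `SimpleDatetimeCursor`, `global_cursor_datetime`, and the `updated_at` field are hypothetical, and the `"state"` key is taken from the docstring.

```python
import datetime
from typing import Any, Mapping, Optional

GLOBAL_STATE_KEY = "state"  # assumption: mirrors the "state" key named in the docstring


class SimpleDatetimeCursor:
    """Hypothetical stand-in for the cursor produced by the cursor factory."""

    def __init__(self, cursor_field: str) -> None:
        self._cursor_field = cursor_field

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> Optional[datetime.datetime]:
        raw = stream_state.get(self._cursor_field)
        if not isinstance(raw, str):
            return None
        try:
            parsed = datetime.datetime.fromisoformat(raw)
        except ValueError:
            return None
        # Treat naive values as UTC so comparisons against a UTC cutoff work.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed


def global_cursor_datetime(
    stream_state: Mapping[str, Any], cursor: SimpleDatetimeCursor
) -> Optional[datetime.datetime]:
    """Parse the global cursor of a per-partition state, or return None."""
    global_state = stream_state.get(GLOBAL_STATE_KEY)
    if not global_state or not isinstance(global_state, dict):
        return None
    return cursor.get_cursor_datetime_from_state(global_state)


per_partition_state = {
    "states": [],
    "state": {"updated_at": "2026-01-15T00:00:00+00:00"},
}
print(global_cursor_datetime(per_partition_state, SimpleDatetimeCursor("updated_at")))
```

Returning None for a missing or malformed global cursor leaves the decision to the caller, which in this PR treats an unparseable state as a reason to fall back to full refresh.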
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

@@ -2885,6 +2883,12 @@ class StateDelegatingStream(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
title="Incremental Stream",
)
api_retention_period: Optional[str] = Field(
None,
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n",
examples=["P30D", "P90D", "P1Y"],
title="API Retention Period",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


130 changes: 105 additions & 25 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -11,6 +11,7 @@
import re
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
@@ -27,6 +28,11 @@
get_type_hints,
)

if TYPE_CHECKING:
from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
DatetimeBasedCursor,
)

from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
from isodate import parse_duration
from pydantic.v1 import BaseModel
@@ -612,6 +618,7 @@
NoopMessageRepository,
)
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.call_rate import (
APIBudget,
FixedWindowCallRatePolicy,
@@ -3548,7 +3555,6 @@ def create_state_delegating_stream(
self,
model: StateDelegatingStreamModel,
config: Config,
has_parent_state: Optional[bool] = None,
**kwargs: Any,
) -> DefaultStream:
if (
@@ -3559,18 +3565,99 @@ def create_state_delegating_stream(
f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}."
)

stream_model = self._get_state_delegating_stream_model(
False if has_parent_state is None else has_parent_state, model
)
        if model.api_retention_period:
            for stream_model in (model.full_refresh_stream, model.incremental_stream):
                if isinstance(stream_model.incremental_sync, IncrementingCountCursorModel):
                    raise ValueError(
                        f"Stream '{model.name}' uses IncrementingCountCursor which is not supported "
                        f"with api_retention_period. IncrementingCountCursor does not use datetime-based "
                        f"cursors, so cursor age validation cannot be performed."
                    )

        stream_state = self._connector_state_manager.get_stream_state(model.name, None)

        if not stream_state:
            return self._create_component_from_model(  # type: ignore[no-any-return]
                model.full_refresh_stream, config=config, **kwargs
            )

        incremental_stream: DefaultStream = self._create_component_from_model(
            model.incremental_stream, config=config, **kwargs
        )  # type: ignore[assignment]

        if model.api_retention_period:
            full_refresh_stream: DefaultStream = self._create_component_from_model(
                model.full_refresh_stream, config=config, **kwargs
            )  # type: ignore[assignment]
            if self._is_cursor_older_than_retention_period(
Contributor

@tolik0 Anatolii Yatsuk (tolik0) Feb 23, 2026

The flow should be like this:

# get the retention duration from now() - api_retention
# get full refresh cursor datetime
# if has value then compare against that value w/ retention duration
# if doesn't have value then get the incremental cursor date time
# compare incremental value against that value w/ retention duration
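
The flow outlined in this comment can be sketched as a small pure function. This is an illustration under stated assumptions, not the factory code: each cursor is reduced to a callable that parses state into a datetime or None, and the `StateParser` alias, the function name, and the explicit `now` parameter are hypothetical, added so the check is deterministic.

```python
import datetime
from typing import Any, Callable, Mapping, Optional

# Hypothetical alias: a cursor's state parser, reduced to a plain callable.
StateParser = Callable[[Mapping[str, Any]], Optional[datetime.datetime]]


def is_cursor_older_than_retention(
    stream_state: Mapping[str, Any],
    parse_full_refresh: StateParser,
    parse_incremental: StateParser,
    retention: datetime.timedelta,
    now: datetime.datetime,
) -> bool:
    """Return True when the stream should fall back to full refresh."""
    cutoff = now - retention

    # Full refresh cursor first; the incremental cursor only if it yields nothing.
    cursor_datetime = parse_full_refresh(stream_state)
    if cursor_datetime is None:
        cursor_datetime = parse_incremental(stream_state)

    # Neither cursor could parse the state: full refresh is the safe choice.
    if cursor_datetime is None:
        return True

    return cursor_datetime < cutoff


now = datetime.datetime(2026, 2, 23, tzinfo=datetime.timezone.utc)
retention = datetime.timedelta(days=30)
stale = lambda state: now - datetime.timedelta(days=45)
unparsed = lambda state: None
print(is_cursor_older_than_retention({}, unparsed, stale, retention, now))
```

Passing `now` explicitly keeps the comparison testable; the method in the diff reads the current UTC time itself.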

Contributor Author

Thanks for the detailed flow! This is on an older version of the code. The current implementation (per brianjlai's feedback) simplified to only check the incremental cursor, with NO_CURSOR_STATE_KEY handled as an early return.

Your suggested flow would require checking both cursors in sequence:

# get the retention duration from now() - api_retention
# get full refresh cursor datetime
# if has value then compare against that value w/ retention duration
# if doesn't have value then get the incremental cursor date time
# compare incremental value against that value w/ retention duration

This is a different approach than brianjlai's simplification. I've replied to your other comment asking for alignment between you and brianjlai on the preferred approach.


Contributor

The final cursor is fixed; now implement this flow.

Contributor Author

Thanks for confirming the FinalStateCursor changes are good!

I believe I've already implemented the flow you described in commit e4f71ff:

  1. FinalStateCursor.get_cursor_datetime_from_state returns now() for NO_CURSOR_STATE_KEY: True state, otherwise None
  2. _is_cursor_older_than_retention_period checks full refresh cursor first, then incremental cursor if the first returns None

However, I kept an early return for NO_CURSOR_STATE_KEY in _is_cursor_older_than_retention_period because the full_refresh_stream.cursor is actually a DatetimeBasedCursor (or ConcurrentCursor), not a FinalStateCursor. This is because the manifest defines a DatetimeBasedCursor in the incremental_sync field of the full_refresh_stream.

Could you clarify if you want me to:

  1. Keep the current implementation (early return for NO_CURSOR_STATE_KEY + sequential cursor checking)
  2. Remove the early return and find a way to use FinalStateCursor directly for the retention check (would require creating a FinalStateCursor instance explicitly rather than using full_refresh_stream.cursor)
  3. Something else I may have missed?

All 13 tests pass locally with the current implementation.



                stream_state,
                full_refresh_stream.cursor,
                incremental_stream.cursor,
                model.api_retention_period,
                model.name,
            ):
                self._connector_state_manager.update_state_for_stream(model.name, None, {})
                state_message = self._connector_state_manager.create_state_message(model.name, None)
                self._message_repository.emit_message(state_message)
                return full_refresh_stream

        return incremental_stream

@staticmethod
def _is_cursor_older_than_retention_period(
stream_state: Mapping[str, Any],
full_refresh_cursor: Cursor,
incremental_cursor: Cursor,
api_retention_period: str,
stream_name: str,
) -> bool:
"""Check if the cursor value in the state is older than the API's retention period.

return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel
Checks cursors in sequence: full refresh cursor first, then incremental cursor.
If state has NO_CURSOR_STATE_KEY, it means the previous sync was a completed full
refresh, so the cursor is "current" and we should use incremental.

Returns True if the cursor is older than the retention period (should use full refresh).
Returns False if the cursor is within the retention period (safe to use incremental).
"""
# NO_CURSOR_STATE_KEY indicates a completed full refresh - cursor is "current"
if stream_state.get(NO_CURSOR_STATE_KEY):
Contributor

Do we need this check, since it is already covered by the final cursor?

Contributor Author

Good question! Yes, this check is still necessary because in practice, full_refresh_stream.cursor is a DatetimeBasedCursor (created from the manifest's full_refresh_stream definition), not a FinalStateCursor.

When we call full_refresh_cursor.get_cursor_datetime_from_state(stream_state) with state {NO_CURSOR_STATE_KEY: True}:

  • DatetimeBasedCursor tries to extract the cursor field from the state, but since the state only has NO_CURSOR_STATE_KEY, it returns None
  • Then incremental_cursor.get_cursor_datetime_from_state() (also DatetimeBasedCursor) returns None for the same reason
  • If both cursors return None, we fall back to full refresh (return True), which is incorrect for NO_CURSOR_STATE_KEY state

The FinalStateCursor.get_cursor_datetime_from_state implementation exists for completeness in the concurrent cursor hierarchy, but it's not used in this specific flow because the declarative stream factory creates DatetimeBasedCursor instances, not FinalStateCursor instances.

If you'd like to remove this early return, we'd need to ensure one of the cursors can correctly parse NO_CURSOR_STATE_KEY state. One option would be to add NO_CURSOR_STATE_KEY handling to DatetimeBasedCursor.get_cursor_datetime_from_state, but that feels like it's mixing concerns.

Would you prefer to:

  1. Keep the early return as-is (current approach)
  2. Move the NO_CURSOR_STATE_KEY handling into DatetimeBasedCursor
  3. Some other approach?

Contributor

But if the state is {NO_CURSOR_STATE_KEY: True}, it was produced by the FinalStateCursor.

Contributor Author

You're right - the {NO_CURSOR_STATE_KEY: True} state is produced by FinalStateCursor.

The issue is that in the current flow, full_refresh_stream.cursor is created from the manifest's full_refresh_stream definition, which uses DatetimeBasedCursor (not FinalStateCursor). So when we call full_refresh_cursor.get_cursor_datetime_from_state(), we're calling DatetimeBasedCursor's method, not FinalStateCursor's method.

Would you like me to:

  1. Create a FinalStateCursor instance specifically for checking NO_CURSOR_STATE_KEY state, then call its get_cursor_datetime_from_state method (which returns now() and would correctly indicate the cursor is "current")?
  2. Or change the flow to use FinalStateCursor as the first cursor to check instead of full_refresh_stream.cursor?

Either approach would let us remove the early return and rely on FinalStateCursor.get_cursor_datetime_from_state to handle NO_CURSOR_STATE_KEY state.


Contributor

How is the FinalStateCursor created for use during the sync?

Contributor Author

FinalStateCursor is created in two places:

  1. In create_simple_retriever (lines 3301-3302): when no cursor is provided to the retriever (i.e., the stream doesn't have an incremental_sync defined):

    if cursor is None:
        cursor = FinalStateCursor(name, None, self._message_repository)

  2. In _build_concurrent_cursor (line 2289): when there's no incremental_sync defined for concurrent streams.

In the context of StateDelegatingStream:

When we call self._create_component_from_model(model.full_refresh_stream, ...), it creates a DefaultStream. The DefaultStream.cursor property returns the cursor from its retriever.

  • If full_refresh_stream doesn't have an incremental_sync defined → the retriever gets a FinalStateCursor → state produced is {NO_CURSOR_STATE_KEY: True}
  • If full_refresh_stream does have an incremental_sync defined → the retriever gets a DatetimeBasedCursor → state produced is {"cursor_field": "value"}

The early return for NO_CURSOR_STATE_KEY handles the first case where the full_refresh_stream used FinalStateCursor and produced {NO_CURSOR_STATE_KEY: True} state. In this case, full_refresh_stream.cursor (which is now a DatetimeBasedCursor if the incremental_stream has one) can't parse that state, so we need the explicit check.

Would you like me to change the approach to explicitly create a FinalStateCursor instance and use it to check NO_CURSOR_STATE_KEY state instead of the early return?


Contributor

So the FinalStateCursor is created whenever the stream does not define incremental sync at all. Why do we need to create it explicitly?

Contributor Author

You're right - if the full_refresh_stream doesn't define incremental_sync, then full_refresh_stream.cursor is already a FinalStateCursor, and it will handle NO_CURSOR_STATE_KEY correctly by returning now().

The early return is a defensive check for an edge case where:

  1. The state {NO_CURSOR_STATE_KEY: True} was produced by a previous sync (when full_refresh_stream used FinalStateCursor)
  2. But the manifest was later updated to add incremental_sync to full_refresh_stream
  3. Now full_refresh_stream.cursor is a DatetimeBasedCursor which can't parse NO_CURSOR_STATE_KEY state

If you think this edge case is unlikely and we should remove the early return to simplify the code, I can do that. The code would rely on FinalStateCursor.get_cursor_datetime_from_state to handle NO_CURSOR_STATE_KEY state, which works correctly when full_refresh_stream doesn't have incremental_sync.

Should I remove the early return?


Contributor

Yes, remove it

Contributor Author

Done! Removed the early return for NO_CURSOR_STATE_KEY in commit 1dcc8ab. The FinalStateCursor.get_cursor_datetime_from_state method now handles this case by returning now(), which is always within any retention period.
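
The behaviour this thread settles on can be shown with two small parsers. This is a hedged sketch, not the CDK classes: `final_state_cursor_datetime` and `datetime_cursor_datetime` are hypothetical stand-ins for the two cursor types, the `updated_at` field is made up, and the literal value of `NO_CURSOR_STATE_KEY` is an assumption. It also shows the edge case raised earlier, where a datetime-only cursor cannot read a state written by a final-state cursor.

```python
import datetime
from typing import Any, Mapping, Optional

# Assumption: stands in for the CDK's NO_CURSOR_STATE_KEY constant; the literal
# value here is illustrative only.
NO_CURSOR_STATE_KEY = "__ab_no_cursor_state_message"


def final_state_cursor_datetime(
    stream_state: Mapping[str, Any], now: Optional[datetime.datetime] = None
) -> Optional[datetime.datetime]:
    """Return "now" for a completed full refresh state, otherwise None."""
    if stream_state.get(NO_CURSOR_STATE_KEY):
        return now or datetime.datetime.now(datetime.timezone.utc)
    return None


def datetime_cursor_datetime(
    stream_state: Mapping[str, Any], cursor_field: str = "updated_at"
) -> Optional[datetime.datetime]:
    """Parse a datetime cursor value from state, or return None."""
    raw = stream_state.get(cursor_field)
    if not isinstance(raw, str):
        return None
    try:
        parsed = datetime.datetime.fromisoformat(raw)
    except ValueError:
        return None
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=datetime.timezone.utc)


now = datetime.datetime(2026, 2, 23, tzinfo=datetime.timezone.utc)
full_refresh_state = {NO_CURSOR_STATE_KEY: True}

# "now" can never be older than a cutoff computed as now minus a positive duration.
cutoff = now - datetime.timedelta(days=30)
print(final_state_cursor_datetime(full_refresh_state, now) >= cutoff)

# A datetime-only cursor finds no cursor field in this state.
print(datetime_cursor_datetime(full_refresh_state))
```

If both cursors on a stream are datetime-based, a leftover `{NO_CURSOR_STATE_KEY: True}` state parses to None on both sides and the check falls back to full refresh, which is the edge case the removed early return used to cover.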



return False

        retention_duration = parse_duration(api_retention_period)
        retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration

        # Check full refresh cursor first
        cursor_datetime = full_refresh_cursor.get_cursor_datetime_from_state(stream_state)

        # If full refresh cursor returns None, check incremental cursor
        if cursor_datetime is None:
            cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state)

        if cursor_datetime is None:
            # Neither cursor could parse the state - fall back to full refresh to be safe
            return True

        if cursor_datetime < retention_cutoff:
Contributor

Consider a state delegating stream whose full refresh implementation has no cursor, so FinalStateCursor.get_cursor_datetime_from_state() returns now(). The comparison will presumably always evaluate to False, and we will use the incremental stream for the current run. Am I understanding this right?

If that is the case, my worry is that, because of how the code is written, we only ever evaluate the FinalStateCursor and short-circuit before checking the incremental cursor, so we will always use the incremental_stream even when its state falls outside the API retention window.

Anatolii Yatsuk (@tolik0) I might not be understanding this flow correctly, because I think it is written in a fairly overcomplicated way, but I want to check my understanding of this condition.

Contributor

@tolik0 Anatolii Yatsuk (tolik0) Feb 19, 2026

My understanding is that the FinalStateCursor emits state only at the end of the sync. So, if we have a state from a full refresh, we switch to incremental. If the incremental state is outdated, we switch back to the FinalStateCursor to retrieve all records.

Contributor Author

Yes, your understanding is correct! The flow is:

  1. State from completed full refresh (NO_CURSOR_STATE_KEY: True) → Use incremental stream (the full refresh already completed successfully)
  2. Incremental state within retention period → Use incremental stream (safe to continue from where we left off)
  3. Incremental state older than retention period → Fall back to full refresh (FinalStateCursor) to retrieve all records and avoid data loss

The current implementation handles this via:

  • Early return False when stream_state.get(NO_CURSOR_STATE_KEY) is true (case 1)
  • Checking incremental_stream.cursor.get_cursor_datetime_from_state() against the retention cutoff (cases 2 & 3)

Note: This comment is on an older version of the code. The latest version (per brianjlai's feedback) simplified the logic to only check the incremental cursor directly, since FinalStateCursor doesn't have a meaningful cursor datetime anyway.
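
The three cases listed above, together with the no-state case and the state clearing seen in the diff, can be walked through end to end. This is a minimal sketch assuming the sequential check described elsewhere in the thread; `select_stream`, both parser functions, the `updated_at` field, and the literal value of `NO_CURSOR_STATE_KEY` are hypothetical.

```python
import datetime
from typing import Any, Dict, Mapping, Optional, Tuple

NO_CURSOR_STATE_KEY = "__ab_no_cursor_state_message"  # assumption: illustrative value


def parse_full_refresh_state(
    state: Mapping[str, Any], now: datetime.datetime
) -> Optional[datetime.datetime]:
    """Stand-in for a final-state cursor: "now" for a completed full refresh."""
    return now if state.get(NO_CURSOR_STATE_KEY) else None


def parse_incremental_state(state: Mapping[str, Any]) -> Optional[datetime.datetime]:
    """Stand-in for a datetime cursor reading a hypothetical "updated_at" field."""
    raw = state.get("updated_at")
    if not isinstance(raw, str):
        return None
    try:
        return datetime.datetime.fromisoformat(raw)
    except ValueError:
        return None


def select_stream(
    state: Mapping[str, Any], retention: datetime.timedelta, now: datetime.datetime
) -> Tuple[str, Dict[str, Any]]:
    """Return which stream to run and the state it should start from."""
    if not state:
        return "full_refresh", {}

    cursor_datetime = parse_full_refresh_state(state, now)
    if cursor_datetime is None:
        cursor_datetime = parse_incremental_state(state)

    if cursor_datetime is None or cursor_datetime < now - retention:
        # Stale or unparseable cursor: clear the state and re-read everything.
        return "full_refresh", {}
    return "incremental", dict(state)


now = datetime.datetime(2026, 2, 25, tzinfo=datetime.timezone.utc)
retention = datetime.timedelta(days=30)

for state in (
    {NO_CURSOR_STATE_KEY: True},                  # case 1: completed full refresh
    {"updated_at": "2026-02-20T00:00:00+00:00"},  # case 2: within retention
    {"updated_at": "2025-12-01T00:00:00+00:00"},  # case 3: older than retention
):
    print(select_stream(state, retention, now)[0])
```

In this sketch the full refresh parser returns None for any state that lacks the final-state marker, so an incremental state is always compared against the cutoff and the short-circuit concern raised above does not arise.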



            logging.warning(
                f"Stream '{stream_name}' has a cursor value older than "
                f"the API's retention period of {api_retention_period} "
                f"(cutoff: {retention_cutoff.isoformat()}). "
                f"Falling back to full refresh to avoid data loss."
            )
            return True

        return False

def _get_state_delegating_stream_model(
self, has_parent_state: bool, model: StateDelegatingStreamModel
self,
model: StateDelegatingStreamModel,
parent_state: Optional[Mapping[str, Any]] = None,
) -> DeclarativeStreamModel:
"""Return the appropriate underlying stream model based on state."""
return (
model.incremental_stream
if self._connector_state_manager.get_stream_state(model.name, None) or has_parent_state
if self._connector_state_manager.get_stream_state(model.name, None) or parent_state
else model.full_refresh_stream
)

@@ -3901,17 +3988,13 @@ def create_substream_partition_router(
def create_parent_stream_config_with_substream_wrapper(
self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any
) -> Any:
# getting the parent state
child_state = self._connector_state_manager.get_stream_state(stream_name, None)

# This flag will be used exclusively for StateDelegatingStream when a parent stream is created
has_parent_state = bool(
self._connector_state_manager.get_stream_state(stream_name, None)
if model.incremental_dependency
else False
parent_state: Optional[Mapping[str, Any]] = (
child_state if model.incremental_dependency and child_state else None
)
connector_state_manager = self._instantiate_parent_stream_state_manager(
child_state, config, model, has_parent_state
child_state, config, model, parent_state
)

substream_factory = ModelToComponentFactory(
@@ -3943,7 +4026,7 @@ def _instantiate_parent_stream_state_manager(
child_state: MutableMapping[str, Any],
config: Config,
model: ParentStreamConfigModel,
has_parent_state: bool,
parent_state: Optional[Mapping[str, Any]] = None,
) -> ConnectorStateManager:
"""
With DefaultStream, the state needs to be provided during __init__ of the cursor as opposed to the
@@ -3955,36 +4038,33 @@ def _instantiate_parent_stream_state_manager(
"""
if model.incremental_dependency and child_state:
parent_stream_name = model.stream.name or ""
parent_state = ConcurrentPerPartitionCursor.get_parent_state(
extracted_parent_state = ConcurrentPerPartitionCursor.get_parent_state(
child_state, parent_stream_name
)

if not parent_state:
# there are two migration cases: state value from child stream or from global state
parent_state = ConcurrentPerPartitionCursor.get_global_state(
if not extracted_parent_state:
extracted_parent_state = ConcurrentPerPartitionCursor.get_global_state(
child_state, parent_stream_name
)

if not parent_state and not isinstance(parent_state, dict):
if not extracted_parent_state and not isinstance(extracted_parent_state, dict):
cursor_values = child_state.values()
if cursor_values and len(cursor_values) == 1:
# We assume the child state is a pair `{<cursor_field>: <cursor_value>}` and we will use the
# cursor value as a parent state.
incremental_sync_model: Union[
DatetimeBasedCursorModel,
IncrementingCountCursorModel,
] = (
model.stream.incremental_sync # type: ignore # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
if isinstance(model.stream, DeclarativeStreamModel)
else self._get_state_delegating_stream_model(
has_parent_state, model.stream
model.stream, parent_state=parent_state
).incremental_sync
)
cursor_field = InterpolatedString.create(
incremental_sync_model.cursor_field,
parameters=incremental_sync_model.parameters or {},
).eval(config)
parent_state = AirbyteStateMessage(
extracted_parent_state = AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(
@@ -3995,7 +4075,7 @@ def _instantiate_parent_stream_state_manager(
),
),
)
return ConnectorStateManager([parent_state] if parent_state else [])
return ConnectorStateManager([extracted_parent_state] if extracted_parent_state else [])

return ConnectorStateManager([])
