feat: add intermediate state checkpointing during pagination#915
Draft
devin-ai-integration[bot] wants to merge 3 commits intomainfrom
Draft
feat: add intermediate state checkpointing during pagination#915devin-ai-integration[bot] wants to merge 3 commits intomainfrom
devin-ai-integration[bot] wants to merge 3 commits intomainfrom
Conversation
When records are sorted in ascending order by cursor field, the CDK will now emit state checkpoints every N pages (default: 5) during pagination within a partition. This prevents loss of all progress when a sync fails mid-pagination due to rate limits or errors. Changes: - Add emit_intermediate_state() to ConcurrentCursor - Extend PaginationTracker with page counting and checkpoint triggering - Call on_page_complete() in SimpleRetriever._read_pages() - Wire up checkpoint cursor in model_to_component_factory Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1771602439-intermediate-state-checkpoint#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1771602439-intermediate-state-checkpointPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
…remental sync cursors Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
PyTest Results (Full)3 884 tests 3 872 ✅ 10m 49s ⏱️ Results for commit e0ef3eb. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
feat: add intermediate state checkpointing during pagination
Summary
When a stream paginates through many pages within a single slice/partition, state is currently only emitted when the partition closes. If the sync fails mid-pagination (e.g., rate limits, 504 errors), all progress is lost.
This PR adds intermediate state checkpointing to the CDK: when
ConcurrentCursordetects that records are arriving in ascending cursor order, it will emit a state checkpoint every N pages. On the next sync, the stream resumes from the last checkpoint rather than restarting from the beginning of the slice.Motivation: airbytehq/oncall#11335 —
source-zendesk-supportticket_commentsstream loses ~25k records of progress on each failure because no state is emitted during pagination.Changes:
declarative_component_schema.yaml) — Addedpages_per_checkpoint_interval(optional integer) to bothDatetimeBasedCursorandIncrementingCountCursordefinitions. Defaults to disabled (no intermediate checkpointing unless explicitly configured).declarative_component_schema.py) — UpdatedDatetimeBasedCursorandIncrementingCountCursormodel classes with the newpages_per_checkpoint_intervalfield.ConcurrentCursor.emit_intermediate_state(stream_slice)— New method that adds a partial[start, cursor_value]slice to state and emits a state message, but only when_is_ascending_orderis True. Handles both streams with and withoutslice_boundary_fields.PaginationTracker— Extended withcheckpoint_cursorandpages_per_checkpoint_intervalparams. Newon_page_complete()method increments a page counter and triggers intermediate checkpoint when the interval is reached.SimpleRetriever._read_pages()— Callspagination_tracker.on_page_complete(stream_slice)after each successful page.model_to_component_factory._create_pagination_tracker_factory()— Now readspages_per_checkpoint_intervalfrom the incremental sync model (if present) and passes it through toPaginationTracker. The feature is only active when aConcurrentCursoris present AND the schema value is set.Safety: The feature is a no-op when records are not in ascending order (the cursor tracks this via
_is_ascending_order). Themerge_intervalscall ensures intermediate slices are correctly merged with the final partition close. When not configured in the schema, behavior is unchanged from before this PR.Review & Testing Checklist for Human
close_partitionfor intermediate slices — Whenemit_intermediate_stateadds a partial slice, thenclose_partitionalso adds a slice for the same range. The merge should combine them correctly, but this interaction is only unit-tested in isolation. Verify end-to-end that state doesn't get corrupted or duplicated after a full partition lifecycle with intermediate checkpoints._is_ascending_ordercheck —emit_intermediate_state()readsself._is_ascending_orderoutside the lock, butobserve()writes it without a lock. Likely benign (flag only transitions True→False), but worth verifying no race exists._page_countnever resets — Unlike_record_count, the_page_countinPaginationTrackeris never reset (even in_reset()). This means checkpoint intervals span pagination resets. Is this intended?pages_per_checkpoint_intervalonly works when records are in ascending cursor order. The schema description mentions this, but consider whether additional documentation is needed.declarative_component_schema.pywere manually edited rather than regenerated viabin/generate_component_manifest_files.py. Verify the manual changes match what the code generator would produce.Recommended test plan:
ConcurrentCursor.emit_intermediate_state()— covering ascending/non-ascending order, with/without boundary fieldsPaginationTracker.on_page_complete()— verifying page counting and checkpoint triggeringpages_per_checkpoint_interval: 5, verify state is emitted at pages 5, 10, 15, 20source-zendesk-supportor similar connector: configurepages_per_checkpoint_intervalin the manifest, verify state advances during pagination, and that a mid-pagination failure resumes from the last checkpointNotes
pages_per_checkpoint_intervalon their incremental sync cursor to enable intermediate checkpointing. When not set, behavior is unchanged.close_partitionstill emits the full slice state._create_pagination_tracker_factorycaptures the actual cursor (not a copy), so allPaginationTrackerinstances share the same cursor reference. This is intended — the lock inemit_intermediate_statehandles concurrent access.Devin session
Requested by: gl_anatolii.yatsuk@airbyte.io