-
Notifications
You must be signed in to change notification settings - Fork 39
feat: Add block_simultaneous_read to DefaultStream #870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e8697e0
1b4a9f0
ec8dd6f
450d7cf
3e7de2f
b7fa9a5
314ded1
80d8b2b
8c06ce6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,17 +66,61 @@ def __init__( | |
| self._streams_done: Set[str] = set() | ||
| self._exceptions_per_stream_name: dict[str, List[Exception]] = {} | ||
|
|
||
| # Track which streams (by name) are currently active | ||
| # A stream is "active" if it's generating partitions or has partitions being read | ||
| self._active_stream_names: Set[str] = set() | ||
|
|
||
| # Store blocking group names for streams that require blocking simultaneous reads | ||
| # Maps stream name -> group name (empty string means no blocking) | ||
| self._stream_block_simultaneous_read: Dict[str, str] = { | ||
| stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from | ||
| } | ||
|
|
||
| # Track which groups are currently active | ||
| # Maps group name -> set of stream names in that group | ||
| self._active_groups: Dict[str, Set[str]] = {} | ||
|
|
||
| for stream in stream_instances_to_read_from: | ||
| if stream.block_simultaneous_read: | ||
| self._logger.info( | ||
| f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. " | ||
| f"Will defer starting this stream if another stream in the same group or its parents are active." | ||
| ) | ||
|
|
||
| def on_partition_generation_completed( | ||
| self, sentinel: PartitionGenerationCompletedSentinel | ||
| ) -> Iterable[AirbyteMessage]: | ||
| """ | ||
| This method is called when a partition generation is completed. | ||
| 1. Remove the stream from the list of streams currently generating partitions | ||
| 2. If the stream is done, mark it as such and return a stream status message | ||
| 3. If there are more streams to read from, start the next partition generator | ||
| 2. Deactivate parent streams (they were only needed for partition generation) | ||
| 3. If the stream is done, mark it as such and return a stream status message | ||
| 4. If there are more streams to read from, start the next partition generator | ||
| """ | ||
| stream_name = sentinel.stream.name | ||
| self._streams_currently_generating_partitions.remove(sentinel.stream.name) | ||
|
|
||
| # Deactivate all parent streams now that partition generation is complete | ||
| # Parents were only needed to generate slices, they can now be reused | ||
| parent_streams = self._collect_all_parent_stream_names(stream_name) | ||
| for parent_stream_name in parent_streams: | ||
| if parent_stream_name in self._active_stream_names: | ||
| self._logger.debug(f"Removing '{parent_stream_name}' from active streams") | ||
| self._active_stream_names.discard(parent_stream_name) | ||
|
|
||
| # Remove from active groups | ||
| parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") | ||
| if parent_group: | ||
| if parent_group in self._active_groups: | ||
| self._active_groups[parent_group].discard(parent_stream_name) | ||
| if not self._active_groups[parent_group]: | ||
| del self._active_groups[parent_group] | ||
| self._logger.info( | ||
| f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after " | ||
| f"partition generation completed for child '{stream_name}'. " | ||
| f"Blocked streams in the queue will be retried on next start_next_partition_generator call." | ||
| ) | ||
|
|
||
| # It is possible for the stream to already be done if no partitions were generated | ||
| # If the partition generation process was completed and there are no partitions left to process, the stream is done | ||
| if ( | ||
|
|
@@ -85,7 +129,9 @@ def on_partition_generation_completed( | |
| ): | ||
| yield from self._on_stream_is_done(stream_name) | ||
| if self._stream_instances_to_start_partition_generation: | ||
| yield self.start_next_partition_generator() # type:ignore # None may be yielded | ||
| status_message = self.start_next_partition_generator() | ||
| if status_message: | ||
| yield status_message | ||
|
|
||
| def on_partition(self, partition: Partition) -> None: | ||
| """ | ||
|
|
@@ -113,6 +159,7 @@ def on_partition_complete_sentinel( | |
| 1. Close the partition | ||
| 2. If the stream is done, mark it as such and return a stream status message | ||
| 3. Emit messages that were added to the message repository | ||
| 4. If there are more streams to read from, start the next partition generator | ||
| """ | ||
| partition = sentinel.partition | ||
|
|
||
|
|
@@ -125,6 +172,11 @@ def on_partition_complete_sentinel( | |
| and len(partitions_running) == 0 | ||
| ): | ||
| yield from self._on_stream_is_done(partition.stream_name()) | ||
| # Try to start the next stream in the queue (may be a deferred stream) | ||
| if self._stream_instances_to_start_partition_generation: | ||
| status_message = self.start_next_partition_generator() | ||
| if status_message: | ||
| yield status_message | ||
| yield from self._message_repository.consume_queue() | ||
|
|
||
| def on_record(self, record: Record) -> Iterable[AirbyteMessage]: | ||
|
|
@@ -181,24 +233,112 @@ def _flag_exception(self, stream_name: str, exception: Exception) -> None: | |
|
|
||
| def start_next_partition_generator(self) -> Optional[AirbyteMessage]: | ||
| """ | ||
| Start the next partition generator. | ||
| 1. Pop the next stream to read from | ||
| 2. Submit the partition generator to the thread pool manager | ||
| 3. Add the stream to the list of streams currently generating partitions | ||
| 4. Return a stream status message | ||
| Submits the next partition generator to the thread pool. | ||
|
|
||
| A stream will be deferred (moved to end of queue) if: | ||
| 1. The stream itself has block_simultaneous_read=True AND is already active | ||
| 2. Any parent stream has block_simultaneous_read=True AND is currently active | ||
|
|
||
| This prevents simultaneous reads of streams that shouldn't be accessed concurrently. | ||
|
|
||
| :return: A status message if a partition generator was started, otherwise None | ||
| """ | ||
| if self._stream_instances_to_start_partition_generation: | ||
| if not self._stream_instances_to_start_partition_generation: | ||
| return None | ||
|
|
||
| # Remember initial queue size to avoid infinite loops if all streams are blocked | ||
| max_attempts = len(self._stream_instances_to_start_partition_generation) | ||
| attempts = 0 | ||
|
|
||
| while self._stream_instances_to_start_partition_generation and attempts < max_attempts: | ||
| attempts += 1 | ||
|
|
||
| # Pop the first stream from the queue | ||
| stream = self._stream_instances_to_start_partition_generation.pop(0) | ||
| stream_name = stream.name | ||
| stream_group = self._stream_block_simultaneous_read.get(stream_name, "") | ||
|
|
||
| # Check if this stream has a blocking group and is already active | ||
| if stream_group and stream_name in self._active_stream_names: | ||
| # Add back to the END of the queue for retry later | ||
| self._stream_instances_to_start_partition_generation.append(stream) | ||
| self._logger.info( | ||
| f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream." | ||
| ) | ||
| continue # Try the next stream in the queue | ||
|
|
||
| # Check if this stream's group is already active (another stream in the same group is running) | ||
| if ( | ||
| stream_group | ||
| and stream_group in self._active_groups | ||
| and self._active_groups[stream_group] | ||
| ): | ||
| # Add back to the END of the queue for retry later | ||
| self._stream_instances_to_start_partition_generation.append(stream) | ||
| active_streams_in_group = self._active_groups[stream_group] | ||
| self._logger.info( | ||
| f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) " | ||
| f"{active_streams_in_group} in the same group are active. Trying next stream." | ||
| ) | ||
| continue # Try the next stream in the queue | ||
|
|
||
| # Check if any parent streams have a blocking group and are currently active | ||
| parent_streams = self._collect_all_parent_stream_names(stream_name) | ||
| blocked_by_parents = [ | ||
| p | ||
| for p in parent_streams | ||
| if self._stream_block_simultaneous_read.get(p, "") | ||
| and p in self._active_stream_names | ||
| ] | ||
|
|
||
| if blocked_by_parents: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One scenario I want to make sure I understand and that we have covered is when a substream So let's say that we happen to have the
This is how I interpreted the flow below. Does that sound right to you Anatolii Yatsuk (@tolik0) ? |
||
| # Add back to the END of the queue for retry later | ||
| self._stream_instances_to_start_partition_generation.append(stream) | ||
| parent_groups = { | ||
| self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents | ||
| } | ||
| self._logger.info( | ||
| f"Deferring stream '{stream_name}' because parent stream(s) " | ||
| f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream." | ||
| ) | ||
| continue # Try the next stream in the queue | ||
|
|
||
| # No blocking - start this stream | ||
| # Mark stream as active before starting | ||
| self._active_stream_names.add(stream_name) | ||
| self._streams_currently_generating_partitions.append(stream_name) | ||
|
|
||
| # Track this stream in its group if it has one | ||
| if stream_group: | ||
| if stream_group not in self._active_groups: | ||
| self._active_groups[stream_group] = set() | ||
| self._active_groups[stream_group].add(stream_name) | ||
| self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'") | ||
|
|
||
| # Also mark all parent streams as active (they will be read from during partition generation) | ||
| parent_streams = self._collect_all_parent_stream_names(stream_name) | ||
|
Contributor
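There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need to call `_collect_all_parent_stream_names(stream_name)` again here, given that it was already called above to compute `blocked_by_parents`? |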
||
| for parent_stream_name in parent_streams: | ||
| parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") | ||
| if parent_group: | ||
| self._active_stream_names.add(parent_stream_name) | ||
| if parent_group not in self._active_groups: | ||
| self._active_groups[parent_group] = set() | ||
| self._active_groups[parent_group].add(parent_stream_name) | ||
| self._logger.info( | ||
| f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active " | ||
| f"(will be read during partition generation for '{stream_name}')" | ||
| ) | ||
|
|
||
| self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream) | ||
| self._streams_currently_generating_partitions.append(stream.name) | ||
| self._logger.info(f"Marking stream {stream.name} as STARTED") | ||
| self._logger.info(f"Syncing stream: {stream.name} ") | ||
| self._logger.info(f"Marking stream {stream_name} as STARTED") | ||
| self._logger.info(f"Syncing stream: {stream_name}") | ||
| return stream_status_as_airbyte_message( | ||
| stream.as_airbyte_stream(), | ||
| AirbyteStreamStatus.STARTED, | ||
| ) | ||
| else: | ||
| return None | ||
|
|
||
| # All streams in the queue are currently blocked | ||
| return None | ||
|
|
||
| def is_done(self) -> bool: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a check that no streams remain in `_stream_instances_to_start_partition_generation`.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It may also be worth checking that the active stream groups are empty too. |
||
| """ | ||
|
|
@@ -230,6 +370,47 @@ def is_done(self) -> bool: | |
| def _is_stream_done(self, stream_name: str) -> bool: | ||
| return stream_name in self._streams_done | ||
|
|
||
| def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]: | ||
| """ | ||
| Recursively collect all parent stream names for a given stream. | ||
| For example, if we have: epics -> issues -> comments | ||
| Then for comments, this returns {issues, epics} | ||
|
|
||
| :param stream_name: The stream to collect parents for | ||
| :return: Set of all parent stream names (recursively) | ||
| """ | ||
| parent_names: Set[str] = set() | ||
| stream = self._stream_name_to_instance.get(stream_name) | ||
|
|
||
| if not stream: | ||
| return parent_names | ||
|
|
||
| # Get partition router if it exists (this is where parent streams are defined) | ||
| partition_router = None | ||
|
|
||
| # Try DefaultStream path first (_stream_partition_generator._stream_slicer._partition_router) | ||
| if ( | ||
| hasattr(stream, "_stream_partition_generator") | ||
| and hasattr(stream._stream_partition_generator, "_stream_slicer") | ||
| and hasattr(stream._stream_partition_generator._stream_slicer, "_partition_router") | ||
| ): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels pretty messy to have to perform such a careful assertion on a bunch of potentially null private fields. I know it'll be too much work to make these public, but can we create a helper method on |
||
| partition_router = stream._stream_partition_generator._stream_slicer._partition_router | ||
| # Fallback to legacy path (retriever.partition_router) for backward compatibility and test mocks | ||
| elif hasattr(stream, "retriever") and hasattr(stream.retriever, "partition_router"): | ||
| partition_router = stream.retriever.partition_router | ||
|
|
||
| # SubstreamPartitionRouter has parent_stream_configs | ||
| if partition_router and hasattr(partition_router, "parent_stream_configs"): | ||
| for parent_config in partition_router.parent_stream_configs: | ||
| parent_stream = parent_config.stream | ||
| parent_name = parent_stream.name | ||
| parent_names.add(parent_name) | ||
|
|
||
| # Recursively collect grandparents, great-grandparents, etc. | ||
| parent_names.update(self._collect_all_parent_stream_names(parent_name)) | ||
|
|
||
| return parent_names | ||
|
|
||
| def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: | ||
| self._logger.info( | ||
| f"Read {self._record_counter[stream_name]} records from {stream_name} stream" | ||
|
|
@@ -246,3 +427,19 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: | |
| else AirbyteStreamStatus.COMPLETE | ||
| ) | ||
| yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status) | ||
|
|
||
| # Remove only this stream from active set (NOT parents) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the reason we only remove this stream from the active set that we are already removing the parents from the active set when partition generation is completed in `on_partition_generation_completed`? |
||
| if stream_name in self._active_stream_names: | ||
| self._active_stream_names.discard(stream_name) | ||
|
|
||
| # Remove from active groups | ||
| stream_group = self._stream_block_simultaneous_read.get(stream_name, "") | ||
| if stream_group: | ||
| if stream_group in self._active_groups: | ||
| self._active_groups[stream_group].discard(stream_name) | ||
| if not self._active_groups[stream_group]: | ||
| del self._active_groups[stream_group] | ||
| self._logger.info( | ||
| f"Stream '{stream_name}' (group '{stream_group}') is no longer active. " | ||
| f"Blocked streams in the queue will be retried on next start_next_partition_generator call." | ||
| ) | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1553,6 +1553,26 @@ definitions: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: "" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| example: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - "Users" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| block_simultaneous_read: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| title: Block Simultaneous Read | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| description: > | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional group name for blocking simultaneous reads. Streams with the same | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| block_simultaneous_read value will not be read concurrently. This prevents | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| duplicate API calls when a stream is used as both a standalone stream and a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| parent stream, or when multiple streams share the same endpoint/session. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If set to a non-empty string, the stream will be deferred if: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 1. Another stream in the same group is currently active | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 2. Any parent stream is in an active group | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Examples: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - "issues_endpoint" - All streams with this value block each other | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - "" or null - No blocking (default) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This is useful for APIs that don't allow concurrent access to the same | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: "" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1556
to
+1575
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Doc/schema mismatch for `block_simultaneous_read`. Your description says `"" or null` means no blocking, but the schema declares `type: string` only.

Proposed fix (option A: allow null):

```diff
 block_simultaneous_read:
   title: Block Simultaneous Read
   description: >
@@
-    - "" or null - No blocking (default)
+    - "" or null - No blocking (default)
@@
-  type: string
+  type:
+    - string
+    - "null"
   default: ""
```

Alternative (option B: keep string-only and remove “null” from docs):

```diff
-    - "" or null - No blocking (default)
+    - "" - No blocking (default)
```

📝 Committable suggestion
Suggested change
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retriever: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| title: Retriever | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| description: Component used to coordinate how records are extracted across stream slices and request pages. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,3 @@ | ||
| # Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
|
|
||
| # generated by datamodel-codegen: | ||
| # filename: declarative_component_schema.yaml | ||
|
|
||
|
|
@@ -2497,6 +2495,11 @@ class Config: | |
|
|
||
| type: Literal["DeclarativeStream"] | ||
| name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") | ||
| block_simultaneous_read: Optional[str] = Field( | ||
| "", | ||
| description='Optional group name for blocking simultaneous reads. Streams with the same block_simultaneous_read value will not be read concurrently. This prevents duplicate API calls when a stream is used as both a standalone stream and a parent stream, or when multiple streams share the same endpoint/session.\nIf set to a non-empty string, the stream will be deferred if: 1. Another stream in the same group is currently active 2. Any parent stream is in an active group\nExamples: - "issues_endpoint" - All streams with this value block each other - "" or null - No blocking (default)\nThis is useful for APIs that don\'t allow concurrent access to the same endpoint or session. Only applies to ConcurrentDeclarativeSource.\n', | ||
| title="Block Simultaneous Read", | ||
| ) | ||
| retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( | ||
| ..., | ||
| description="Component used to coordinate how records are extracted across stream slices and request pages.", | ||
|
|
@@ -2741,7 +2744,7 @@ class HttpRequester(BaseModelWithDeprecations): | |
| ) | ||
| use_cache: Optional[bool] = Field( | ||
| False, | ||
| description="Enables stream requests caching. This field is automatically set by the CDK.", | ||
| description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", | ||
| title="Use Cache", | ||
| ) | ||
|
Comment on lines
2746
to
2749
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The updated wording says “Only set this to false…” while the default is already `False`. 🤖 Prompt for AI Agents |
||
| parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand this case. Why do we put the stream at the back of the
`stream_instances_to_start` queue if this stream is already active? If this stream is already active, it's presumably already generating partitions and syncing, so why do we want to retry it by putting it at the back of the queue?