Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: async streaming handler send to queue logic #162

Merged
merged 1 commit into from
Feb 25, 2025

Conversation

acoola
Copy link
Collaborator

@acoola acoola commented Feb 25, 2025

Important

Refactor queue handling in StreamingQueueCallbackHandler by introducing send_to_queue method, with async handling in AsyncStreamingIteratorCallbackHandler.

  • Refactoring:
    • Introduce send_to_queue method in StreamingQueueCallbackHandler to encapsulate queue sending logic.
    • Override send_to_queue in AsyncStreamingIteratorCallbackHandler to use asyncio.run_coroutine_threadsafe for async queue operations.
  • Behavior:
    • Replace self.queue.put_nowait(event) with self.send_to_queue(event) in on_node_execute_stream and on_workflow_end methods of StreamingQueueCallbackHandler.

This description was created by Ellipsis for 6da2821. It will automatically update as commits are pushed.

@acoola acoola requested a review from a team as a code owner February 25, 2025 15:13
Copy link

@ellipsis-dev ellipsis-dev bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Looks good to me! Reviewed everything up to 6da2821 in 1 minute and 18 seconds

More details
  • Looked at 60 lines of code in 1 files
  • Skipped 0 files when reviewing.
  • Skipped posting 8 drafted comments based on config settings.
1. dynamiq/callbacks/streaming.py:62
  • Draft comment:
    Nice abstraction: using send_to_queue instead of directly calling put_nowait centralizes the queue logic.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50%
    None
2. dynamiq/callbacks/streaming.py:83
  • Draft comment:
    Consistent refactoring: using send_to_queue for workflow end event as well.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50%
    None
3. dynamiq/callbacks/streaming.py:95
  • Draft comment:
    The new send_to_queue method in the base handler encapsulates queue logic well.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50%
    None
4. dynamiq/callbacks/streaming.py:178
  • Draft comment:
    Using asyncio.run_coroutine_threadsafe for queue.put is appropriate for thread-safe calls; consider checking or handling the returned Future for potential exceptions.
  • Reason this comment was not posted:
    Confidence changes required: 33% <= threshold 50%
    None
5. dynamiq/callbacks/streaming.py:176
  • Draft comment:
    Consider potential deprecation of asyncio.get_event_loop; using asyncio.get_running_loop() might be preferable if the context allows.
  • Reason this comment was not posted:
    Confidence changes required: 33% <= threshold 50%
    None
6. dynamiq/callbacks/streaming.py:64
  • Draft comment:
    Refactored to use send_to_queue(event) improves maintainability by centralizing event dispatch logic. Ensure all overrides maintain consistent behavior.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50%
    None
7. dynamiq/callbacks/streaming.py:179
  • Draft comment:
    Using asyncio.run_coroutine_threadsafe in send_to_queue requires the event loop to be running in a separate thread. Consider checking the returned Future for exceptions to handle potential errors.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 30% vs. threshold = 50%
    The comment makes a valid technical point - run_coroutine_threadsafe returns a Future that could contain exceptions that should be handled. However, the base class already has error handling in _iter_queue_events. The comment also assumes threading usage that may not be accurate - the loop could be in the same thread. The suggestion is somewhat speculative without knowing the full threading context.
    I may be overlooking a critical error case that could be missed by the existing error handling. The threading assumptions could be correct in ways not visible in this file.
    While valid points, keeping error handling at the iteration level rather than queue insertion level is a reasonable design choice. Without stronger evidence that this is definitely an issue, the comment remains somewhat speculative.
    The comment should be removed as it makes assumptions about threading and suggests error handling changes without clear evidence that the current approach is problematic.
8. dynamiq/callbacks/streaming.py:173
  • Draft comment:
    Consider using asyncio.get_running_loop() instead of asyncio.get_event_loop() for retrieving the loop, to ensure a valid running loop in modern Python async contexts.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50%
    get_running_loop() is indeed preferred in modern Python as it ensures there's an active loop, while get_event_loop() may create new loops leading to issues. However, this class appears to be designed to work across threads (given run_coroutine_threadsafe usage), where get_running_loop() could fail if called from a different thread. The current implementation is likely intentional for cross-thread compatibility.
    I might be wrong about the cross-thread usage intention. Maybe the class is always used within the same event loop context.
    The use of run_coroutine_threadsafe strongly suggests this is designed for cross-thread scenarios, where get_event_loop() is actually the correct choice.
    The current implementation using get_event_loop() appears to be correct for this use case. The comment should be deleted as it could lead to incorrect changes.

Workflow ID: wflow_Twr2khEx1t2rmmEM


You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

Copy link

Coverage

Coverage Report •
FileStmtsMissCoverMissing
dynamiq/callbacks
   streaming.py682169%96, 134–135, 143, 170–176, 180, 188–193, 201, 209–210
TOTAL11845366569% 

Tests Skipped Failures Errors Time
413 0 💤 0 ❌ 0 🔥 44.473s ⏱️

@acoola acoola merged commit 7703aad into main Feb 25, 2025
8 checks passed
@acoola acoola deleted the fix/async-streaming-handler branch February 25, 2025 16:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants