Skip to content

Conversation

@apascal07
Copy link
Collaborator

@apascal07 apascal07 commented Dec 17, 2025

Adds support for durable streaming, allowing clients to reconnect to in-progress or completed streams using a stream ID.

Usage:

sm := core.NewInMemoryStreamManager(core.WithTTL(10 * time.Minute))
handler := genkit.Handler(myStreamingFlow, genkit.WithStreamManager(sm))

Behavior:

  • Streaming responses include X-Genkit-Stream-Id header
  • Clients can subscribe by sending X-Genkit-Stream-Id in request header
  • Subscribing replays buffered chunks then continues with live updates
  • Subscribing to non-existent stream returns 204 No Content

Non in-memory implementations like Firestore to follow.

Checklist (if applicable):

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the streaming capabilities of Go flows by introducing durable streaming. This feature allows clients to seamlessly reconnect to ongoing or completed streaming operations, improving the robustness and user experience of applications that rely on real-time data. It provides the necessary interfaces, an in-memory implementation, and integrates this functionality directly into the existing handler mechanism, making it easy to adopt.

Highlights

  • Durable Streaming Interfaces: Introduced StreamManager and ActionStreamInput interfaces to define the contract for managing and interacting with durable streams, allowing for flexible storage backends.
  • In-Memory Stream Manager: Provided an InMemoryStreamManager implementation, complete with options like WithTTL for automatic cleanup of completed streams, suitable for testing or single-instance deployments.
  • Handler Integration: Added a WithStreamManager option to genkit.Handler to enable durable streaming for flows, automatically managing stream IDs and client reconnections.
  • Client Reconnection: Clients can now reconnect to in-progress or completed streams by sending an X-Genkit-Stream-Id header, which replays buffered chunks and continues with live updates or the final result.
  • New Sample Application: A new sample (durable-streaming/main.go) demonstrates how to set up and use durable streaming with a countdown flow, including curl examples for testing.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-designed feature for durable streaming in Go flows. The new interfaces and the in-memory implementation are clear and robust. The integration into the HTTP server is also well-handled. I've identified a critical issue regarding error handling in durable writes, a high-severity issue with the memory cleanup strategy that could lead to a memory leak in long-running services, and a minor issue regarding a magic number. Overall, this is a great addition, and with these fixes, it will be even more solid.

@apascal07 apascal07 requested a review from pavelgj December 17, 2025 14:53
@apascal07 apascal07 requested a review from huangjeff5 December 19, 2025 16:13
// NewInMemoryStreamManager creates a new InMemoryStreamManager.
// A background goroutine is started to periodically clean up expired streams.
// Call Close to stop the goroutine when the manager is no longer needed.
func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManager {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some unit tests for the in memory stream manager?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

2 participants