From 33bca24096a114fae431ff9850b855b9c02187d1 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Tue, 16 Dec 2025 19:28:34 -0800 Subject: [PATCH 1/5] Added support for durable streaming in flows. --- go/core/streaming.go | 350 +++++++++++++++++++++++++++ go/genkit/servers.go | 281 +++++++++++++++++---- go/genkit/servers_test.go | 129 +++++++++- go/samples/durable-streaming/main.go | 100 ++++++++ 4 files changed, 809 insertions(+), 51 deletions(-) create mode 100644 go/core/streaming.go create mode 100644 go/samples/durable-streaming/main.go diff --git a/go/core/streaming.go b/go/core/streaming.go new file mode 100644 index 0000000000..eadfc510e4 --- /dev/null +++ b/go/core/streaming.go @@ -0,0 +1,350 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package core + +import ( + "context" + "encoding/json" + "sync" + "time" +) + +// StreamEventType indicates the type of stream event. +type StreamEventType int + +const ( + StreamEventChunk StreamEventType = iota + StreamEventDone + StreamEventError +) + +// StreamEvent represents an event in a durable stream. +type StreamEvent struct { + Type StreamEventType + Chunk json.RawMessage // set when Type == StreamEventChunk + Output json.RawMessage // set when Type == StreamEventDone + Err error // set when Type == StreamEventError +} + +// ActionStreamInput provides methods for writing to a durable stream. +type ActionStreamInput interface { + // Write sends a chunk to the stream and notifies all subscribers. + Write(chunk json.RawMessage) error + // Done marks the stream as successfully completed with the given output. + Done(output json.RawMessage) error + // Error marks the stream as failed with the given error. + Error(err error) error + // Close releases resources without marking the stream as done or errored. + Close() error +} + +// StreamManager manages durable streams, allowing creation and subscription. +// Implementations can provide different storage backends (e.g., in-memory, database, cache). +type StreamManager interface { + // Open creates a new stream for writing. + // Returns an error if a stream with the given ID already exists. + Open(ctx context.Context, streamID string) (ActionStreamInput, error) + // Subscribe subscribes to an existing stream. + // Returns a channel that receives stream events, an unsubscribe function, and an error. + // If the stream has already completed, all buffered events are sent before the done/error event. + // Returns NOT_FOUND error if the stream doesn't exist. + Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error) +} + +// streamStatus represents the current state of a stream. +type streamStatus int + +const ( + streamStatusOpen streamStatus = iota + streamStatusDone + streamStatusError +) + +// streamState holds the internal state of a single stream. +type streamState struct { + status streamStatus + chunks []json.RawMessage + output json.RawMessage + err error + subscribers []chan StreamEvent + lastTouched time.Time + mu sync.RWMutex +} + +// InMemoryStreamManager is an in-memory implementation of StreamManager. +// Useful for testing or single-instance deployments where persistence is not required. +type InMemoryStreamManager struct { + streams map[string]*streamState + mu sync.RWMutex + ttl time.Duration + cleanupMu sync.Mutex + lastCleanup time.Time +} + +// StreamManagerOption configures an InMemoryStreamManager. +type StreamManagerOption interface { + applyInMemoryStreamManager(*streamManagerOptions) +} + +// streamManagerOptions holds configuration for InMemoryStreamManager. +type streamManagerOptions struct { + TTL time.Duration // Time-to-live for completed streams. +} + +func (o *streamManagerOptions) applyInMemoryStreamManager(opts *streamManagerOptions) { + if o.TTL > 0 { + opts.TTL = o.TTL + } +} + +// WithTTL sets the time-to-live for completed streams. +// Streams that have completed (done or error) will be cleaned up after this duration. +// Default is 5 minutes. +func WithTTL(ttl time.Duration) StreamManagerOption { + return &streamManagerOptions{TTL: ttl} +} + +// NewInMemoryStreamManager creates a new InMemoryStreamManager. +func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManager { + options := &streamManagerOptions{ + TTL: 5 * time.Minute, + } + for _, opt := range opts { + opt.applyInMemoryStreamManager(options) + } + return &InMemoryStreamManager{ + streams: make(map[string]*streamState), + ttl: options.TTL, + } +} + +// cleanup removes expired streams. Called periodically during operations. +func (m *InMemoryStreamManager) cleanup() { + m.cleanupMu.Lock() + if time.Since(m.lastCleanup) < time.Minute { + m.cleanupMu.Unlock() + return + } + m.lastCleanup = time.Now() + m.cleanupMu.Unlock() + + now := time.Now() + m.mu.Lock() + defer m.mu.Unlock() + + for id, state := range m.streams { + state.mu.RLock() + shouldDelete := state.status != streamStatusOpen && now.Sub(state.lastTouched) > m.ttl + state.mu.RUnlock() + if shouldDelete { + delete(m.streams, id) + } + } +} + +// Open creates a new stream for writing. +func (m *InMemoryStreamManager) Open(ctx context.Context, streamID string) (ActionStreamInput, error) { + m.cleanup() + + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.streams[streamID]; exists { + return nil, NewPublicError(ALREADY_EXISTS, "stream already exists", nil) + } + + state := &streamState{ + status: streamStatusOpen, + chunks: make([]json.RawMessage, 0), + subscribers: make([]chan StreamEvent, 0), + lastTouched: time.Now(), + } + m.streams[streamID] = state + + return &inMemoryStreamInput{ + manager: m, + streamID: streamID, + state: state, + }, nil +} + +// Subscribe subscribes to an existing stream. +func (m *InMemoryStreamManager) Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error) { + m.mu.RLock() + state, exists := m.streams[streamID] + m.mu.RUnlock() + + if !exists { + return nil, nil, NewPublicError(NOT_FOUND, "stream not found", nil) + } + + ch := make(chan StreamEvent, 100) + + state.mu.Lock() + defer state.mu.Unlock() + + // Send all buffered chunks + for _, chunk := range state.chunks { + select { + case ch <- StreamEvent{Type: StreamEventChunk, Chunk: chunk}: + case <-ctx.Done(): + close(ch) + return nil, nil, ctx.Err() + } + } + + // Handle completed streams + switch state.status { + case streamStatusDone: + ch <- StreamEvent{Type: StreamEventDone, Output: state.output} + close(ch) + return ch, func() {}, nil + case streamStatusError: + ch <- StreamEvent{Type: StreamEventError, Err: state.err} + close(ch) + return ch, func() {}, nil + } + + // Stream is still open, add subscriber + state.subscribers = append(state.subscribers, ch) + + unsubscribe := func() { + state.mu.Lock() + defer state.mu.Unlock() + for i, sub := range state.subscribers { + if sub == ch { + state.subscribers = append(state.subscribers[:i], state.subscribers[i+1:]...) + close(ch) + break + } + } + } + + return ch, unsubscribe, nil +} + +// inMemoryStreamInput implements ActionStreamInput for the in-memory manager. +type inMemoryStreamInput struct { + manager *InMemoryStreamManager + streamID string + state *streamState + closed bool + mu sync.Mutex +} + +func (s *inMemoryStreamInput) Write(chunk json.RawMessage) error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + } + s.mu.Unlock() + + s.state.mu.Lock() + defer s.state.mu.Unlock() + + if s.state.status != streamStatusOpen { + return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + } + + s.state.chunks = append(s.state.chunks, chunk) + s.state.lastTouched = time.Now() + + event := StreamEvent{Type: StreamEventChunk, Chunk: chunk} + for _, ch := range s.state.subscribers { + select { + case ch <- event: + default: + // Channel full, skip (subscriber is slow) + } + } + + return nil +} + +func (s *inMemoryStreamInput) Done(output json.RawMessage) error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + } + s.closed = true + s.mu.Unlock() + + s.state.mu.Lock() + defer s.state.mu.Unlock() + + if s.state.status != streamStatusOpen { + return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + } + + s.state.status = streamStatusDone + s.state.output = output + s.state.lastTouched = time.Now() + + event := StreamEvent{Type: StreamEventDone, Output: output} + for _, ch := range s.state.subscribers { + select { + case ch <- event: + default: + } + close(ch) + } + s.state.subscribers = nil + + return nil +} + +func (s *inMemoryStreamInput) Error(err error) error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + } + s.closed = true + s.mu.Unlock() + + s.state.mu.Lock() + defer s.state.mu.Unlock() + + if s.state.status != streamStatusOpen { + return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + } + + s.state.status = streamStatusError + s.state.err = err + s.state.lastTouched = time.Now() + + event := StreamEvent{Type: StreamEventError, Err: err} + for _, ch := range s.state.subscribers { + select { + case ch <- event: + default: + } + close(ch) + } + s.state.subscribers = nil + + return nil +} + +func (s *inMemoryStreamInput) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + return nil +} diff --git a/go/genkit/servers.go b/go/genkit/servers.go index d48c11ffd6..db4b962f2b 100644 --- a/go/genkit/servers.go +++ b/go/genkit/servers.go @@ -31,23 +31,34 @@ import ( "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/core/api" "github.com/firebase/genkit/go/core/logger" + "github.com/google/uuid" ) +// HandlerOption configures a Handler. type HandlerOption interface { - apply(params *handlerParams) + applyHandler(*handlerOptions) } -// handlerParams are the parameters for an action HTTP handler. -type handlerParams struct { +// handlerOptions are options for an action HTTP handler. +type handlerOptions struct { ContextProviders []core.ContextProvider // Providers for action context that may be used during runtime. + StreamManager core.StreamManager // Optional manager for durable stream storage. } -// apply applies the options to the handler params. -func (p *handlerParams) apply(params *handlerParams) { - if params.ContextProviders != nil { - panic("genkit.WithContextProviders: cannot set ContextProviders more than once") +func (o *handlerOptions) applyHandler(opts *handlerOptions) { + if o.ContextProviders != nil { + if opts.ContextProviders != nil { + panic("genkit.WithContextProviders: cannot set ContextProviders more than once") + } + opts.ContextProviders = o.ContextProviders + } + + if o.StreamManager != nil { + if opts.StreamManager != nil { + panic("genkit.WithStreamManager: cannot set StreamManager more than once") + } + opts.StreamManager = o.StreamManager } - params.ContextProviders = p.ContextProviders } // requestID is a unique ID for each request. @@ -56,7 +67,14 @@ var requestID atomic.Int64 // WithContextProviders adds providers for action context that may be used during runtime. // They are called in the order added and may overwrite previous context. func WithContextProviders(ctxProviders ...core.ContextProvider) HandlerOption { - return &handlerParams{ContextProviders: ctxProviders} + return &handlerOptions{ContextProviders: ctxProviders} +} + +// WithStreamManager enables durable streaming with the provided StreamManager. +// When enabled, streaming responses include an x-genkit-stream-id header that clients +// can use to reconnect to in-progress or completed streams. +func WithStreamManager(manager core.StreamManager) HandlerOption { + return &handlerOptions{StreamManager: manager} } // Handler returns an HTTP handler function that serves the action with the provided options. @@ -67,12 +85,12 @@ func WithContextProviders(ctxProviders ...core.ContextProvider) HandlerOption { // return api.ActionContext{"myKey": "myValue"}, nil // })) func Handler(a api.Action, opts ...HandlerOption) http.HandlerFunc { - params := &handlerParams{} + options := &handlerOptions{} for _, opt := range opts { - opt.apply(params) + opt.applyHandler(options) } - return wrapHandler(handler(a, params)) + return wrapHandler(handler(a, options)) } // wrapHandler wraps an HTTP handler function with common logging and error handling. @@ -101,8 +119,9 @@ func wrapHandler(h func(http.ResponseWriter, *http.Request) error) http.HandlerF } } -// handler returns an HTTP handler function that serves the action with the provided params. Responses are written in server-sent events (SSE) format. -func handler(a api.Action, params *handlerParams) func(http.ResponseWriter, *http.Request) error { +// handler returns an HTTP handler function that serves the action with the provided options. +// Streaming responses are written in server-sent events (SSE) format. +func handler(a api.Action, opts *handlerOptions) func(http.ResponseWriter, *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { if a == nil { return errors.New("action is nil; cannot serve") @@ -124,29 +143,9 @@ func handler(a api.Action, params *handlerParams) func(http.ResponseWriter, *htt } stream = stream || r.Header.Get("Accept") == "text/event-stream" - var callback streamingCallback[json.RawMessage] - if stream { - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Transfer-Encoding", "chunked") - callback = func(ctx context.Context, msg json.RawMessage) error { - _, err := fmt.Fprintf(w, "data: {\"message\": %s}\n\n", msg) - if err != nil { - return err - } - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - return nil - } - } else { - w.Header().Set("Content-Type", "application/json") - } - ctx := r.Context() - if params.ContextProviders != nil { - for _, ctxProvider := range params.ContextProviders { + if opts.ContextProviders != nil { + for _, ctxProvider := range opts.ContextProviders { headers := make(map[string]string, len(r.Header)) for k, v := range r.Header { headers[strings.ToLower(k)] = strings.Join(v, " ") @@ -170,22 +169,216 @@ func handler(a api.Action, params *handlerParams) func(http.ResponseWriter, *htt } } - out, err := a.RunJSON(ctx, body.Data, callback) - if err != nil { - if stream { - _, err = fmt.Fprintf(w, "data: {\"error\": {\"status\": \"INTERNAL\", \"message\": \"stream flow error\", \"details\": \"%v\"}}\n\n", err) - return err + if stream { + streamID := r.Header.Get("X-Genkit-Stream-Id") + + if streamID != "" && opts.StreamManager != nil { + return subscribeToStream(ctx, w, opts.StreamManager, streamID) + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Transfer-Encoding", "chunked") + + if opts.StreamManager != nil { + return runWithDurableStreaming(ctx, w, a, opts.StreamManager, body.Data) } + + return runWithStreaming(ctx, w, a, body.Data) + } + + w.Header().Set("Content-Type", "application/json") + out, err := a.RunJSON(ctx, body.Data, nil) + if err != nil { return err } - if stream { - _, err = fmt.Fprintf(w, "data: {\"result\": %s}\n\n", out) + return writeResultResponse(w, out) + } +} + +// runWithStreaming executes the action with standard HTTP streaming (no durability). +func runWithStreaming(ctx context.Context, w http.ResponseWriter, a api.Action, input json.RawMessage) error { + callback := func(ctx context.Context, msg json.RawMessage) error { + if err := writeSSEMessage(w, msg); err != nil { + return err + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return nil + } + + out, err := a.RunJSON(ctx, input, callback) + if err != nil { + if werr := writeSSEError(w, err); werr != nil { + return werr + } + return nil + } + return writeSSEResult(w, out) +} + +// runWithDurableStreaming executes the action with durable streaming support. +// Chunks are written to both the HTTP response and the stream manager for later replay. +func runWithDurableStreaming(ctx context.Context, w http.ResponseWriter, a api.Action, sm core.StreamManager, input json.RawMessage) error { + streamID := uuid.New().String() + + durableStream, err := sm.Open(ctx, streamID) + if err != nil { + return err + } + defer durableStream.Close() + + w.Header().Set("X-Genkit-Stream-Id", streamID) + + callback := func(ctx context.Context, msg json.RawMessage) error { + if err := writeSSEMessage(w, msg); err != nil { return err } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + // Fire-and-forget write to durable stream. + go durableStream.Write(msg) + return nil + } + + out, err := a.RunJSON(ctx, input, callback) + if err != nil { + durableStream.Error(err) + if werr := writeSSEError(w, err); werr != nil { + return werr + } + return nil + } + + durableStream.Done(out) + return writeSSEResult(w, out) +} + +// subscribeToStream subscribes to an existing durable stream and writes events to the HTTP response. +func subscribeToStream(ctx context.Context, w http.ResponseWriter, sm core.StreamManager, streamID string) error { + events, unsubscribe, err := sm.Subscribe(ctx, streamID) + if err != nil { + var ufErr *core.UserFacingError + if errors.As(err, &ufErr) && ufErr.Status == core.NOT_FOUND { + w.WriteHeader(http.StatusNoContent) + return nil + } + return err + } + defer unsubscribe() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Transfer-Encoding", "chunked") + + for event := range events { + switch event.Type { + case core.StreamEventChunk: + if err := writeSSEMessage(w, event.Chunk); err != nil { + return err + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + case core.StreamEventDone: + if err := writeSSEResult(w, event.Output); err != nil { + return err + } + return nil + case core.StreamEventError: + streamErr := event.Err + if streamErr == nil { + streamErr = errors.New("unknown error") + } + if err := writeSSEError(w, streamErr); err != nil { + return err + } + return nil + } + } - _, err = fmt.Fprintf(w, "{\"result\": %s}\n", out) + return nil +} + +// flowResultResponse wraps a final action result for JSON serialization. +type flowResultResponse struct { + Result json.RawMessage `json:"result"` +} + +// flowMessageResponse wraps a streaming chunk for JSON serialization. +type flowMessageResponse struct { + Message json.RawMessage `json:"message"` +} + +// flowErrorResponse wraps an error for JSON serialization in streaming responses. +type flowErrorResponse struct { + Error *flowError `json:"error"` +} + +// flowError represents the error payload in a streaming error response. +type flowError struct { + Status core.StatusName `json:"status"` + Message string `json:"message"` + Details string `json:"details,omitempty"` +} + +// writeResultResponse writes a JSON result response for non-streaming requests. +func writeResultResponse(w http.ResponseWriter, result json.RawMessage) error { + resp := flowResultResponse{Result: result} + data, err := json.Marshal(resp) + if err != nil { + return err + } + _, err = w.Write(data) + if err != nil { + return err + } + _, err = w.Write([]byte("\n")) + return err +} + +// writeSSEResult writes a JSON result as a server-sent event for streaming requests. +func writeSSEResult(w http.ResponseWriter, result json.RawMessage) error { + resp := flowResultResponse{Result: result} + data, err := json.Marshal(resp) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "data: %s\n\n", data) + return err +} + +// writeSSEMessage writes a streaming chunk as a server-sent event. +func writeSSEMessage(w http.ResponseWriter, msg json.RawMessage) error { + resp := flowMessageResponse{Message: msg} + data, err := json.Marshal(resp) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "data: %s\n\n", data) + return err +} + +// writeSSEError writes an error as a server-sent event for streaming requests. +func writeSSEError(w http.ResponseWriter, flowErr error) error { + resp := flowErrorResponse{ + Error: &flowError{ + Status: core.INTERNAL, + Message: "stream flow error", + Details: flowErr.Error(), + }, + } + data, err := json.Marshal(resp) + if err != nil { return err } + _, err = fmt.Fprintf(w, "data: %s\n\n", data) + return err } func parseBoolQueryParam(r *http.Request, name string) (bool, error) { diff --git a/go/genkit/servers_test.go b/go/genkit/servers_test.go index a0a07cc21b..6a1e7308ae 100644 --- a/go/genkit/servers_test.go +++ b/go/genkit/servers_test.go @@ -222,17 +222,17 @@ func TestStreamingHandler(t *testing.T) { t.Errorf("want status code %d, got %d", http.StatusOK, resp.StatusCode) } - expected := `data: {"message": "h"} + expected := `data: {"message":"h"} -data: {"message": "e"} +data: {"message":"e"} -data: {"message": "l"} +data: {"message":"l"} -data: {"message": "l"} +data: {"message":"l"} -data: {"message": "o"} +data: {"message":"o"} -data: {"result": "hello-end"} +data: {"result":"hello-end"} ` if string(body) != expected { @@ -256,7 +256,7 @@ data: {"result": "hello-end"} t.Errorf("want status code %d, got %d", http.StatusOK, resp.StatusCode) } - expected := `data: {"error": {"status": "INTERNAL", "message": "stream flow error", "details": "streaming error"}} + expected := `data: {"error":{"status":"INTERNAL_SERVER_ERROR","message":"stream flow error","details":"streaming error"}} ` if string(body) != expected { @@ -264,3 +264,118 @@ data: {"result": "hello-end"} } }) } + +func TestDurableStreamingHandler(t *testing.T) { + g := Init(context.Background()) + + streamingFlow := DefineStreamingFlow(g, "durableStreaming", + func(ctx context.Context, input string, cb func(context.Context, string) error) (string, error) { + for _, c := range input { + if err := cb(ctx, string(c)); err != nil { + return "", err + } + } + return input + "-done", nil + }) + + t.Run("returns stream ID header", func(t *testing.T) { + sm := core.NewInMemoryStreamManager() + handler := Handler(streamingFlow, WithStreamManager(sm)) + + req := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"hi"}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "text/event-stream") + w := httptest.NewRecorder() + + handler(w, req) + + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusOK { + t.Errorf("want status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + streamID := resp.Header.Get("X-Genkit-Stream-Id") + if streamID == "" { + t.Error("want X-Genkit-Stream-Id header to be set") + } + + expected := `data: {"message":"h"} + +data: {"message":"i"} + +data: {"result":"hi-done"} + +` + if string(body) != expected { + t.Errorf("want streaming body:\n%q\n\nGot:\n%q", expected, string(body)) + } + }) + + t.Run("subscribe to completed stream", func(t *testing.T) { + sm := core.NewInMemoryStreamManager() + handler := Handler(streamingFlow, WithStreamManager(sm)) + + // First request - run the stream to completion + req1 := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"ab"}`)) + req1.Header.Set("Content-Type", "application/json") + req1.Header.Set("Accept", "text/event-stream") + w1 := httptest.NewRecorder() + + handler(w1, req1) + + resp1 := w1.Result() + streamID := resp1.Header.Get("X-Genkit-Stream-Id") + if streamID == "" { + t.Fatal("want X-Genkit-Stream-Id header to be set") + } + + // Second request - subscribe to the completed stream + req2 := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"ignored"}`)) + req2.Header.Set("Content-Type", "application/json") + req2.Header.Set("Accept", "text/event-stream") + req2.Header.Set("X-Genkit-Stream-Id", streamID) + w2 := httptest.NewRecorder() + + handler(w2, req2) + + resp2 := w2.Result() + body2, _ := io.ReadAll(resp2.Body) + + if resp2.StatusCode != http.StatusOK { + t.Errorf("want status code %d, got %d", http.StatusOK, resp2.StatusCode) + } + + // Should replay all chunks and the final result + expected := `data: {"message":"a"} + +data: {"message":"b"} + +data: {"result":"ab-done"} + +` + if string(body2) != expected { + t.Errorf("want replayed body:\n%q\n\nGot:\n%q", expected, string(body2)) + } + }) + + t.Run("subscribe to non-existent stream returns 204", func(t *testing.T) { + sm := core.NewInMemoryStreamManager() + handler := Handler(streamingFlow, WithStreamManager(sm)) + + req := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"test"}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("X-Genkit-Stream-Id", "non-existent-stream-id") + w := httptest.NewRecorder() + + handler(w, req) + + resp := w.Result() + + if resp.StatusCode != http.StatusNoContent { + t.Errorf("want status code %d, got %d", http.StatusNoContent, resp.StatusCode) + } + }) +} diff --git a/go/samples/durable-streaming/main.go b/go/samples/durable-streaming/main.go new file mode 100644 index 0000000000..146349e29a --- /dev/null +++ b/go/samples/durable-streaming/main.go @@ -0,0 +1,100 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// This sample demonstrates durable streaming, which allows clients to reconnect +// to in-progress or completed streams using a stream ID. +// +// Start the server: +// +// go run . +// +// Test streaming (get a stream ID back in X-Genkit-Stream-Id header): +// +// curl -N -i -H "Accept: text/event-stream" \ +// -d '{"data": 5}' \ +// http://localhost:8080/countdown +// +// Subscribe to an existing stream using the stream ID from the previous response: +// +// curl -N -H "Accept: text/event-stream" \ +// -H "X-Genkit-Stream-Id: " \ +// -d '{"data": 5}' \ +// http://localhost:8080/countdown +// +// The subscription will replay any buffered chunks and then continue with live updates. +// If the stream has already completed, all chunks plus the final result are returned. + +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "time" + + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +func main() { + ctx := context.Background() + g := genkit.Init(ctx) + + type CountdownChunk struct { + Count int `json:"count"` + Message string `json:"message"` + Timestamp string `json:"timestamp"` + } + + // Define a streaming flow that counts down with delays. + countdown := genkit.DefineStreamingFlow(g, "countdown", + func(ctx context.Context, count int, cb func(context.Context, CountdownChunk) error) (string, error) { + if count <= 0 { + count = 5 + } + + for i := count; i > 0; i-- { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(1 * time.Second): + } + + chunk := CountdownChunk{ + Count: i, + Message: fmt.Sprintf("T-%d...", i), + Timestamp: time.Now().Format(time.RFC3339), + } + + if cb != nil { + if err := cb(ctx, chunk); err != nil { + return "", err + } + } + } + + return "Liftoff!", nil + }) + + // Set up HTTP server with durable streaming enabled. + // Completed streams are kept for 10 minutes before cleanup (while server is running). + mux := http.NewServeMux() + mux.HandleFunc("POST /countdown", genkit.Handler(countdown, + genkit.WithStreamManager(core.NewInMemoryStreamManager(core.WithTTL(10*time.Minute))), + )) + log.Fatal(http.ListenAndServe("127.0.0.1:8080", mux)) +} From 0f3527b61d5993e5fafab24fcb5090ad6ac3022a Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Tue, 16 Dec 2025 19:48:43 -0800 Subject: [PATCH 2/5] Put clean up on a loop. --- go/core/streaming.go | 57 +++++++++++++++++++++++++++------------ go/genkit/servers_test.go | 3 +++ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/go/core/streaming.go b/go/core/streaming.go index eadfc510e4..2fa9606414 100644 --- a/go/core/streaming.go +++ b/go/core/streaming.go @@ -23,6 +23,9 @@ import ( "time" ) +// inMemoryStreamBufferSize is the buffer size for subscriber event channels. +const inMemoryStreamBufferSize = 100 + // StreamEventType indicates the type of stream event. type StreamEventType int @@ -87,12 +90,13 @@ type streamState struct { // InMemoryStreamManager is an in-memory implementation of StreamManager. // Useful for testing or single-instance deployments where persistence is not required. +// Call Close to stop the background cleanup goroutine when the manager is no longer needed. type InMemoryStreamManager struct { - streams map[string]*streamState - mu sync.RWMutex - ttl time.Duration - cleanupMu sync.Mutex - lastCleanup time.Time + streams map[string]*streamState + mu sync.RWMutex + ttl time.Duration + stopCh chan struct{} + doneCh chan struct{} } // StreamManagerOption configures an InMemoryStreamManager. @@ -119,6 +123,8 @@ func WithTTL(ttl time.Duration) StreamManagerOption { } // NewInMemoryStreamManager creates a new InMemoryStreamManager. +// A background goroutine is started to periodically clean up expired streams. +// Call Close to stop the goroutine when the manager is no longer needed. func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManager { options := &streamManagerOptions{ TTL: 5 * time.Minute, @@ -126,22 +132,34 @@ func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManage for _, opt := range opts { opt.applyInMemoryStreamManager(options) } - return &InMemoryStreamManager{ + m := &InMemoryStreamManager{ streams: make(map[string]*streamState), ttl: options.TTL, + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), } + go m.cleanupLoop() + return m } -// cleanup removes expired streams. Called periodically during operations. -func (m *InMemoryStreamManager) cleanup() { - m.cleanupMu.Lock() - if time.Since(m.lastCleanup) < time.Minute { - m.cleanupMu.Unlock() - return +// cleanupLoop runs periodically to remove expired streams. +func (m *InMemoryStreamManager) cleanupLoop() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + defer close(m.doneCh) + + for { + select { + case <-m.stopCh: + return + case <-ticker.C: + m.cleanupExpiredStreams() + } } - m.lastCleanup = time.Now() - m.cleanupMu.Unlock() +} +// cleanupExpiredStreams removes streams that have completed and exceeded the TTL. +func (m *InMemoryStreamManager) cleanupExpiredStreams() { now := time.Now() m.mu.Lock() defer m.mu.Unlock() @@ -156,10 +174,15 @@ func (m *InMemoryStreamManager) cleanup() { } } +// Close stops the background cleanup goroutine and releases resources. +// This method blocks until the cleanup goroutine has stopped. +func (m *InMemoryStreamManager) Close() { + close(m.stopCh) + <-m.doneCh +} + // Open creates a new stream for writing. func (m *InMemoryStreamManager) Open(ctx context.Context, streamID string) (ActionStreamInput, error) { - m.cleanup() - m.mu.Lock() defer m.mu.Unlock() @@ -192,7 +215,7 @@ func (m *InMemoryStreamManager) Subscribe(ctx context.Context, streamID string) return nil, nil, NewPublicError(NOT_FOUND, "stream not found", nil) } - ch := make(chan StreamEvent, 100) + ch := make(chan StreamEvent, inMemoryStreamBufferSize) state.mu.Lock() defer state.mu.Unlock() diff --git a/go/genkit/servers_test.go b/go/genkit/servers_test.go index 6a1e7308ae..ba5fe14585 100644 --- a/go/genkit/servers_test.go +++ b/go/genkit/servers_test.go @@ -280,6 +280,7 @@ func TestDurableStreamingHandler(t *testing.T) { t.Run("returns stream ID header", func(t *testing.T) { sm := core.NewInMemoryStreamManager() + defer sm.Close() handler := Handler(streamingFlow, WithStreamManager(sm)) req := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"hi"}`)) @@ -315,6 +316,7 @@ data: {"result":"hi-done"} t.Run("subscribe to completed stream", func(t *testing.T) { sm := core.NewInMemoryStreamManager() + defer sm.Close() handler := Handler(streamingFlow, WithStreamManager(sm)) // First request - run the stream to completion @@ -362,6 +364,7 @@ data: {"result":"ab-done"} t.Run("subscribe to non-existent stream returns 204", func(t *testing.T) { sm := core.NewInMemoryStreamManager() + defer sm.Close() handler := Handler(streamingFlow, WithStreamManager(sm)) req := httptest.NewRequest("POST", "/", strings.NewReader(`{"data":"test"}`)) From b023baf2c82bb94ee4cfbce81055db7a1dd659f3 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Tue, 16 Dec 2025 20:00:32 -0800 Subject: [PATCH 3/5] Update streaming.go --- go/core/streaming.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/core/streaming.go b/go/core/streaming.go index 2fa9606414..0f7c5281e7 100644 --- a/go/core/streaming.go +++ b/go/core/streaming.go @@ -273,7 +273,7 @@ func (s *inMemoryStreamInput) Write(chunk json.RawMessage) error { s.mu.Lock() if s.closed { s.mu.Unlock() - return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + return NewPublicError(FAILED_PRECONDITION, "stream writer is closed", nil) } s.mu.Unlock() @@ -281,7 +281,7 @@ func (s *inMemoryStreamInput) Write(chunk json.RawMessage) error { defer s.state.mu.Unlock() if s.state.status != streamStatusOpen { - return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + return NewPublicError(FAILED_PRECONDITION, "stream has already completed", nil) } s.state.chunks = append(s.state.chunks, chunk) @@ -303,7 +303,7 @@ func (s *inMemoryStreamInput) Done(output json.RawMessage) error { s.mu.Lock() if s.closed { s.mu.Unlock() - return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + return NewPublicError(FAILED_PRECONDITION, "stream writer is closed", nil) } s.closed = true s.mu.Unlock() @@ -312,7 +312,7 @@ func (s *inMemoryStreamInput) Done(output json.RawMessage) error { defer s.state.mu.Unlock() if s.state.status != streamStatusOpen { - return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + return NewPublicError(FAILED_PRECONDITION, "stream has already completed", nil) } s.state.status = streamStatusDone @@ -336,7 +336,7 @@ func (s *inMemoryStreamInput) Error(err error) error { s.mu.Lock() if s.closed { s.mu.Unlock() - return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil) + return NewPublicError(FAILED_PRECONDITION, "stream writer is closed", nil) } s.closed = true s.mu.Unlock() @@ -345,7 +345,7 @@ func (s *inMemoryStreamInput) Error(err error) error { defer s.state.mu.Unlock() if s.state.status != streamStatusOpen { - return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil) + return NewPublicError(FAILED_PRECONDITION, "stream has already completed", nil) } s.state.status = streamStatusError From ec9d601523e91d64eaada2602728c2a4841d78ab Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 17 Dec 2025 06:27:11 -0800 Subject: [PATCH 4/5] Update servers.go --- go/genkit/servers.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/genkit/servers.go b/go/genkit/servers.go index db4b962f2b..62d16531c5 100644 --- a/go/genkit/servers.go +++ b/go/genkit/servers.go @@ -233,15 +233,13 @@ func runWithDurableStreaming(ctx context.Context, w http.ResponseWriter, a api.A w.Header().Set("X-Genkit-Stream-Id", streamID) callback := func(ctx context.Context, msg json.RawMessage) error { + durableStream.Write(msg) if err := writeSSEMessage(w, msg); err != nil { return err } if f, ok := w.(http.Flusher); ok { f.Flush() } - - // Fire-and-forget write to durable stream. - go durableStream.Write(msg) return nil } From 993fe3e87b3b339365043ea8cc9e0151be61e626 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 17 Dec 2025 06:59:03 -0800 Subject: [PATCH 5/5] Update servers.go --- go/genkit/servers.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/go/genkit/servers.go b/go/genkit/servers.go index 62d16531c5..ef46da6be1 100644 --- a/go/genkit/servers.go +++ b/go/genkit/servers.go @@ -36,7 +36,7 @@ import ( // HandlerOption configures a Handler. type HandlerOption interface { - applyHandler(*handlerOptions) + applyHandler(*handlerOptions) error } // handlerOptions are options for an action HTTP handler. @@ -45,20 +45,22 @@ type handlerOptions struct { StreamManager core.StreamManager // Optional manager for durable stream storage. } -func (o *handlerOptions) applyHandler(opts *handlerOptions) { +func (o *handlerOptions) applyHandler(opts *handlerOptions) error { if o.ContextProviders != nil { if opts.ContextProviders != nil { - panic("genkit.WithContextProviders: cannot set ContextProviders more than once") + return errors.New("cannot set ContextProviders more than once (WithContextProviders)") } opts.ContextProviders = o.ContextProviders } if o.StreamManager != nil { if opts.StreamManager != nil { - panic("genkit.WithStreamManager: cannot set StreamManager more than once") + return errors.New("cannot set StreamManager more than once (WithStreamManager)") } opts.StreamManager = o.StreamManager } + + return nil } // requestID is a unique ID for each request. @@ -87,7 +89,9 @@ func WithStreamManager(manager core.StreamManager) HandlerOption { func Handler(a api.Action, opts ...HandlerOption) http.HandlerFunc { options := &handlerOptions{} for _, opt := range opts { - opt.applyHandler(options) + if err := opt.applyHandler(options); err != nil { + panic(fmt.Errorf("genkit.Handler: error applying options: %w", err)) + } } return wrapHandler(handler(a, options))