Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 350 additions & 0 deletions go/core/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package core

import (
"context"
"encoding/json"
"sync"
"time"
)

// StreamEventType indicates the type of stream event.
type StreamEventType int

const (
StreamEventChunk StreamEventType = iota
StreamEventDone
StreamEventError
)

// StreamEvent represents an event in a durable stream.
type StreamEvent struct {
Type StreamEventType
Chunk json.RawMessage // set when Type == StreamEventChunk
Output json.RawMessage // set when Type == StreamEventDone
Err error // set when Type == StreamEventError
}

// ActionStreamInput provides methods for writing to a durable stream.
type ActionStreamInput interface {
// Write sends a chunk to the stream and notifies all subscribers.
Write(chunk json.RawMessage) error
// Done marks the stream as successfully completed with the given output.
Done(output json.RawMessage) error
// Error marks the stream as failed with the given error.
Error(err error) error
// Close releases resources without marking the stream as done or errored.
Close() error
}

// StreamManager manages durable streams, allowing creation and subscription.
// Implementations can provide different storage backends (e.g., in-memory, database, cache).
type StreamManager interface {
// Open creates a new stream for writing.
// Returns an error if a stream with the given ID already exists.
Open(ctx context.Context, streamID string) (ActionStreamInput, error)
// Subscribe subscribes to an existing stream.
// Returns a channel that receives stream events, an unsubscribe function, and an error.
// If the stream has already completed, all buffered events are sent before the done/error event.
// Returns NOT_FOUND error if the stream doesn't exist.
Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error)
}

// streamStatus represents the current state of a stream.
type streamStatus int

const (
streamStatusOpen streamStatus = iota
streamStatusDone
streamStatusError
)

// streamState holds the internal state of a single stream.
type streamState struct {
status streamStatus
chunks []json.RawMessage
output json.RawMessage
err error
subscribers []chan StreamEvent
lastTouched time.Time
mu sync.RWMutex
}

// InMemoryStreamManager is an in-memory implementation of StreamManager.
// Useful for testing or single-instance deployments where persistence is not required.
type InMemoryStreamManager struct {
streams map[string]*streamState
mu sync.RWMutex
ttl time.Duration
cleanupMu sync.Mutex
lastCleanup time.Time
}

// StreamManagerOption configures an InMemoryStreamManager.
type StreamManagerOption interface {
applyInMemoryStreamManager(*streamManagerOptions)
}

// streamManagerOptions holds configuration for InMemoryStreamManager.
type streamManagerOptions struct {
TTL time.Duration // Time-to-live for completed streams.
}

func (o *streamManagerOptions) applyInMemoryStreamManager(opts *streamManagerOptions) {
if o.TTL > 0 {
opts.TTL = o.TTL
}
}

// WithTTL sets the time-to-live for completed streams.
// Streams that have completed (done or error) will be cleaned up after this duration.
// Default is 5 minutes.
func WithTTL(ttl time.Duration) StreamManagerOption {
return &streamManagerOptions{TTL: ttl}
}

// NewInMemoryStreamManager creates a new InMemoryStreamManager.
func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManager {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some unit tests for the in memory stream manager?

options := &streamManagerOptions{
TTL: 5 * time.Minute,
}
for _, opt := range opts {
opt.applyInMemoryStreamManager(options)
}
return &InMemoryStreamManager{
streams: make(map[string]*streamState),
ttl: options.TTL,
}
}

// cleanup removes expired streams. Called periodically during operations.
func (m *InMemoryStreamManager) cleanup() {
m.cleanupMu.Lock()
if time.Since(m.lastCleanup) < time.Minute {
m.cleanupMu.Unlock()
return
}
m.lastCleanup = time.Now()
m.cleanupMu.Unlock()

now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()

for id, state := range m.streams {
state.mu.RLock()
shouldDelete := state.status != streamStatusOpen && now.Sub(state.lastTouched) > m.ttl
state.mu.RUnlock()
if shouldDelete {
delete(m.streams, id)
}
}
}

// Open creates a new stream for writing.
func (m *InMemoryStreamManager) Open(ctx context.Context, streamID string) (ActionStreamInput, error) {
m.cleanup()

m.mu.Lock()
defer m.mu.Unlock()

if _, exists := m.streams[streamID]; exists {
return nil, NewPublicError(ALREADY_EXISTS, "stream already exists", nil)
}

state := &streamState{
status: streamStatusOpen,
chunks: make([]json.RawMessage, 0),
subscribers: make([]chan StreamEvent, 0),
lastTouched: time.Now(),
}
m.streams[streamID] = state

return &inMemoryStreamInput{
manager: m,
streamID: streamID,
state: state,
}, nil
}

// Subscribe subscribes to an existing stream.
func (m *InMemoryStreamManager) Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error) {
m.mu.RLock()
state, exists := m.streams[streamID]
m.mu.RUnlock()

if !exists {
return nil, nil, NewPublicError(NOT_FOUND, "stream not found", nil)
}

ch := make(chan StreamEvent, 100)

state.mu.Lock()
defer state.mu.Unlock()

// Send all buffered chunks
for _, chunk := range state.chunks {
select {
case ch <- StreamEvent{Type: StreamEventChunk, Chunk: chunk}:
case <-ctx.Done():
close(ch)
return nil, nil, ctx.Err()
}
}

// Handle completed streams
switch state.status {
case streamStatusDone:
ch <- StreamEvent{Type: StreamEventDone, Output: state.output}
close(ch)
return ch, func() {}, nil
case streamStatusError:
ch <- StreamEvent{Type: StreamEventError, Err: state.err}
close(ch)
return ch, func() {}, nil
}

// Stream is still open, add subscriber
state.subscribers = append(state.subscribers, ch)

unsubscribe := func() {
state.mu.Lock()
defer state.mu.Unlock()
for i, sub := range state.subscribers {
if sub == ch {
state.subscribers = append(state.subscribers[:i], state.subscribers[i+1:]...)
close(ch)
break
}
}
}

return ch, unsubscribe, nil
}

// inMemoryStreamInput implements ActionStreamInput for the in-memory manager.
type inMemoryStreamInput struct {
manager *InMemoryStreamManager
streamID string
state *streamState
closed bool
mu sync.Mutex
}

func (s *inMemoryStreamInput) Write(chunk json.RawMessage) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil)
}
s.mu.Unlock()

s.state.mu.Lock()
defer s.state.mu.Unlock()

if s.state.status != streamStatusOpen {
return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil)
}

s.state.chunks = append(s.state.chunks, chunk)
s.state.lastTouched = time.Now()

event := StreamEvent{Type: StreamEventChunk, Chunk: chunk}
for _, ch := range s.state.subscribers {
select {
case ch <- event:
default:
// Channel full, skip (subscriber is slow)
}
}

return nil
}

func (s *inMemoryStreamInput) Done(output json.RawMessage) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil)
}
s.closed = true
s.mu.Unlock()

s.state.mu.Lock()
defer s.state.mu.Unlock()

if s.state.status != streamStatusOpen {
return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil)
}

s.state.status = streamStatusDone
s.state.output = output
s.state.lastTouched = time.Now()

event := StreamEvent{Type: StreamEventDone, Output: output}
for _, ch := range s.state.subscribers {
select {
case ch <- event:
default:
}
close(ch)
}
s.state.subscribers = nil

return nil
}

func (s *inMemoryStreamInput) Error(err error) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return NewPublicError(FAILED_PRECONDITION, "stream is closed", nil)
}
s.closed = true
s.mu.Unlock()

s.state.mu.Lock()
defer s.state.mu.Unlock()

if s.state.status != streamStatusOpen {
return NewPublicError(FAILED_PRECONDITION, "stream is not open", nil)
}

s.state.status = streamStatusError
s.state.err = err
s.state.lastTouched = time.Now()

event := StreamEvent{Type: StreamEventError, Err: err}
for _, ch := range s.state.subscribers {
select {
case ch <- event:
default:
}
close(ch)
}
s.state.subscribers = nil

return nil
}

func (s *inMemoryStreamInput) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
return nil
}
Loading
Loading