Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions framework/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat: support raw response accumulation in stream accumulator
4 changes: 4 additions & 0 deletions framework/streaming/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (a *Accumulator) putChatStreamChunk(chunk *ChatStreamChunk) {
chunk.ErrorDetails = nil
chunk.FinishReason = nil
chunk.TokenUsage = nil
chunk.RawResponse = nil
a.chatStreamChunkPool.Put(chunk)
}

Expand All @@ -60,6 +61,7 @@ func (a *Accumulator) putAudioStreamChunk(chunk *AudioStreamChunk) {
chunk.ErrorDetails = nil
chunk.FinishReason = nil
chunk.TokenUsage = nil
chunk.RawResponse = nil
a.audioStreamChunkPool.Put(chunk)
}

Expand All @@ -77,6 +79,7 @@ func (a *Accumulator) putTranscriptionStreamChunk(chunk *TranscriptionStreamChun
chunk.ErrorDetails = nil
chunk.FinishReason = nil
chunk.TokenUsage = nil
chunk.RawResponse = nil
a.transcriptionStreamChunkPool.Put(chunk)
}

Expand All @@ -94,6 +97,7 @@ func (a *Accumulator) putResponsesStreamChunk(chunk *ResponsesStreamChunk) {
chunk.ErrorDetails = nil
chunk.FinishReason = nil
chunk.TokenUsage = nil
chunk.RawResponse = nil
a.responsesStreamChunkPool.Put(chunk)
}

Expand Down
19 changes: 19 additions & 0 deletions framework/streaming/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ func (a *Accumulator) processAccumulatedAudioStreamingChunks(requestID string, b
data.CacheDebug = lastChunk.SemanticCacheDebug
}
}
// Accumulate raw response
if len(accumulator.AudioStreamChunks) > 0 {
// Sort chunks by chunk index
sort.Slice(accumulator.AudioStreamChunks, func(i, j int) bool {
return accumulator.AudioStreamChunks[i].ChunkIndex < accumulator.AudioStreamChunks[j].ChunkIndex
})
for _, chunk := range accumulator.AudioStreamChunks {
if chunk.RawResponse != nil {
if data.RawResponse == nil {
data.RawResponse = bifrost.Ptr(*chunk.RawResponse)
} else {
*data.RawResponse += "\n\n" + *chunk.RawResponse
}
}
}
}
return data, nil
}

Expand Down Expand Up @@ -118,6 +134,9 @@ func (a *Accumulator) processAudioStreamingResponse(ctx *schemas.BifrostContext,
Audio: result.SpeechStreamResponse.Audio,
}
chunk.Delta = newDelta
if result.SpeechStreamResponse.ExtraFields.RawResponse != nil {
chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", result.SpeechStreamResponse.ExtraFields.RawResponse))
}
if result.SpeechStreamResponse.Usage != nil {
chunk.TokenUsage = result.SpeechStreamResponse.Usage
}
Expand Down
19 changes: 19 additions & 0 deletions framework/streaming/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ func (a *Accumulator) processAccumulatedChatStreamingChunks(requestID string, re
}
data.FinishReason = lastChunk.FinishReason
}
// Accumulate raw response
if len(accumulator.ChatStreamChunks) > 0 {
// Sort chunks by chunk index
sort.Slice(accumulator.ChatStreamChunks, func(i, j int) bool {
return accumulator.ChatStreamChunks[i].ChunkIndex < accumulator.ChatStreamChunks[j].ChunkIndex
})
for _, chunk := range accumulator.ChatStreamChunks {
if chunk.RawResponse != nil {
if data.RawResponse == nil {
data.RawResponse = bifrost.Ptr(*chunk.RawResponse)
} else {
*data.RawResponse += "\n\n" + *chunk.RawResponse
}
}
}
}
return data, nil
}

Expand Down Expand Up @@ -252,6 +268,9 @@ func (a *Accumulator) processChatStreamingResponse(ctx *schemas.BifrostContext,
chunk.TokenUsage = result.ChatResponse.Usage
}
chunk.ChunkIndex = result.ChatResponse.ExtraFields.ChunkIndex
if result.ChatResponse.ExtraFields.RawResponse != nil {
chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", result.ChatResponse.ExtraFields.RawResponse))
}
if isFinalChunk {
if a.pricingManager != nil {
cost := a.pricingManager.CalculateCostWithCacheDebug(result)
Expand Down
134 changes: 97 additions & 37 deletions framework/streaming/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,23 @@ func (a *Accumulator) processAccumulatedResponsesStreamingChunks(requestID strin
data.FinishReason = lastChunk.FinishReason
}

// Accumulate raw response
if len(accumulator.ResponsesStreamChunks) > 0 {
// Sort chunks by chunk index
sort.Slice(accumulator.ResponsesStreamChunks, func(i, j int) bool {
return accumulator.ResponsesStreamChunks[i].ChunkIndex < accumulator.ResponsesStreamChunks[j].ChunkIndex
})
for _, chunk := range accumulator.ResponsesStreamChunks {
if chunk.RawResponse != nil {
if data.RawResponse == nil {
data.RawResponse = bifrost.Ptr(*chunk.RawResponse)
} else {
*data.RawResponse += "\n\n" + *chunk.RawResponse
}
}
}
}

return data, nil
}

Expand All @@ -683,54 +700,94 @@ func (a *Accumulator) processResponsesStreamingResponse(ctx *schemas.BifrostCont

// For OpenAI-compatible providers, the last chunk already contains the whole accumulated response
// so just return it as is
// We maintain the accumulator only for raw response accumulation
if provider == schemas.OpenAI || provider == schemas.OpenRouter || (provider == schemas.Azure && !schemas.IsAnthropicModel(model)) {
isFinalChunk := bifrost.IsFinalChunk(ctx)
chunk := a.getResponsesStreamChunk()
chunk.Timestamp = time.Now()
chunk.ErrorDetails = bifrostErr
if bifrostErr != nil {
chunk.FinishReason = bifrost.Ptr("error")
} else if result != nil && result.ResponsesStreamResponse != nil {
if result.ResponsesStreamResponse.ExtraFields.RawResponse != nil {
rawResponse, ok := result.ResponsesStreamResponse.ExtraFields.RawResponse.(string)
if ok {
chunk.RawResponse = bifrost.Ptr(rawResponse)
}
}
}
if addErr := a.addResponsesStreamChunk(requestID, chunk, isFinalChunk); addErr != nil {
return nil, fmt.Errorf("failed to add responses stream chunk for request %s: %w", requestID, addErr)
}
if isFinalChunk {
// For OpenAI, the final chunk contains the complete response
// Extract the complete response and return it
if result != nil && result.ResponsesStreamResponse != nil {
// Build the complete response from the final chunk
data := &AccumulatedData{
RequestID: requestID,
Status: "success",
Stream: true,
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
Latency: result.GetExtraFields().Latency,
ErrorDetails: bifrostErr,
shouldProcess := false
// Get the accumulator to check if processing has already been triggered
accumulator := a.getOrCreateStreamAccumulator(requestID)
accumulator.mu.Lock()
shouldProcess = !accumulator.IsComplete
// Mark as complete when we're about to process
if shouldProcess {
accumulator.IsComplete = true
}
accumulator.mu.Unlock()

if shouldProcess {
accumulatedData, processErr := a.processAccumulatedResponsesStreamingChunks(requestID, bifrostErr, isFinalChunk)
if processErr != nil {
a.logger.Error("failed to process accumulated responses chunks for request %s: %v", requestID, processErr)
return nil, processErr
}

if bifrostErr != nil {
data.Status = "error"
}
// For OpenAI, the final chunk contains the complete response
// Extract the complete response and return it
if result != nil && result.ResponsesStreamResponse != nil {
// Build the complete response from the final chunk
data := &AccumulatedData{
RequestID: requestID,
Status: "success",
Stream: true,
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
Latency: result.GetExtraFields().Latency,
ErrorDetails: bifrostErr,
RawResponse: accumulatedData.RawResponse,
}

// Extract the complete response from the stream response
if result.ResponsesStreamResponse.Response != nil {
data.OutputMessages = result.ResponsesStreamResponse.Response.Output
if result.ResponsesStreamResponse.Response.Usage != nil {
// Convert ResponsesResponseUsage to schemas.LLMUsage
data.TokenUsage = &schemas.BifrostLLMUsage{
PromptTokens: result.ResponsesStreamResponse.Response.Usage.InputTokens,
CompletionTokens: result.ResponsesStreamResponse.Response.Usage.OutputTokens,
TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens,
if bifrostErr != nil {
data.Status = "error"
}

// Extract the complete response from the stream response
if result.ResponsesStreamResponse.Response != nil {
data.OutputMessages = result.ResponsesStreamResponse.Response.Output
if result.ResponsesStreamResponse.Response.Usage != nil {
// Convert ResponsesResponseUsage to schemas.LLMUsage
data.TokenUsage = &schemas.BifrostLLMUsage{
PromptTokens: result.ResponsesStreamResponse.Response.Usage.InputTokens,
CompletionTokens: result.ResponsesStreamResponse.Response.Usage.OutputTokens,
TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens,
}
}
}
}

if a.pricingManager != nil {
cost := a.pricingManager.CalculateCostWithCacheDebug(result)
data.Cost = bifrost.Ptr(cost)
}
if a.pricingManager != nil {
cost := a.pricingManager.CalculateCostWithCacheDebug(result)
data.Cost = bifrost.Ptr(cost)
}

return &ProcessedStreamResponse{
Type: StreamResponseTypeFinal,
RequestID: requestID,
StreamType: StreamTypeResponses,
Provider: provider,
Model: model,
Data: data,
}, nil
return &ProcessedStreamResponse{
Type: StreamResponseTypeFinal,
RequestID: requestID,
StreamType: StreamTypeResponses,
Provider: provider,
Model: model,
Data: data,
}, nil
} else {
return nil, nil
}
}
return nil, nil
}

// For non-final chunks from OpenAI, just pass through
Expand All @@ -753,6 +810,9 @@ func (a *Accumulator) processResponsesStreamingResponse(ctx *schemas.BifrostCont
if bifrostErr != nil {
chunk.FinishReason = bifrost.Ptr("error")
} else if result != nil && result.ResponsesStreamResponse != nil {
if result.ResponsesStreamResponse.ExtraFields.RawResponse != nil {
chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", result.ResponsesStreamResponse.ExtraFields.RawResponse))
}
// Store a deep copy of the stream response to prevent shared data mutation between plugins
chunk.StreamResponse = deepCopyResponsesStreamResponse(result.ResponsesStreamResponse)
// Extract token usage from stream response if available
Expand Down
41 changes: 33 additions & 8 deletions framework/streaming/transcription.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ func (a *Accumulator) processAccumulatedTranscriptionStreamingChunks(requestID s
data.CacheDebug = lastChunk.SemanticCacheDebug
}
}
// Accumulate raw response
if len(accumulator.TranscriptionStreamChunks) > 0 {
// Sort chunks by chunk index
sort.Slice(accumulator.TranscriptionStreamChunks, func(i, j int) bool {
return accumulator.TranscriptionStreamChunks[i].ChunkIndex < accumulator.TranscriptionStreamChunks[j].ChunkIndex
})
for _, chunk := range accumulator.TranscriptionStreamChunks {
if chunk.RawResponse != nil {
if data.RawResponse == nil {
data.RawResponse = bifrost.Ptr(*chunk.RawResponse)
} else {
*data.RawResponse += "\n\n" + *chunk.RawResponse
}
}
}
}
return data, nil
}

Expand All @@ -123,18 +139,27 @@ func (a *Accumulator) processTranscriptionStreamingResponse(ctx *schemas.Bifrost
if bifrostErr != nil {
chunk.FinishReason = bifrost.Ptr("error")
} else if result != nil && result.TranscriptionStreamResponse != nil {
// Set delta for all chunks (not just final chunks with usage)
// We create a deep copy of the delta to avoid pointing to stack memory
var deltaCopy *string
if result.TranscriptionStreamResponse.Delta != nil {
deltaValue := *result.TranscriptionStreamResponse.Delta
deltaCopy = &deltaValue
}
newDelta := &schemas.BifrostTranscriptionStreamResponse{
Type: result.TranscriptionStreamResponse.Type,
Delta: deltaCopy,
}
chunk.Delta = newDelta

// Set token usage if available (typically only in final chunk)
if result.TranscriptionStreamResponse.Usage != nil {
chunk.TokenUsage = result.TranscriptionStreamResponse.Usage

// For Transcription, entire delta is sent in the final chunk which also has usage information
// We create a deep copy of the delta to avoid pointing to stack memory
newDelta := &schemas.BifrostTranscriptionStreamResponse{
Type: result.TranscriptionStreamResponse.Type,
Delta: result.TranscriptionStreamResponse.Delta,
}
chunk.Delta = newDelta
}
chunk.ChunkIndex = result.TranscriptionStreamResponse.ExtraFields.ChunkIndex
if result.TranscriptionStreamResponse.ExtraFields.RawResponse != nil {
chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", result.TranscriptionStreamResponse.ExtraFields.RawResponse))
}
if isFinalChunk {
if a.pricingManager != nil {
cost := a.pricingManager.CalculateCostWithCacheDebug(result)
Expand Down
5 changes: 5 additions & 0 deletions framework/streaming/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type AccumulatedData struct {
AudioOutput *schemas.BifrostSpeechResponse
TranscriptionOutput *schemas.BifrostTranscriptionResponse
FinishReason *string
RawResponse *string
}

// AudioStreamChunk represents a single streaming chunk
Expand All @@ -55,6 +56,7 @@ type AudioStreamChunk struct {
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}

// TranscriptionStreamChunk represents a single transcription streaming chunk
Expand All @@ -67,6 +69,7 @@ type TranscriptionStreamChunk struct {
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}

// ChatStreamChunk represents a single streaming chunk
Expand All @@ -79,6 +82,7 @@ type ChatStreamChunk struct {
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string // Raw response if available
}

// ResponsesStreamChunk represents a single responses streaming chunk
Expand All @@ -91,6 +95,7 @@ type ResponsesStreamChunk struct {
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}

// StreamAccumulator manages accumulation of streaming chunks
Expand Down
4 changes: 4 additions & 0 deletions plugins/logging/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ func (p *LoggerPlugin) updateStreamingLogEntry(
updates["responses_output"] = tempEntry.ResponsesOutput
}
}
// Handle raw response from stream updates
if streamResponse.Data.RawResponse != nil {
updates["raw_response"] = *streamResponse.Data.RawResponse
}
}
// Only perform update if there's something to update
if len(updates) > 0 {
Expand Down
1 change: 1 addition & 0 deletions transports/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat: support raw response accumulation for streaming
13 changes: 5 additions & 8 deletions ui/app/workspace/logs/views/logDetailsSheet.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,11 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet
)}

{(log.transcription_input || log.transcription_output) && (
<>
<div className="mt-4 w-full text-center text-sm font-medium">Transcription</div>
<TranscriptionView
transcriptionInput={log.transcription_input}
transcriptionOutput={log.transcription_output}
isStreaming={log.stream}
/>
</>
<TranscriptionView
transcriptionInput={log.transcription_input}
transcriptionOutput={log.transcription_output}
isStreaming={log.stream}
/>
)}

{/* Show conversation history for chat/text completions */}
Expand Down
Loading