Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
- chore: added case-insensitive helper methods for header and query parameter lookups in HTTPRequest
- feat: added support for path parameter lookups in HTTPRequest
- fix: missing request type in error response for anthropic SDK integration
- fix: missing request type in error response for anthropic SDK integration
- feat: add raw request data to bifrost error responses
- fix: add support for AdditionalProperties structures (both boolean and object types)
- fix: improve thought signature handling in gemini for function calls
- fix: enhance citations structure to support multiple citation types
- fix: anthropic streaming events through integration
20 changes: 12 additions & 8 deletions core/internal/testutil/structured_outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,12 @@ func RunStructuredOutputResponsesTest(t *testing.T, client *bifrost.Bifrost, ctx
Type: "json_schema",
Name: bifrost.Ptr("decision_schema"),
JSONSchema: &schemas.ResponsesTextConfigFormatJSONSchema{
Type: &typeStr,
Properties: &props,
Required: structuredOutputSchema["required"].([]string),
AdditionalProperties: &additionalProps,
Type: &typeStr,
Properties: &props,
Required: structuredOutputSchema["required"].([]string),
AdditionalProperties: &schemas.AdditionalPropertiesStruct{
AdditionalPropertiesBool: &additionalProps,
},
},
},
},
Expand Down Expand Up @@ -532,10 +534,12 @@ func RunStructuredOutputResponsesStreamTest(t *testing.T, client *bifrost.Bifros
Type: "json_schema",
Name: bifrost.Ptr("decision_schema"),
JSONSchema: &schemas.ResponsesTextConfigFormatJSONSchema{
Type: &typeStr,
Properties: &props,
Required: structuredOutputSchema["required"].([]string),
AdditionalProperties: &additionalProps,
Type: &typeStr,
Properties: &props,
Required: structuredOutputSchema["required"].([]string),
AdditionalProperties: &schemas.AdditionalPropertiesStruct{
AdditionalPropertiesBool: &additionalProps,
},
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion core/providers/anthropic/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func ToAnthropicChatRequest(bifrostReq *schemas.BifrostChatRequest) (*AnthropicM
toolMsg := messages[i]
if toolMsg.ChatToolMessage != nil && toolMsg.ChatToolMessage.ToolCallID != nil {
toolResult := AnthropicContentBlock{
Type: "tool_result",
Type: AnthropicContentBlockTypeToolResult,
ToolUseID: toolMsg.ChatToolMessage.ToolCallID,
}

Expand Down
202 changes: 178 additions & 24 deletions core/providers/anthropic/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ var anthropicResponsesStreamStatePool = sync.Pool{
},
}

// webSearchItemIDs tracks item IDs for WebSearch tools to skip their argument deltas
// Maps item_id (string) -> true for WebSearch tools that need delta skipping
var webSearchItemIDs sync.Map

// acquireAnthropicResponsesStreamState gets an Anthropic responses stream state from the pool.
func acquireAnthropicResponsesStreamState() *AnthropicResponsesStreamState {
state := anthropicResponsesStreamStatePool.Get().(*AnthropicResponsesStreamState)
Expand Down Expand Up @@ -895,10 +899,10 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
// Computer tool - emit content_block_start
streamResp.Type = AnthropicStreamEventTypeContentBlockStart

if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}

// Build the content_block as tool_use
Expand All @@ -916,10 +920,11 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
} else {
// Text or other content blocks - emit content_block_start
streamResp.Type = AnthropicStreamEventTypeContentBlockStart
if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}

// Build content_block based on item type
Expand All @@ -933,6 +938,7 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
case schemas.ResponsesMessageTypeReasoning:
contentBlock.Type = AnthropicContentBlockTypeThinking
contentBlock.Thinking = schemas.Ptr("")
contentBlock.Signature = schemas.Ptr("")
// Preserve signature if present
if bifrostResp.Item.ResponsesReasoning != nil && bifrostResp.Item.ResponsesReasoning.EncryptedContent != nil && *bifrostResp.Item.ResponsesReasoning.EncryptedContent != "" {
contentBlock.Data = bifrostResp.Item.ResponsesReasoning.EncryptedContent
Expand All @@ -948,6 +954,7 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
// This is actually reasoning content, not a function call
contentBlock.Type = AnthropicContentBlockTypeThinking
contentBlock.Thinking = schemas.Ptr("")
contentBlock.Signature = schemas.Ptr("")
// Check if there's encrypted content for redacted_thinking
if bifrostResp.Item.ResponsesReasoning.EncryptedContent != nil && *bifrostResp.Item.ResponsesReasoning.EncryptedContent != "" {
contentBlock.Type = AnthropicContentBlockTypeRedactedThinking
Expand Down Expand Up @@ -977,13 +984,21 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
if isFirstBlock && hasReasoningInResponse {
contentBlock.Type = AnthropicContentBlockTypeThinking
contentBlock.Thinking = schemas.Ptr("")
contentBlock.Signature = schemas.Ptr("")
} else {
contentBlock.Type = AnthropicContentBlockTypeToolUse
if bifrostResp.Item.ResponsesToolMessage != nil {
contentBlock.ID = bifrostResp.Item.ResponsesToolMessage.CallID
contentBlock.Name = bifrostResp.Item.ResponsesToolMessage.Name
// Always start with empty input for streaming compatibility
contentBlock.Input = map[string]interface{}{}

// Track WebSearch tools so we can skip their argument deltas
if bifrostResp.Item.ResponsesToolMessage.Name != nil &&
*bifrostResp.Item.ResponsesToolMessage.Name == "WebSearch" &&
bifrostResp.Item.ID != nil {
webSearchItemIDs.Store(*bifrostResp.Item.ID, true)
}
}
}
}
Expand Down Expand Up @@ -1036,9 +1051,40 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
}
}

// Sanitize websearch tool arguments to remove both allowed_domains and blocked_domains
// Anthropic only allows one or the other, not both
if shouldGenerateDeltas && argumentsJSON != "" {
// Check if this is a websearch tool
if bifrostResp.Item.ResponsesToolMessage.Name != nil &&
*bifrostResp.Item.ResponsesToolMessage.Name == "WebSearch" {
// Parse the JSON to check for conflicting domain filters
var toolArgs map[string]interface{}
if err := json.Unmarshal([]byte(argumentsJSON), &toolArgs); err == nil {
_, hasAllowed := toolArgs["allowed_domains"]
_, hasBlocked := toolArgs["blocked_domains"]

// If both domain filters exist, remove blocked_domains and keep allowed_domains
// This prioritizes the allowed list over the blocked list
if hasAllowed && hasBlocked {
delete(toolArgs, "blocked_domains")

// Re-marshal the sanitized arguments
if sanitizedBytes, err := json.Marshal(toolArgs); err == nil {
argumentsJSON = string(sanitizedBytes)
}
}
}
}

// Generate synthetic input_json_delta events by chunking the JSON
deltaEvents := generateSyntheticInputJSONDeltas(argumentsJSON, bifrostResp.ContentIndex)
// Use OutputIndex for proper Anthropic indexing, fallback to ContentIndex
var indexToUse *int
if bifrostResp.OutputIndex != nil {
indexToUse = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
indexToUse = bifrostResp.ContentIndex
}
deltaEvents := generateSyntheticInputJSONDeltas(argumentsJSON, indexToUse)
events = append(events, deltaEvents...)
}
}
Expand All @@ -1049,7 +1095,11 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema

case schemas.ResponsesStreamResponseTypeOutputTextDelta:
streamResp.Type = AnthropicStreamEventTypeContentBlockDelta
if bifrostResp.ContentIndex != nil {
// Use OutputIndex instead of ContentIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
// Fallback to ContentIndex if OutputIndex not available
streamResp.Index = bifrostResp.ContentIndex
}
if bifrostResp.Delta != nil {
Expand All @@ -1060,8 +1110,19 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
}

case schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDelta:
// Skip WebSearch tool argument deltas - they will be sent synthetically in output_item.done
// after sanitization to remove conflicting allowed_domains and blocked_domains
if bifrostResp.ItemID != nil {
if _, isWebSearch := webSearchItemIDs.Load(*bifrostResp.ItemID); isWebSearch {
return nil
}
}

streamResp.Type = AnthropicStreamEventTypeContentBlockDelta
if bifrostResp.ContentIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}
if bifrostResp.Arguments != nil {
Expand All @@ -1079,7 +1140,10 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema

case schemas.ResponsesStreamResponseTypeReasoningSummaryTextDelta:
streamResp.Type = AnthropicStreamEventTypeContentBlockDelta
if bifrostResp.ContentIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}

Expand All @@ -1102,6 +1166,65 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
return nil

case schemas.ResponsesStreamResponseTypeOutputItemDone:
// Handle WebSearch tool completion with sanitization and synthetic delta generation
if bifrostResp.Item != nil &&
bifrostResp.Item.Type != nil &&
*bifrostResp.Item.Type == schemas.ResponsesMessageTypeFunctionCall &&
bifrostResp.Item.ResponsesToolMessage != nil &&
bifrostResp.Item.ResponsesToolMessage.Name != nil &&
*bifrostResp.Item.ResponsesToolMessage.Name == "WebSearch" &&
bifrostResp.Item.ResponsesToolMessage.Arguments != nil {

argumentsJSON := *bifrostResp.Item.ResponsesToolMessage.Arguments

// Parse the arguments JSON
var toolArgs map[string]interface{}
if err := json.Unmarshal([]byte(argumentsJSON), &toolArgs); err == nil {
_, hasAllowed := toolArgs["allowed_domains"]
_, hasBlocked := toolArgs["blocked_domains"]

// If both domain filters exist, remove blocked_domains and keep allowed_domains
if hasAllowed && hasBlocked {
delete(toolArgs, "blocked_domains")

// Re-marshal the sanitized arguments
if sanitizedBytes, err := json.Marshal(toolArgs); err == nil {
argumentsJSON = string(sanitizedBytes)
bifrostResp.Item.ResponsesToolMessage.Arguments = &argumentsJSON
}
}
}

// Generate synthetic input_json_delta events for the sanitized WebSearch arguments
// This replaces the delta events that were skipped earlier
var events []*AnthropicStreamEvent

// Use OutputIndex for proper Anthropic indexing, fallback to ContentIndex
var indexToUse *int
if bifrostResp.OutputIndex != nil {
indexToUse = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
indexToUse = bifrostResp.ContentIndex
}

deltaEvents := generateSyntheticInputJSONDeltas(argumentsJSON, indexToUse)
events = append(events, deltaEvents...)

// Add the content_block_stop event at the end
stopEvent := &AnthropicStreamEvent{
Type: AnthropicStreamEventTypeContentBlockStop,
Index: indexToUse,
}
events = append(events, stopEvent)

// Clean up the tracking for this WebSearch item
if bifrostResp.Item.ID != nil {
webSearchItemIDs.Delete(*bifrostResp.Item.ID)
}

return events
}

if bifrostResp.Item != nil &&
bifrostResp.Item.Type != nil &&
*bifrostResp.Item.Type == schemas.ResponsesMessageTypeComputerCall {
Expand All @@ -1110,10 +1233,11 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
// Note: We're sending the complete action JSON in one delta
streamResp.Type = AnthropicStreamEventTypeContentBlockDelta

if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}

// Convert the action to Anthropic format and marshal to JSON
Expand All @@ -1137,10 +1261,11 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
} else {
// For text blocks and other content blocks, emit content_block_stop
streamResp.Type = AnthropicStreamEventTypeContentBlockStop
if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}
}
case schemas.ResponsesStreamResponseTypePing:
Expand All @@ -1150,6 +1275,10 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
streamResp.Type = AnthropicStreamEventTypeMessageStop
anthropicContentDeltaEvent := &AnthropicStreamEvent{
Type: AnthropicStreamEventTypeMessageDelta,
Delta: &AnthropicStreamDelta{
StopReason: schemas.Ptr(AnthropicStopReasonEndTurn),
StopSequence: schemas.Ptr(""),
},
}
if bifrostResp.Response.Usage != nil {
anthropicContentDeltaEvent.Usage = &AnthropicUsage{
Expand All @@ -1165,18 +1294,20 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
}
if bifrostResp.Response.StopReason != nil {
anthropicContentDeltaEvent.Delta = &AnthropicStreamDelta{
StopReason: schemas.Ptr(ConvertBifrostFinishReasonToAnthropic(*bifrostResp.Response.StopReason)),
StopReason: schemas.Ptr(ConvertBifrostFinishReasonToAnthropic(*bifrostResp.Response.StopReason)),
StopSequence: nil,
}
}
return []*AnthropicStreamEvent{anthropicContentDeltaEvent, streamResp}

case schemas.ResponsesStreamResponseTypeMCPCallArgumentsDelta:
// MCP call arguments delta - convert to content_block_delta with input_json
streamResp.Type = AnthropicStreamEventTypeContentBlockDelta
if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}
if bifrostResp.Delta != nil {
streamResp.Delta = &AnthropicStreamDelta{
Expand All @@ -1194,10 +1325,11 @@ func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schema
case schemas.ResponsesStreamResponseTypeMCPCallCompleted:
// MCP call completed - emit content_block_stop
streamResp.Type = AnthropicStreamEventTypeContentBlockStop
if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
} else if bifrostResp.OutputIndex != nil {
// Use OutputIndex for global Anthropic indexing
if bifrostResp.OutputIndex != nil {
streamResp.Index = bifrostResp.OutputIndex
} else if bifrostResp.ContentIndex != nil {
streamResp.Index = bifrostResp.ContentIndex
}

case schemas.ResponsesStreamResponseTypeMCPCallFailed:
Expand Down Expand Up @@ -2777,7 +2909,29 @@ func convertBifrostFunctionCallToAnthropicToolUse(msg *schemas.ResponsesMessage)

// Parse arguments as JSON input
if msg.ResponsesToolMessage.Arguments != nil && *msg.ResponsesToolMessage.Arguments != "" {
toolUseBlock.Input = parseJSONInput(*msg.ResponsesToolMessage.Arguments)
argumentsJSON := *msg.ResponsesToolMessage.Arguments

// Sanitize WebSearch tool arguments to remove both allowed_domains and blocked_domains
// Anthropic only allows one or the other, not both
if msg.ResponsesToolMessage.Name != nil && *msg.ResponsesToolMessage.Name == "WebSearch" {
var toolArgs map[string]interface{}
if err := json.Unmarshal([]byte(argumentsJSON), &toolArgs); err == nil {
_, hasAllowed := toolArgs["allowed_domains"]
_, hasBlocked := toolArgs["blocked_domains"]

// If both domain filters exist, remove blocked_domains and keep allowed_domains
if hasAllowed && hasBlocked {
delete(toolArgs, "blocked_domains")

// Re-marshal the sanitized arguments
if sanitizedBytes, err := json.Marshal(toolArgs); err == nil {
argumentsJSON = string(sanitizedBytes)
}
}
}
}

toolUseBlock.Input = parseJSONInput(argumentsJSON)
}

return &toolUseBlock
Expand Down
Loading