Commit 4f289b9

feat: anthropic integration convertor enhancements
1 parent fb3bf4b commit 4f289b9

11 files changed: 294 additions, 413 deletions

core/providers/anthropic/responses.go

Lines changed: 55 additions & 6 deletions
@@ -753,6 +753,29 @@ func (chunk *AnthropicStreamEvent) ToBifrostResponsesStream(ctx context.Context,
     if chunk.Delta.StopReason != nil {
         state.StopReason = schemas.Ptr(ConvertAnthropicFinishReasonToBifrost(*chunk.Delta.StopReason))
     }
+    // if ctx.Value(schemas.BifrostContextKeyIntegrationType) == "anthropic" {
+    //     var usage *schemas.ResponsesResponseUsage
+    //     if chunk.Usage != nil {
+    //         usage = &schemas.ResponsesResponseUsage{
+    //             InputTokens:  chunk.Usage.InputTokens,
+    //             OutputTokens: chunk.Usage.OutputTokens,
+    //             TotalTokens:  chunk.Usage.InputTokens + chunk.Usage.OutputTokens,
+    //             InputTokensDetails: &schemas.ResponsesResponseInputTokens{
+    //                 CachedTokens: chunk.Usage.CacheReadInputTokens,
+    //             },
+    //             OutputTokensDetails: &schemas.ResponsesResponseOutputTokens{
+    //                 CachedTokens: chunk.Usage.CacheCreationInputTokens,
+    //             },
+    //         }
+    //     }
+    //     return []*schemas.BifrostResponsesStreamResponse{{
+    //         Type:           "anthropic.message_delta",
+    //         SequenceNumber: sequenceNumber,
+    //         Response: &schemas.BifrostResponsesResponse{
+    //             Usage: usage,
+    //         },
+    //     }}, nil, false
+    // }
     // Message-level updates (like stop reason, usage, etc.)
     // Note: We don't emit output_item.done here because items are already closed
     // by content_block_stop. This event is informational only.

@@ -808,7 +831,7 @@ func (chunk *AnthropicStreamEvent) ToBifrostResponsesStream(ctx context.Context,
 }
 
 // ToAnthropicResponsesStreamResponse converts a Bifrost Responses stream response to Anthropic SSE string format
-func ToAnthropicResponsesStreamResponse(bifrostResp *schemas.BifrostResponsesStreamResponse) []*AnthropicStreamEvent {
+func ToAnthropicResponsesStreamResponse(ctx context.Context, bifrostResp *schemas.BifrostResponsesStreamResponse) []*AnthropicStreamEvent {
     if bifrostResp == nil {
         return nil
     }

@@ -1189,6 +1212,17 @@ func ToAnthropicResponsesStreamResponse(bifrostResp *schemas.BifrostResponsesStr
            }
        }
 
+    // case "anthropic.message_delta":
+    //     if ctx.Value(schemas.BifrostContextKeyIntegrationType) == "anthropic" {
+    //         streamResp.Type = AnthropicStreamEventTypeMessageDelta
+    //         if bifrostResp.Response != nil && bifrostResp.Response.Usage != nil {
+    //             streamResp.Usage = &AnthropicUsage{
+    //                 InputTokens:  bifrostResp.Response.Usage.InputTokens,
+    //                 OutputTokens: bifrostResp.Response.Usage.OutputTokens,
+    //             }
+    //         }
+    //     }
+
     default:
         // Unknown event type, return empty
         return nil

@@ -1230,16 +1264,13 @@ func (request *AnthropicMessageRequest) ToBifrostResponsesRequest() *schemas.Bif
     if request.StopSequences != nil {
         params.ExtraParams["stop"] = request.StopSequences
     }
-    if request.Thinking != nil {
-        params.ExtraParams["thinking"] = request.Thinking
-    }
     if request.OutputFormat != nil {
         params.Text = convertAnthropicOutputFormatToResponsesTextConfig(request.OutputFormat)
     }
     if request.Thinking != nil {
         if request.Thinking.Type == "enabled" {
             params.Reasoning = &schemas.ResponsesParametersReasoning{
-                Effort:    schemas.Ptr("auto"),
+                Effort:    schemas.Ptr("medium"), // TODO: add a relative measure with budget tokens and max tokens
                 MaxTokens: request.Thinking.BudgetTokens,
             }
         } else {

@@ -1616,6 +1647,23 @@ func ConvertBifrostMessagesToAnthropicMessages(bifrostMessages []schemas.Respons
             continue
         }
 
+        // If there are pending reasoning blocks and this is a user message,
+        // flush them into a separate assistant message first
+        // (thinking blocks can only appear in assistant messages in Anthropic)
+        if len(pendingReasoningContentBlocks) > 0 && (msg.Role == nil || *msg.Role == schemas.ResponsesInputMessageRoleUser) {
+            // Copy the pending reasoning content blocks
+            copied := make([]AnthropicContentBlock, len(pendingReasoningContentBlocks))
+            copy(copied, pendingReasoningContentBlocks)
+            assistantReasoningMsg := AnthropicMessage{
+                Role: AnthropicMessageRoleAssistant,
+                Content: AnthropicContent{
+                    ContentBlocks: copied,
+                },
+            }
+            anthropicMessages = append(anthropicMessages, assistantReasoningMsg)
+            pendingReasoningContentBlocks = nil
+        }
+
         // Regular user/assistant message
         anthropicMsg := convertBifrostMessageToAnthropicMessage(&msg, &pendingReasoningContentBlocks)
         if anthropicMsg != nil {

@@ -2072,7 +2120,8 @@ func convertBifrostMessageToAnthropicMessage(msg *schemas.ResponsesMessage, pend
     }
 
     // Add any pending reasoning content blocks to the message
-    if len(*pendingReasoningContentBlocks) > 0 {
+    // Only add reasoning blocks to assistant messages (thinking blocks can only appear in assistant messages in Anthropic)
+    if len(*pendingReasoningContentBlocks) > 0 && anthropicMsg.Role == AnthropicMessageRoleAssistant {
         // copy the pending reasoning content blocks
         copied := make([]AnthropicContentBlock, len(*pendingReasoningContentBlocks))
         copy(copied, *pendingReasoningContentBlocks)
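
Note: the Effort value in the Thinking hunk above is hard-coded to "medium", with a TODO to derive a relative level from the thinking budget. A minimal sketch of one possible mapping, using the ratio of BudgetTokens to the request's max tokens; effortFromBudget is a hypothetical helper and not part of this commit:

// effortFromBudget illustrates the TODO above: pick a reasoning effort level
// from how large the thinking budget is relative to the overall token budget.
func effortFromBudget(budgetTokens, maxTokens int) string {
    if budgetTokens <= 0 || maxTokens <= 0 {
        return "medium" // keep the current default when budgets are unknown
    }
    ratio := float64(budgetTokens) / float64(maxTokens)
    switch {
    case ratio < 0.25:
        return "low"
    case ratio < 0.6:
        return "medium"
    default:
        return "high"
    }
}

The result could then replace the hard-coded schemas.Ptr("medium") when both budgets are present on the request.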

core/providers/anthropic/types.go

Lines changed: 3 additions & 3 deletions
@@ -298,9 +298,9 @@ type AnthropicTextResponse struct {
 // AnthropicUsage represents usage information in Anthropic format
 type AnthropicUsage struct {
     InputTokens              int                         `json:"input_tokens"`
-    CacheCreationInputTokens int                         `json:"cache_creation_input_tokens,omitempty"`
-    CacheReadInputTokens     int                         `json:"cache_read_input_tokens,omitempty"`
-    CacheCreation            AnthropicUsageCacheCreation `json:"cache_creation,omitempty"`
+    CacheCreationInputTokens int                         `json:"cache_creation_input_tokens"`
+    CacheReadInputTokens     int                         `json:"cache_read_input_tokens"`
+    CacheCreation            AnthropicUsageCacheCreation `json:"cache_creation"`
     OutputTokens             int                         `json:"output_tokens"`
 }
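
Note: removing omitempty means the cache fields are now always serialized, even when zero, so consumers of Anthropic-shaped usage see explicit zero counts instead of missing keys. A minimal, standalone illustration of that encoding/json behavior with a trimmed stand-in struct (only the JSON tags are taken from the real type):

package main

import (
    "encoding/json"
    "fmt"
)

// usage is a trimmed stand-in for AnthropicUsage, keeping only the relevant tags.
type usage struct {
    InputTokens              int `json:"input_tokens"`
    CacheCreationInputTokens int `json:"cache_creation_input_tokens"` // omitempty removed
    CacheReadInputTokens     int `json:"cache_read_input_tokens"`     // omitempty removed
    OutputTokens             int `json:"output_tokens"`
}

func main() {
    b, _ := json.Marshal(usage{InputTokens: 12, OutputTokens: 34})
    fmt.Println(string(b))
    // Output: {"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":34}
}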

core/providers/bedrock/responses.go

Lines changed: 152 additions & 0 deletions
@@ -215,6 +215,158 @@ func (chunk *BedrockStreamEvent) ToBifrostResponsesStream(sequenceNumber int, st
     if chunk.Start.ToolUse != nil {
         var responses []*schemas.BifrostResponsesStreamResponse
 
+        // Close any open reasoning blocks first (Anthropic sends content_block_stop before starting new blocks)
+        for prevContentIndex := range state.ReasoningContentIndices {
+            prevOutputIndex, prevExists := state.ContentIndexToOutputIndex[prevContentIndex]
+            if !prevExists {
+                continue
+            }
+
+            // Skip already completed output indices
+            if state.CompletedOutputIndices[prevOutputIndex] {
+                continue
+            }
+
+            itemID := state.ItemIDs[prevOutputIndex]
+
+            // For reasoning items, content_index is always 0
+            reasoningContentIndex := 0
+
+            // Emit reasoning_summary_text.done
+            emptyText := ""
+            reasoningDoneResponse := &schemas.BifrostResponsesStreamResponse{
+                Type:           schemas.ResponsesStreamResponseTypeReasoningSummaryTextDone,
+                SequenceNumber: sequenceNumber + len(responses),
+                OutputIndex:    schemas.Ptr(prevOutputIndex),
+                ContentIndex:   &reasoningContentIndex,
+                Text:           &emptyText,
+            }
+            if itemID != "" {
+                reasoningDoneResponse.ItemID = &itemID
+            }
+            responses = append(responses, reasoningDoneResponse)
+
+            // Emit content_part.done for reasoning
+            partDoneResponse := &schemas.BifrostResponsesStreamResponse{
+                Type:           schemas.ResponsesStreamResponseTypeContentPartDone,
+                SequenceNumber: sequenceNumber + len(responses),
+                OutputIndex:    schemas.Ptr(prevOutputIndex),
+                ContentIndex:   &reasoningContentIndex,
+            }
+            if itemID != "" {
+                partDoneResponse.ItemID = &itemID
+            }
+            responses = append(responses, partDoneResponse)
+
+            // Emit output_item.done for reasoning
+            statusCompleted := "completed"
+            doneItem := &schemas.ResponsesMessage{
+                Status: &statusCompleted,
+            }
+            if itemID != "" {
+                doneItem.ID = &itemID
+            }
+            responses = append(responses, &schemas.BifrostResponsesStreamResponse{
+                Type:           schemas.ResponsesStreamResponseTypeOutputItemDone,
+                SequenceNumber: sequenceNumber + len(responses),
+                OutputIndex:    schemas.Ptr(prevOutputIndex),
+                ContentIndex:   &reasoningContentIndex,
+                Item:           doneItem,
+            })
+
+            // Mark this output index as completed
+            state.CompletedOutputIndices[prevOutputIndex] = true
+        }
+        // Clear reasoning content indices after closing them
+        clear(state.ReasoningContentIndices)
+
+        // Close any open tool call blocks before starting a new one (Anthropic completes each block before starting next)
+        for prevContentIndex, prevOutputIndex := range state.ContentIndexToOutputIndex {
+            // Skip reasoning blocks (already handled above)
+            if state.ReasoningContentIndices[prevContentIndex] {
+                continue
+            }
+
+            // Skip already completed output indices
+            if state.CompletedOutputIndices[prevOutputIndex] {
+                continue
+            }
+
+            // Check if this is a tool call
+            prevToolCallID := state.ToolCallIDs[prevOutputIndex]
+            if prevToolCallID == "" {
+                continue // Not a tool call
+            }
+
+            prevItemID := state.ItemIDs[prevOutputIndex]
+            prevToolName := state.ToolCallNames[prevOutputIndex]
+            accumulatedArgs := state.ToolArgumentBuffers[prevOutputIndex]
+
+            // Emit content_part.done for tool call
+            responses = append(responses, &schemas.BifrostResponsesStreamResponse{
+                Type:           schemas.ResponsesStreamResponseTypeContentPartDone,
+                SequenceNumber: sequenceNumber + len(responses),
+                OutputIndex:    schemas.Ptr(prevOutputIndex),
+                ContentIndex:   schemas.Ptr(prevContentIndex),
+                ItemID:         &prevItemID,
+            })
+
+            // Emit function_call_arguments.done with full arguments
+            if accumulatedArgs != "" {
+                var doneItem *schemas.ResponsesMessage
+                if prevToolCallID != "" || prevToolName != "" {
+                    doneItem = &schemas.ResponsesMessage{
+                        ResponsesToolMessage: &schemas.ResponsesToolMessage{},
+                    }
+                    if prevToolCallID != "" {
+                        doneItem.ResponsesToolMessage.CallID = &prevToolCallID
+                    }
+                    if prevToolName != "" {
+                        doneItem.ResponsesToolMessage.Name = &prevToolName
+                    }
+                }
+
+                argsDoneResponse := &schemas.BifrostResponsesStreamResponse{
+                    Type:           schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDone,
+                    SequenceNumber: sequenceNumber + len(responses),
+                    OutputIndex:    schemas.Ptr(prevOutputIndex),
+                    Arguments:      &accumulatedArgs,
+                }
+                if prevItemID != "" {
+                    argsDoneResponse.ItemID = &prevItemID
+                }
+                if doneItem != nil {
+                    argsDoneResponse.Item = doneItem
+                }
+                responses = append(responses, argsDoneResponse)
+            }
+
+            // Emit output_item.done for tool call
+            statusCompleted := "completed"
+            toolDoneItem := &schemas.ResponsesMessage{
+                ID:     &prevItemID,
+                Type:   schemas.Ptr(schemas.ResponsesMessageTypeFunctionCall),
+                Status: &statusCompleted,
+                ResponsesToolMessage: &schemas.ResponsesToolMessage{
+                    CallID:    &prevToolCallID,
+                    Name:      &prevToolName,
+                    Arguments: &accumulatedArgs,
+                },
+            }
+
+            responses = append(responses, &schemas.BifrostResponsesStreamResponse{
+                Type:           schemas.ResponsesStreamResponseTypeOutputItemDone,
+                SequenceNumber: sequenceNumber + len(responses),
+                OutputIndex:    schemas.Ptr(prevOutputIndex),
+                ContentIndex:   schemas.Ptr(prevContentIndex),
+                ItemID:         &prevItemID,
+                Item:           toolDoneItem,
+            })
+
+            // Mark this output index as completed
+            state.CompletedOutputIndices[prevOutputIndex] = true
+        }
+
         // Create new output index for this tool use
         outputIndex := state.CurrentOutputIndex
         state.ContentIndexToOutputIndex[contentBlockIndex] = outputIndex
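
Note: the added block closes any still-open reasoning and tool-call items before a new tool_use block starts, mirroring Anthropic's behavior of finishing one content block before opening the next. For reference, the close-out events are emitted in a fixed order; a small summary using the same stream-type constants as above (the []any element type is used here only to avoid assuming the constants' concrete type; this snippet is not part of the diff):

// Close-out order for a dangling reasoning item, then for a dangling tool call,
// as produced by the block above.
var reasoningCloseOrder = []any{
    schemas.ResponsesStreamResponseTypeReasoningSummaryTextDone,
    schemas.ResponsesStreamResponseTypeContentPartDone,
    schemas.ResponsesStreamResponseTypeOutputItemDone,
}

var toolCallCloseOrder = []any{
    schemas.ResponsesStreamResponseTypeContentPartDone,
    schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDone, // only when arguments were accumulated
    schemas.ResponsesStreamResponseTypeOutputItemDone,
}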

core/providers/openai/openai.go

Lines changed: 0 additions & 3 deletions
@@ -1154,9 +1154,6 @@ func HandleOpenAIResponsesRequest(
         return nil, bifrostErr
     }
 
-    fmt.Println("jsonData", string(jsonData))
-    fmt.Println("--------------------------------")
-
     req.SetBody(jsonData)
 
     // Make request

core/providers/utils/utils.go

Lines changed: 14 additions & 0 deletions
@@ -319,6 +319,20 @@ func HandleProviderAPIError(resp *fasthttp.Response, errorResp any) *schemas.Bif
     statusCode := resp.StatusCode()
     body := append([]byte(nil), resp.Body()...)
 
+    // decode body
+    decodedBody, err := CheckAndDecodeBody(resp)
+    if err != nil {
+        return &schemas.BifrostError{
+            IsBifrostError: false,
+            StatusCode:     &statusCode,
+            Error: &schemas.ErrorField{
+                Message: err.Error(),
+            },
+        }
+    }
+
+    body = decodedBody
+
     if err := sonic.Unmarshal(body, errorResp); err != nil {
         rawResponse := body
         message := fmt.Sprintf("provider API error: %s", string(rawResponse))
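
Note: CheckAndDecodeBody is called here but implemented elsewhere; its body is not part of this diff. A minimal sketch of what such a helper typically does with fasthttp, assuming it only needs to handle gzip-encoded provider responses (the name and exact behavior are taken from this call site, not from the real implementation):

// CheckAndDecodeBody returns the response body, decompressing it when the provider
// sent it gzip-encoded. Sketch only; relies on the fasthttp import already in this file.
func CheckAndDecodeBody(resp *fasthttp.Response) ([]byte, error) {
    if string(resp.Header.Peek("Content-Encoding")) == "gzip" {
        return resp.BodyGunzip() // fasthttp handles the gzip decompression
    }
    return append([]byte(nil), resp.Body()...), nil
}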

core/providers/vertex/errors.go

Lines changed: 10 additions & 4 deletions
@@ -10,15 +10,21 @@ import (
 func parseVertexError(providerName schemas.ModelProvider, resp *fasthttp.Response) *schemas.BifrostError {
     var openAIErr schemas.BifrostError
     var vertexErr []VertexError
-    if err := sonic.Unmarshal(resp.Body(), &openAIErr); err != nil || openAIErr.Error == nil {
+
+    decodedBody, err := providerUtils.CheckAndDecodeBody(resp)
+    if err != nil {
+        return providerUtils.NewBifrostOperationError(schemas.ErrProviderResponseDecode, err, providerName)
+    }
+
+    if err := sonic.Unmarshal(decodedBody, &openAIErr); err != nil || openAIErr.Error == nil {
         // Try Vertex error format if OpenAI format fails or is incomplete
-        if err := sonic.Unmarshal(resp.Body(), &vertexErr); err != nil {
+        if err := sonic.Unmarshal(decodedBody, &vertexErr); err != nil {
             //try with single Vertex error format
             var vertexErr VertexError
-            if err := sonic.Unmarshal(resp.Body(), &vertexErr); err != nil {
+            if err := sonic.Unmarshal(decodedBody, &vertexErr); err != nil {
                 // Try VertexValidationError format (validation errors from Mistral endpoint)
                 var validationErr VertexValidationError
-                if err := sonic.Unmarshal(resp.Body(), &validationErr); err != nil {
+                if err := sonic.Unmarshal(decodedBody, &validationErr); err != nil {
                     return providerUtils.NewBifrostOperationError(schemas.ErrProviderResponseUnmarshal, err, providerName)
                 }
                 if len(validationErr.Detail) > 0 {

core/schemas/bifrost.go

Lines changed: 1 addition & 0 deletions
@@ -117,6 +117,7 @@ const (
     BifrostContextKeyUseRawRequestBody                   BifrostContextKey = "bifrost-use-raw-request-body"
     BifrostContextKeySendBackRawRequest                  BifrostContextKey = "bifrost-send-back-raw-request"  // bool
     BifrostContextKeySendBackRawResponse                 BifrostContextKey = "bifrost-send-back-raw-response" // bool
+    BifrostContextKeyIntegrationType                     BifrostContextKey = "bifrost-integration-type"       // RouteConfigType
     BifrostContextKeyIsResponsesToChatCompletionFallback BifrostContextKey = "bifrost-is-responses-to-chat-completion-fallback" // bool (set by bifrost)
 )
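
Note: this new key is what the commented-out checks in anthropic/responses.go read via ctx.Value(schemas.BifrostContextKeyIntegrationType). A minimal sketch of how an integration layer might set it before calling the converters; everything except the schemas identifiers and ToAnthropicResponsesStreamResponse is a placeholder, and whether the stored value is a plain string or a RouteConfigType is an assumption:

// Tag the request context with the originating integration so provider
// converters can branch on it later. Sketch only.
ctx := context.WithValue(context.Background(), schemas.BifrostContextKeyIntegrationType, "anthropic")
events := ToAnthropicResponsesStreamResponse(ctx, bifrostResp) // updated signature takes ctx first
_ = events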
