1 change: 1 addition & 0 deletions core/internal/testutil/account.go
@@ -139,6 +139,7 @@ func (account *ComprehensiveTestAccount) GetKeysForProvider(ctx *context.Context
"claude-3.7-sonnet": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
"claude-4-sonnet": "global.anthropic.claude-sonnet-4-20250514-v1:0",
"claude-4.5-sonnet": "global.anthropic.claude-sonnet-4-5-20250929-v1:0",
"claude-4.5-haiku": "global.anthropic.claude-haiku-4-5-20251001-v1:0",
},
},
},
351 changes: 351 additions & 0 deletions core/internal/testutil/chat_completion_stream.go
@@ -355,4 +355,355 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
t.Logf("✅ Streaming with tools test completed successfully")
})
}

// Test chat completion streaming with reasoning if supported
if testConfig.Scenarios.Reasoning && testConfig.ReasoningModel != "" {
t.Run("ChatCompletionStreamWithReasoning", func(t *testing.T) {
if os.Getenv("SKIP_PARALLEL_TESTS") != "true" {
t.Parallel()
}

problemPrompt := "Solve this step by step: If a train leaves station A at 2 PM traveling at 60 mph, and another train leaves station B at 3 PM traveling at 80 mph toward station A, and the stations are 420 miles apart, when will they meet?"

messages := []schemas.ChatMessage{
CreateBasicChatMessage(problemPrompt),
}

request := &schemas.BifrostChatRequest{
Provider: testConfig.Provider,
Model: testConfig.ReasoningModel,
Input: messages,
Params: &schemas.ChatParameters{
MaxCompletionTokens: bifrost.Ptr(1800),
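// Request high reasoning effort with a 1500-token reasoning budget inside the 1800-token completion cap.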
Reasoning: &schemas.ChatReasoning{
Effort: bifrost.Ptr("high"),
MaxTokens: bifrost.Ptr(1500),
},
},
Fallbacks: testConfig.Fallbacks,
}

// Use retry framework for stream requests with reasoning
retryConfig := StreamingRetryConfig()
retryContext := TestRetryContext{
ScenarioName: "ChatCompletionStreamWithReasoning",
ExpectedBehavior: map[string]interface{}{
"should_stream_reasoning": true,
"should_have_reasoning_events": true,
"problem_type": "mathematical",
},
TestMetadata: map[string]interface{}{
"provider": testConfig.Provider,
"model": testConfig.ReasoningModel,
"reasoning": true,
},
}

// Use proper streaming retry wrapper for the stream request
responseChannel, err := WithStreamRetry(t, retryConfig, retryContext, func() (chan *schemas.BifrostStream, *schemas.BifrostError) {
return client.ChatCompletionStreamRequest(ctx, request)
})

RequireNoError(t, err, "Chat completion stream with reasoning failed")
if responseChannel == nil {
t.Fatal("Response channel should not be nil")
}

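// Track the three independent reasoning signals a provider may emit: inline reasoning content, structured reasoning details, and reasoning token counts in usage.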
var reasoningDetected bool
var reasoningDetailsDetected bool
var reasoningTokensDetected bool
var responseCount int

streamCtx, cancel := context.WithTimeout(ctx, 200*time.Second)
defer cancel()

t.Logf("🧠 Testing chat completion streaming with reasoning...")

for {
select {
case response, ok := <-responseChannel:
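// The channel closes once the provider finishes streaming.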
if !ok {
goto reasoningStreamComplete
}

if response == nil {
t.Fatal("Streaming response should not be nil")
}
responseCount++

if response.BifrostChatResponse != nil {
chatResp := response.BifrostChatResponse

// Check for reasoning in choices
if len(chatResp.Choices) > 0 {
for _, choice := range chatResp.Choices {
if choice.ChatStreamResponseChoice != nil && choice.ChatStreamResponseChoice.Delta != nil {
delta := choice.ChatStreamResponseChoice.Delta

// Check for reasoning content in delta
if delta.Reasoning != nil && *delta.Reasoning != "" {
reasoningDetected = true
t.Logf("🧠 Reasoning content detected: %q", *delta.Reasoning)
}

// Check for reasoning details in delta
if len(delta.ReasoningDetails) > 0 {
reasoningDetailsDetected = true
t.Logf("🧠 Reasoning details detected: %d entries", len(delta.ReasoningDetails))

for _, detail := range delta.ReasoningDetails {
t.Logf(" - Type: %s, Index: %d", detail.Type, detail.Index)
switch detail.Type {
case schemas.BifrostReasoningDetailsTypeText:
if detail.Text != nil && *detail.Text != "" {
maxLen := 100
text := *detail.Text
if len(text) < maxLen {
maxLen = len(text)
}
t.Logf(" Text preview: %q", text[:maxLen])
}
case schemas.BifrostReasoningDetailsTypeSummary:
if detail.Summary != nil {
t.Logf(" Summary length: %d", len(*detail.Summary))
}
case schemas.BifrostReasoningDetailsTypeEncrypted:
if detail.Data != nil {
t.Logf(" Encrypted data length: %d", len(*detail.Data))
}
}
}
}
}
}
}

// Check for reasoning tokens in usage (usually in final chunk)
if chatResp.Usage != nil && chatResp.Usage.CompletionTokensDetails != nil {
if chatResp.Usage.CompletionTokensDetails.ReasoningTokens > 0 {
reasoningTokensDetected = true
t.Logf("🔢 Reasoning tokens used: %d", chatResp.Usage.CompletionTokensDetails.ReasoningTokens)
}
}
}

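// Safety cap: stop reading after 150 chunks so a runaway stream cannot hang the test.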
if responseCount > 150 {
goto reasoningStreamComplete
}

case <-streamCtx.Done():
t.Fatal("Timeout waiting for chat completion streaming response with reasoning")
}
}

reasoningStreamComplete:
if responseCount == 0 {
t.Fatal("Should receive at least one streaming response")
}

// At least one of these reasoning indicators should be detected
if !reasoningDetected && !reasoningDetailsDetected && !reasoningTokensDetected {
t.Logf("⚠️ Warning: No explicit reasoning indicators found in streaming response")
} else {
t.Logf("✅ Reasoning indicators detected:")
if reasoningDetected {
t.Logf(" - Reasoning content found")
}
if reasoningDetailsDetected {
t.Logf(" - Reasoning details found")
}
if reasoningTokensDetected {
t.Logf(" - Reasoning tokens reported")
}
}

t.Logf("✅ Chat completion streaming with reasoning test completed successfully")
})

// Additional test with full validation and retry support
t.Run("ChatCompletionStreamWithReasoningValidated", func(t *testing.T) {
if os.Getenv("SKIP_PARALLEL_TESTS") != "true" {
t.Parallel()
}

if testConfig.Provider == schemas.OpenAI || testConfig.Provider == schemas.Groq {
// Streamed reasoning is extremely flaky for OpenAI and Groq, so skip the validated test for them.
t.Skip("Skipping ChatCompletionStreamWithReasoningValidated test for OpenAI and Groq")
}

problemPrompt := "A farmer has 100 chickens and 50 cows. Each chicken lays 5 eggs per week, and each cow produces 20 liters of milk per day. If the farmer sells eggs for $0.25 each and milk for $1.50 per liter, and it costs $2 per week to feed each chicken and $15 per week to feed each cow, what is the farmer's weekly profit?"
if testConfig.Provider == schemas.Cerebras {
problemPrompt = "Hello how are you, can you search hackernews news regarding maxim ai for me? use your tools for this"
}

messages := []schemas.ChatMessage{
CreateBasicChatMessage(problemPrompt),
}

request := &schemas.BifrostChatRequest{
Provider: testConfig.Provider,
Model: testConfig.ReasoningModel,
Input: messages,
Params: &schemas.ChatParameters{
MaxCompletionTokens: bifrost.Ptr(1800),
Reasoning: &schemas.ChatReasoning{
Effort: bifrost.Ptr("high"),
MaxTokens: bifrost.Ptr(1500),
},
},
Fallbacks: testConfig.Fallbacks,
}

// Use retry framework for stream requests with reasoning and validation
retryConfig := StreamingRetryConfig()
retryContext := TestRetryContext{
ScenarioName: "ChatCompletionStreamWithReasoningValidated",
ExpectedBehavior: map[string]interface{}{
"should_stream_reasoning": true,
"should_have_reasoning_indicators": true,
"problem_type": "mathematical",
},
TestMetadata: map[string]interface{}{
"provider": testConfig.Provider,
"model": testConfig.ReasoningModel,
"reasoning": true,
"validated": true,
},
}

// Use validation retry wrapper that includes stream reading and validation
validationResult := WithChatStreamValidationRetry(
t,
retryConfig,
retryContext,
func() (chan *schemas.BifrostStream, *schemas.BifrostError) {
return client.ChatCompletionStreamRequest(ctx, request)
},
func(responseChannel chan *schemas.BifrostStream) ChatStreamValidationResult {
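// Track reasoning signals, chunk count, stream errors, and accumulated content for post-stream validation.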
var reasoningDetected bool
var reasoningDetailsDetected bool
var reasoningTokensDetected bool
var responseCount int
var streamErrors []string
var fullContent strings.Builder

streamCtx, cancel := context.WithTimeout(ctx, 200*time.Second)
defer cancel()

t.Logf("🧠 Testing validated chat completion streaming with reasoning...")

for {
select {
case response, ok := <-responseChannel:
if !ok {
goto validatedReasoningStreamComplete
}

if response == nil {
streamErrors = append(streamErrors, "❌ Streaming response should not be nil")
continue
}
responseCount++

if response.BifrostChatResponse != nil {
chatResp := response.BifrostChatResponse

// Check for reasoning in choices
if len(chatResp.Choices) > 0 {
for _, choice := range chatResp.Choices {
if choice.ChatStreamResponseChoice != nil && choice.ChatStreamResponseChoice.Delta != nil {
delta := choice.ChatStreamResponseChoice.Delta

// Accumulate content
if delta.Content != nil {
fullContent.WriteString(*delta.Content)
t.Logf("📝 Content chunk received (length: %d, total so far: %d)", len(*delta.Content), fullContent.Len())
}

// Check for reasoning content in delta
if delta.Reasoning != nil && *delta.Reasoning != "" {
reasoningDetected = true
t.Logf("🧠 Reasoning content detected (length: %d)", len(*delta.Reasoning))
}

// Check for reasoning details in delta
if len(delta.ReasoningDetails) > 0 {
reasoningDetailsDetected = true
t.Logf("🧠 Reasoning details detected: %d entries", len(delta.ReasoningDetails))
}
}
}
}

// Check for reasoning tokens in usage
if chatResp.Usage != nil && chatResp.Usage.CompletionTokensDetails != nil {
if chatResp.Usage.CompletionTokensDetails.ReasoningTokens > 0 {
reasoningTokensDetected = true
t.Logf("🔢 Reasoning tokens: %d", chatResp.Usage.CompletionTokensDetails.ReasoningTokens)
}
}
}

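// Same 150-chunk safety cap as the unvalidated streaming test above.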
if responseCount > 150 {
goto validatedReasoningStreamComplete
}

case <-streamCtx.Done():
streamErrors = append(streamErrors, "❌ Timeout waiting for streaming response with reasoning")
goto validatedReasoningStreamComplete
}
}

validatedReasoningStreamComplete:
var errors []string
if responseCount == 0 {
errors = append(errors, "❌ Should receive at least one streaming response")
}

// Check if at least one reasoning indicator is present
hasAnyReasoningIndicator := reasoningDetected || reasoningDetailsDetected || reasoningTokensDetected
if !hasAnyReasoningIndicator {
errors = append(errors, fmt.Sprintf("❌ No reasoning indicators found in streaming response (received %d chunks)", responseCount))
}

// Check content - for reasoning models, content may come after reasoning or may not be present
// If reasoning is detected, we consider it a valid response even without content
content := strings.TrimSpace(fullContent.String())
if content == "" && !hasAnyReasoningIndicator {
// Only require content if no reasoning indicators were found
errors = append(errors, "❌ No content received in streaming response and no reasoning indicators found")
} else if content == "" && hasAnyReasoningIndicator {
// Log a warning but don't fail if reasoning is present
t.Logf("⚠️ Warning: Reasoning detected but no content chunks received (this may be expected for some reasoning models)")
}

if len(streamErrors) > 0 {
errors = append(errors, streamErrors...)
}

return ChatStreamValidationResult{
Passed: len(errors) == 0,
Errors: errors,
ReceivedData: responseCount > 0 && (content != "" || hasAnyReasoningIndicator),
StreamErrors: streamErrors,
ToolCallDetected: false, // Not testing tool calls here
ResponseCount: responseCount,
}
},
)

// Check validation result
if !validationResult.Passed {
// validationResult.Errors already contains any stream errors collected during validation, so report it directly.
t.Fatalf("❌ Chat completion stream with reasoning validation failed after retries: %s", strings.Join(validationResult.Errors, "; "))
}

if validationResult.ResponseCount == 0 {
t.Fatalf("❌ Should receive at least one streaming response")
}

t.Logf("✅ Validated chat completion streaming with reasoning test completed successfully")
})
}
}