Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
558 changes: 134 additions & 424 deletions core/bifrost.go

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions core/changelog.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
fix: vertex and bedrock usage aggregation improvements for streaming
fix: choice index fixed to 0 for anthropic and bedrock streaming
- feat: adds batch and files API support for bedrock, openai, anthropic and gemini
- feat: new provider support - nebius
- feat: structured output support
- fix: vertex and bedrock usage aggregation improvements for streaming
- fix: choice index fixed to 0 for anthropic and bedrock streaming
21 changes: 7 additions & 14 deletions core/providers/anthropic/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ func (provider *AnthropicProvider) BatchCreate(ctx context.Context, key schemas.
}

// BatchList lists batch jobs.
func (provider *AnthropicProvider) BatchList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *AnthropicProvider) BatchList(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Anthropic, provider.customProviderConfig, schemas.BatchListRequest); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1147,8 +1147,8 @@ func (provider *AnthropicProvider) BatchList(ctx context.Context, keys []schemas
req.Header.SetContentType("application/json")

// Use first key if available
if len(keys) > 0 && keys[0].Value != "" {
req.Header.Set("x-api-key", keys[0].Value)
if key.Value != "" {
req.Header.Set("x-api-key", key.Value)
}
req.Header.Set("anthropic-version", provider.apiVersion)

Expand Down Expand Up @@ -1579,35 +1579,28 @@ func (provider *AnthropicProvider) FileUpload(ctx context.Context, key schemas.K
}

// FileList lists files from Anthropic's Files API.
func (provider *AnthropicProvider) FileList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *AnthropicProvider) FileList(ctx context.Context, key schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Anthropic, provider.customProviderConfig, schemas.FileListRequest); err != nil {
return nil, err
}

providerName := provider.GetProviderKey()

if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", providerName)
}

key := keys[0]


// Create request
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)

// Build URL with query params
baseURL := provider.buildRequestURL(ctx, "/v1/files", schemas.FileListRequest)
requestURL := provider.buildRequestURL(ctx, "/v1/files", schemas.FileListRequest)
values := url.Values{}
if request.Limit > 0 {
values.Set("limit", fmt.Sprintf("%d", request.Limit))
}
if request.After != nil && *request.After != "" {
values.Set("after_id", *request.After)
}
requestURL := baseURL
}
if encodedValues := values.Encode(); encodedValues != "" {
requestURL += "?" + encodedValues
}
Expand Down
8 changes: 7 additions & 1 deletion core/providers/anthropic/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,15 @@ func ToAnthropicBatchCreateResponse(resp *schemas.BifrostBatchCreateResponse) *A
Type: "message_batch",
ProcessingStatus: toAnthropicProcessingStatus(resp.Status),
CreatedAt: formatAnthropicTimestamp(resp.CreatedAt),
ExpiresAt: formatAnthropicTimestamp(*resp.ExpiresAt),
ResultsURL: resp.ResultsURL,
}
if resp.ExpiresAt != nil {
result.ExpiresAt = formatAnthropicTimestamp(*resp.ExpiresAt)
} else {
// This is a fallback for worst case scenario where expires_at is not available
// Which is never expected to happen, but just in case.
result.ExpiresAt = formatAnthropicTimestamp(time.Now().Add(24 * time.Hour).Unix())
}
if resp.RequestCounts.Total > 0 {
result.RequestCounts = &AnthropicBatchRequestCounts{
Processing: resp.RequestCounts.Pending,
Expand Down
22 changes: 8 additions & 14 deletions core/providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,12 +941,8 @@ func (provider *AzureProvider) FileUpload(ctx context.Context, key schemas.Key,
}

// FileList lists files from Azure OpenAI.
func (provider *AzureProvider) FileList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", provider.GetProviderKey())
}

key := keys[0]
func (provider *AzureProvider) FileList(ctx context.Context, key schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {

if err := provider.validateKeyConfigForFiles(key); err != nil {
return nil, err
}
Expand All @@ -966,13 +962,15 @@ func (provider *AzureProvider) FileList(ctx context.Context, keys []schemas.Key,
defer fasthttp.ReleaseResponse(resp)

// Build URL with query params
baseURL := fmt.Sprintf("%s/openai/files", key.AzureKeyConfig.Endpoint)
requestURL := fmt.Sprintf("%s/openai/files", key.AzureKeyConfig.Endpoint)
values := url.Values{}
values.Set("api-version", *apiVersion)
if request.Purpose != "" {
values.Set("purpose", string(request.Purpose))
}
requestURL := baseURL + "?" + values.Encode()
if encodedValues := values.Encode(); encodedValues != "" {
requestURL += "?" + encodedValues
}

// Set headers
providerUtils.SetExtraHeaders(ctx, req, provider.networkConfig.ExtraHeaders, nil)
Expand Down Expand Up @@ -1396,12 +1394,8 @@ func (provider *AzureProvider) BatchCreate(ctx context.Context, key schemas.Key,
}

// BatchList lists batch jobs from Azure OpenAI.
func (provider *AzureProvider) BatchList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", provider.GetProviderKey())
}

key := keys[0]
func (provider *AzureProvider) BatchList(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {

if err := provider.validateKeyConfigForFiles(key); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion core/providers/bedrock/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,10 @@ func toBedrockBatchStatus(status schemas.BatchStatus) string {
func ToBifrostBatchListRequest(req *BedrockBatchListRequest, provider schemas.ModelProvider) *schemas.BifrostBatchListRequest {
result := &schemas.BifrostBatchListRequest{
Provider: provider,
Limit: req.MaxResults,
// We add a dummy model to avoid validation errors
// This model is never used in any of the provider flows
Model: "dummy-model",
Limit: req.MaxResults,
}

if req.NextToken != nil {
Expand Down
29 changes: 9 additions & 20 deletions core/providers/bedrock/bedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,8 +1291,11 @@ func (provider *BedrockProvider) TranscriptionStream(ctx context.Context, postHo

// FileUpload uploads a file to S3 for Bedrock batch processing.
func (provider *BedrockProvider) FileUpload(ctx context.Context, key schemas.Key, request *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) {

if err := providerUtils.CheckOperationAllowed(schemas.Bedrock, provider.customProviderConfig, schemas.FileUploadRequest); err != nil {
provider.logger.Error("file upload operation not allowed: %s", err.Error.Message)
if err.Error != nil {
provider.logger.Error("file upload operation not allowed: %s", err.Error.Message)
}
return nil, err
}

Expand Down Expand Up @@ -1417,22 +1420,13 @@ func (provider *BedrockProvider) FileUpload(ctx context.Context, key schemas.Key
}

// FileList lists files in the S3 bucket used for Bedrock batch processing.
func (provider *BedrockProvider) FileList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *BedrockProvider) FileList(ctx context.Context, key schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Bedrock, provider.customProviderConfig, schemas.FileListRequest); err != nil {
return nil, err
}

providerName := provider.GetProviderKey()

if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", providerName)
}

key := keys[0]
if key.BedrockKeyConfig == nil {
return nil, providerUtils.NewConfigurationError("bedrock key config is not provided", providerName)
}

// Get S3 bucket from storage config or extra params
s3Bucket := ""
s3Prefix := ""
Expand Down Expand Up @@ -1478,9 +1472,9 @@ func (provider *BedrockProvider) FileList(ctx context.Context, keys []schemas.Ke
params.Set("continuation-token", *request.After)
}

reqURL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/?%s", bucketName, region, params.Encode())
requestURL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/?%s", bucketName, region, params.Encode())

httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil {
return nil, providerUtils.NewBifrostOperationError("error creating request", err, providerName)
}
Expand Down Expand Up @@ -2052,18 +2046,13 @@ func (provider *BedrockProvider) BatchCreate(ctx context.Context, key schemas.Ke
}

// BatchList lists batch inference jobs from AWS Bedrock.
func (provider *BedrockProvider) BatchList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *BedrockProvider) BatchList(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Bedrock, provider.customProviderConfig, schemas.BatchListRequest); err != nil {
return nil, err
}

providerName := provider.GetProviderKey()

if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", providerName)
}

key := keys[0]
if key.BedrockKeyConfig == nil {
return nil, providerUtils.NewConfigurationError("bedrock key config is not provided", providerName)
}
Expand Down Expand Up @@ -2508,7 +2497,7 @@ func (provider *BedrockProvider) BatchResults(ctx context.Context, key schemas.K
allFiles []schemas.FileObject
)
for {
listResp, bifrostErr = provider.FileList(ctx, []schemas.Key{key}, &schemas.BifrostFileListRequest{
listResp, bifrostErr = provider.FileList(ctx, key, &schemas.BifrostFileListRequest{
Provider: request.Provider,
StorageConfig: &schemas.FileStorageConfig{
S3: &schemas.S3StorageConfig{
Expand Down
4 changes: 2 additions & 2 deletions core/providers/cerebras/cerebras.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (provider *CerebrasProvider) FileUpload(_ context.Context, _ schemas.Key, _
}

// FileList is not supported by Cerebras provider.
func (provider *CerebrasProvider) FileList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *CerebrasProvider) FileList(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.FileListRequest, provider.GetProviderKey())
}

Expand All @@ -242,7 +242,7 @@ func (provider *CerebrasProvider) BatchCreate(_ context.Context, _ schemas.Key,
}

// BatchList is not supported by Cerebras provider.
func (provider *CerebrasProvider) BatchList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *CerebrasProvider) BatchList(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchListRequest, provider.GetProviderKey())
}

Expand Down
4 changes: 2 additions & 2 deletions core/providers/cohere/cohere.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (provider *CohereProvider) BatchCreate(_ context.Context, _ schemas.Key, _
}

// BatchList is not supported by Cohere provider.
func (provider *CohereProvider) BatchList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *CohereProvider) BatchList(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchListRequest, provider.GetProviderKey())
}

Expand All @@ -872,7 +872,7 @@ func (provider *CohereProvider) FileUpload(_ context.Context, _ schemas.Key, _ *
}

// FileList is not supported by Cohere provider.
func (provider *CohereProvider) FileList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *CohereProvider) FileList(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.FileListRequest, provider.GetProviderKey())
}

Expand Down
4 changes: 2 additions & 2 deletions core/providers/elevenlabs/elevenlabs.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func (provider *ElevenlabsProvider) BatchCreate(_ context.Context, _ schemas.Key
}

// BatchList is not supported by Elevenlabs provider.
func (provider *ElevenlabsProvider) BatchList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *ElevenlabsProvider) BatchList(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchListRequest, provider.GetProviderKey())
}

Expand All @@ -743,7 +743,7 @@ func (provider *ElevenlabsProvider) FileUpload(_ context.Context, _ schemas.Key,
}

// FileList is not supported by Elevenlabs provider.
func (provider *ElevenlabsProvider) FileList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *ElevenlabsProvider) FileList(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.FileListRequest, provider.GetProviderKey())
}

Expand Down
23 changes: 7 additions & 16 deletions core/providers/gemini/gemini.go
Original file line number Diff line number Diff line change
Expand Up @@ -1624,19 +1624,13 @@ func (provider *GeminiProvider) BatchCreate(ctx context.Context, key schemas.Key

// BatchList lists batch jobs for Gemini.
// Note: The consumer API may have limited list functionality.
func (provider *GeminiProvider) BatchList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *GeminiProvider) BatchList(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Gemini, provider.customProviderConfig, schemas.BatchListRequest); err != nil {
return nil, err
}

providerName := provider.GetProviderKey()

// Select a key for the request
if len(keys) == 0 {
return nil, providerUtils.NewBifrostOperationError("at least one API key is required", nil, providerName)
}
key := keys[0]

// Create HTTP request
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
Expand Down Expand Up @@ -2238,35 +2232,28 @@ func (provider *GeminiProvider) FileUpload(ctx context.Context, key schemas.Key,
}

// FileList lists files from Gemini.
func (provider *GeminiProvider) FileList(ctx context.Context, keys []schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *GeminiProvider) FileList(ctx context.Context, key schemas.Key, request *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
if err := providerUtils.CheckOperationAllowed(schemas.Gemini, provider.customProviderConfig, schemas.FileListRequest); err != nil {
return nil, err
}

providerName := provider.GetProviderKey()

if len(keys) == 0 {
return nil, providerUtils.NewConfigurationError("no keys provided", providerName)
}

key := keys[0]

// Create request
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)

// Build URL with pagination
baseURL := fmt.Sprintf("%s/files", provider.networkConfig.BaseURL)
requestURL := fmt.Sprintf("%s/files", provider.networkConfig.BaseURL)
values := url.Values{}
if request.Limit > 0 {
values.Set("pageSize", fmt.Sprintf("%d", request.Limit))
}
if request.After != nil && *request.After != "" {
values.Set("pageToken", *request.After)
}
requestURL := baseURL
if encodedValues := values.Encode(); encodedValues != "" {
requestURL += "?" + encodedValues
}
Expand Down Expand Up @@ -2315,6 +2302,10 @@ func (provider *GeminiProvider) FileList(ctx context.Context, keys []schemas.Key
},
}

if geminiResp.NextPageToken != "" {
bifrostResp.After = &geminiResp.NextPageToken
}

for i, file := range geminiResp.Files {
var sizeBytes int64
fmt.Sscanf(file.SizeBytes, "%d", &sizeBytes)
Expand Down
4 changes: 2 additions & 2 deletions core/providers/groq/groq.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (provider *GroqProvider) BatchCreate(_ context.Context, _ schemas.Key, _ *s
}

// BatchList is not supported by Groq provider.
func (provider *GroqProvider) BatchList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
func (provider *GroqProvider) BatchList(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchListRequest) (*schemas.BifrostBatchListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchListRequest, provider.GetProviderKey())
}

Expand All @@ -280,7 +280,7 @@ func (provider *GroqProvider) FileUpload(_ context.Context, _ schemas.Key, _ *sc
}

// FileList is not supported by Groq provider.
func (provider *GroqProvider) FileList(_ context.Context, _ []schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
func (provider *GroqProvider) FileList(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileListRequest) (*schemas.BifrostFileListResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.FileListRequest, provider.GetProviderKey())
}

Expand Down
Loading