diff --git a/core/bifrost.go b/core/bifrost.go index 51ff407ff..aa271b74c 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -1260,6 +1260,84 @@ func (bifrost *Bifrost) BatchResultsRequest(ctx context.Context, req *schemas.Bi return response, nil } +// BatchDeleteRequest deletes a batch job. +func (bifrost *Bifrost) BatchDeleteRequest(ctx context.Context, req *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + if req == nil { + return nil, &schemas.BifrostError{ + IsBifrostError: false, + Error: &schemas.ErrorField{ + Message: "batch delete request is nil", + }, + } + } + if req.Provider == "" { + return nil, &schemas.BifrostError{ + IsBifrostError: false, + Error: &schemas.ErrorField{ + Message: "provider is required for batch delete request", + }, + } + } + if req.BatchID == "" { + return nil, &schemas.BifrostError{ + IsBifrostError: false, + Error: &schemas.ErrorField{ + Message: "batch_id is required for batch delete request", + }, + } + } + if ctx == nil { + ctx = bifrost.ctx + } + + provider := bifrost.getProviderByKey(req.Provider) + if provider == nil { + return nil, &schemas.BifrostError{ + IsBifrostError: false, + Error: &schemas.ErrorField{ + Message: "provider not found for batch delete request", + }, + } + } + + config, err := bifrost.account.GetConfigForProvider(req.Provider) + if err != nil { + return nil, newBifrostErrorFromMsg(fmt.Sprintf("failed to get config for provider %s: %v", req.Provider, err.Error())) + } + if config == nil { + return nil, newBifrostErrorFromMsg(fmt.Sprintf("config is nil for provider %s", req.Provider)) + } + + // Determine the base provider type for key requirement checks + baseProvider := req.Provider + if config.CustomProviderConfig != nil && config.CustomProviderConfig.BaseProviderType != "" { + baseProvider = config.CustomProviderConfig.BaseProviderType + } + + var key schemas.Key + if providerRequiresKey(baseProvider, config.CustomProviderConfig) { + keys, keyErr := bifrost.getAllSupportedKeys(&ctx, req.Provider, baseProvider) + if keyErr != nil { + return nil, newBifrostError(keyErr) + } + if len(keys) > 0 { + key = keys[0] + } + } + + response, bifrostErr := executeRequestWithRetries(&ctx, config, func() (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return provider.BatchDelete(ctx, key, req) + }, schemas.BatchDeleteRequest, req.Provider, "") + if bifrostErr != nil { + bifrostErr.ExtraFields = schemas.BifrostErrorExtraFields{ + RequestType: schemas.BatchDeleteRequest, + Provider: req.Provider, + } + return nil, bifrostErr + } + return response, nil +} + // FileUploadRequest uploads a file to the specified provider. func (bifrost *Bifrost) FileUploadRequest(ctx context.Context, req *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) { if req == nil { diff --git a/core/providers/anthropic/batch.go b/core/providers/anthropic/batch.go index 8125882c7..17574d7e5 100644 --- a/core/providers/anthropic/batch.go +++ b/core/providers/anthropic/batch.go @@ -377,3 +377,8 @@ func formatAnthropicTimestamp(unixTime int64) string { } return time.Unix(unixTime, 0).UTC().Format(time.RFC3339) } + +// BatchDelete is not supported by Anthropic provider. +func (provider *AnthropicProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} diff --git a/core/providers/cerebras/cerebras.go b/core/providers/cerebras/cerebras.go index 5a25ff6f9..6344cb361 100644 --- a/core/providers/cerebras/cerebras.go +++ b/core/providers/cerebras/cerebras.go @@ -260,3 +260,8 @@ func (provider *CerebrasProvider) BatchCancel(_ context.Context, _ schemas.Key, func (provider *CerebrasProvider) BatchResults(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchResultsRequest) (*schemas.BifrostBatchResultsResponse, *schemas.BifrostError) { return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } + +// BatchDelete is not supported by Cerebras provider. +func (provider *CerebrasProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} diff --git a/core/providers/cohere/cohere.go b/core/providers/cohere/cohere.go index 973cf2bae..3edfde965 100644 --- a/core/providers/cohere/cohere.go +++ b/core/providers/cohere/cohere.go @@ -866,6 +866,12 @@ func (provider *CohereProvider) BatchResults(_ context.Context, _ schemas.Key, _ return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by Cohere provider. +func (provider *CohereProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + + // FileUpload is not supported by Cohere provider. func (provider *CohereProvider) FileUpload(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) { return nil, providerUtils.NewUnsupportedOperationError(schemas.FileUploadRequest, provider.GetProviderKey()) diff --git a/core/providers/gemini/batch.go b/core/providers/gemini/batch.go index bf02b0af2..62c44cf2a 100644 --- a/core/providers/gemini/batch.go +++ b/core/providers/gemini/batch.go @@ -240,3 +240,146 @@ func extractGeminiUsageMetadata(geminiResponse *GenerateContentResponse) (int, i } return inputTokens, outputTokens, totalTokens } + +// ==================== SDK RESPONSE CONVERTERS ==================== +// These functions convert Bifrost batch responses to Google GenAI SDK format. + +// ToGeminiJobState converts Bifrost batch status to Gemini SDK job state. +func ToGeminiJobState(status schemas.BatchStatus) string { + switch status { + case schemas.BatchStatusValidating: + return GeminiJobStatePending + case schemas.BatchStatusInProgress: + return GeminiJobStateRunning + case schemas.BatchStatusFinalizing: + return GeminiJobStateRunning + case schemas.BatchStatusCompleted: + return GeminiJobStateSucceeded + case schemas.BatchStatusFailed: + return GeminiJobStateFailed + case schemas.BatchStatusCancelling: + return GeminiJobStateCancelling + case schemas.BatchStatusCancelled: + return GeminiJobStateCancelled + case schemas.BatchStatusExpired: + return GeminiJobStateFailed + default: + return GeminiJobStatePending + } +} + +// ToGeminiBatchJobResponse converts a BifrostBatchCreateResponse to Gemini SDK format. +func ToGeminiBatchJobResponse(resp *schemas.BifrostBatchCreateResponse) *GeminiBatchJobResponseSDK { + if resp == nil { + return nil + } + + result := &GeminiBatchJobResponseSDK{ + Name: resp.ID, + State: ToGeminiJobState(resp.Status), + } + + // Add metadata if available + if resp.CreatedAt > 0 { + result.Metadata = &GeminiBatchMetadata{ + Name: resp.ID, + State: ToGeminiJobState(resp.Status), + CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339), + BatchStats: &GeminiBatchStats{ + RequestCount: resp.RequestCounts.Total, + PendingRequestCount: resp.RequestCounts.Total - resp.RequestCounts.Completed, + SuccessfulRequestCount: resp.RequestCounts.Completed - resp.RequestCounts.Failed, + }, + } + } + + return result +} + +// ToGeminiBatchRetrieveResponse converts a BifrostBatchRetrieveResponse to Gemini SDK format. +func ToGeminiBatchRetrieveResponse(resp *schemas.BifrostBatchRetrieveResponse) *GeminiBatchJobResponseSDK { + if resp == nil { + return nil + } + + result := &GeminiBatchJobResponseSDK{ + Name: resp.ID, + State: ToGeminiJobState(resp.Status), + } + + // Add metadata + result.Metadata = &GeminiBatchMetadata{ + Name: resp.ID, + State: ToGeminiJobState(resp.Status), + CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339), + BatchStats: &GeminiBatchStats{ + RequestCount: resp.RequestCounts.Total, + PendingRequestCount: resp.RequestCounts.Total - resp.RequestCounts.Completed, + SuccessfulRequestCount: resp.RequestCounts.Completed - resp.RequestCounts.Failed, + }, + } + + if resp.CompletedAt != nil { + result.Metadata.EndTime = time.Unix(*resp.CompletedAt, 0).Format(time.RFC3339) + } + + // Add output file info if available + if resp.OutputFileID != nil { + result.Dest = &GeminiBatchDest{ + FileName: *resp.OutputFileID, + } + } + + return result +} + +// ToGeminiBatchListResponse converts a BifrostBatchListResponse to Gemini SDK format. +func ToGeminiBatchListResponse(resp *schemas.BifrostBatchListResponse) *GeminiBatchListResponseSDK { + if resp == nil { + return nil + } + + jobs := make([]GeminiBatchJobResponseSDK, 0, len(resp.Data)) + for _, batch := range resp.Data { + job := GeminiBatchJobResponseSDK{ + Name: batch.ID, + State: ToGeminiJobState(batch.Status), + } + + // Add metadata + job.Metadata = &GeminiBatchMetadata{ + Name: batch.ID, + State: ToGeminiJobState(batch.Status), + CreateTime: time.Unix(batch.CreatedAt, 0).Format(time.RFC3339), + BatchStats: &GeminiBatchStats{ + RequestCount: batch.RequestCounts.Total, + PendingRequestCount: batch.RequestCounts.Total - batch.RequestCounts.Completed, + SuccessfulRequestCount: batch.RequestCounts.Completed - batch.RequestCounts.Failed, + }, + } + + jobs = append(jobs, job) + } + + result := &GeminiBatchListResponseSDK{ + BatchJobs: jobs, + } + + if resp.NextCursor != nil { + result.NextPageToken = *resp.NextCursor + } + + return result +} + +// ToGeminiBatchCancelResponse converts a BifrostBatchCancelResponse to Gemini SDK format. +func ToGeminiBatchCancelResponse(resp *schemas.BifrostBatchCancelResponse) *GeminiBatchJobResponseSDK { + if resp == nil { + return nil + } + + return &GeminiBatchJobResponseSDK{ + Name: resp.ID, + State: ToGeminiJobState(resp.Status), + } +} \ No newline at end of file diff --git a/core/providers/gemini/gemini.go b/core/providers/gemini/gemini.go index b32b6bf09..6a7640bc9 100644 --- a/core/providers/gemini/gemini.go +++ b/core/providers/gemini/gemini.go @@ -2101,6 +2101,65 @@ func (provider *GeminiProvider) BatchResults(ctx context.Context, key schemas.Ke return batchResultsResp, nil } +// BatchDelete deletes a batch job for Gemini. +func (provider *GeminiProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + if err := providerUtils.CheckOperationAllowed(schemas.Gemini, provider.customProviderConfig, schemas.BatchDeleteRequest); err != nil { + return nil, err + } + + providerName := provider.GetProviderKey() + + if request.BatchID == "" { + return nil, providerUtils.NewBifrostOperationError("batch_id is required", nil, providerName) + } + + // Create HTTP request + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + // Build URL for delete operation + batchID := request.BatchID + var url string + if strings.HasPrefix(batchID, "batches/") { + url = fmt.Sprintf("%s/%s", provider.networkConfig.BaseURL, batchID) + } else { + url = fmt.Sprintf("%s/batches/%s", provider.networkConfig.BaseURL, batchID) + } + + provider.logger.Debug("gemini batch delete url: " + url) + providerUtils.SetExtraHeaders(ctx, req, provider.networkConfig.ExtraHeaders, nil) + req.SetRequestURI(url) + req.Header.SetMethod(http.MethodDelete) + if key.Value != "" { + req.Header.Set("x-goog-api-key", key.Value) + } + req.Header.SetContentType("application/json") + + // Make request + latency, bifrostErr := providerUtils.MakeRequestWithContext(ctx, provider.client, req, resp) + if bifrostErr != nil { + return nil, bifrostErr + } + + // Handle response + if resp.StatusCode() != fasthttp.StatusOK && resp.StatusCode() != fasthttp.StatusNoContent { + return nil, parseGeminiError(resp) + } + + return &schemas.BifrostBatchDeleteResponse{ + ID: request.BatchID, + Object: "batch", + Deleted: true, + ExtraFields: schemas.BifrostResponseExtraFields{ + RequestType: schemas.BatchDeleteRequest, + Provider: providerName, + Latency: latency.Milliseconds(), + }, + }, nil +} + // FileUpload uploads a file to Gemini. func (provider *GeminiProvider) FileUpload(ctx context.Context, key schemas.Key, request *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) { if err := providerUtils.CheckOperationAllowed(schemas.Gemini, provider.customProviderConfig, schemas.FileUploadRequest); err != nil { diff --git a/core/providers/gemini/types.go b/core/providers/gemini/types.go index 03a910365..3f8198f8f 100644 --- a/core/providers/gemini/types.go +++ b/core/providers/gemini/types.go @@ -11,7 +11,6 @@ import ( "time" "github.com/bytedance/sonic" - "github.com/maximhq/bifrost/core/schemas" ) type Role string @@ -1621,14 +1620,90 @@ const ( GeminiBatchStateExpired = "BATCH_STATE_EXPIRED" ) +// Google GenAI SDK job states (different from internal batch states) +const ( + GeminiJobStateUnspecified = "JOB_STATE_UNSPECIFIED" + GeminiJobStateQueued = "JOB_STATE_QUEUED" + GeminiJobStatePending = "JOB_STATE_PENDING" + GeminiJobStateRunning = "JOB_STATE_RUNNING" + GeminiJobStateSucceeded = "JOB_STATE_SUCCEEDED" + GeminiJobStateFailed = "JOB_STATE_FAILED" + GeminiJobStateCancelling = "JOB_STATE_CANCELLING" + GeminiJobStateCancelled = "JOB_STATE_CANCELLED" + GeminiJobStatePaused = "JOB_STATE_PAUSED" +) + +// ==================== SDK BATCH TYPES ==================== +// These types are used for the Google GenAI SDK batch API format. + +// GeminiBatchCreateRequestSDK represents the SDK format for batch create requests. +// The SDK sends: batches.create(model="...", src=[...] or src="files/...") +type GeminiBatchCreateRequestSDK struct { + Model string `json:"model,omitempty"` + // Src can be either: + // - A string like "files/display_name" for file-based input + // - An array of inline request objects + Src interface{} `json:"src,omitempty"` +} + +// GeminiBatchInlineRequest represents a single inline request in SDK batch format. +// Format: {"contents": [...], "config": {...}} +type GeminiBatchInlineRequest struct { + Contents []Content `json:"contents,omitempty"` + Config *GeminiBatchInlineConfig `json:"config,omitempty"` +} + +// GeminiBatchInlineConfig represents the config object in an inline batch request. +type GeminiBatchInlineConfig struct { + ResponseModalities []string `json:"response_modalities,omitempty"` +} + +// GeminiBatchJobResponseSDK represents the SDK format for batch job responses. +// This matches what the Google GenAI Python SDK expects. +type GeminiBatchJobResponseSDK struct { + Name string `json:"name"` + State string `json:"state"` + Metadata *GeminiBatchMetadata `json:"metadata,omitempty"` + Dest *GeminiBatchDest `json:"dest,omitempty"` + Error *GeminiBatchErrorInfo `json:"error,omitempty"` +} + +// GeminiBatchListResponseSDK represents the SDK format for batch list responses. +type GeminiBatchListResponseSDK struct { + BatchJobs []GeminiBatchJobResponseSDK `json:"batchJobs,omitempty"` + NextPageToken string `json:"nextPageToken,omitempty"` +} + +// GeminiBatchListRequestSDK represents the SDK format for batch list requests. +type GeminiBatchListRequestSDK struct { + PageSize int `json:"pageSize,omitempty"` + PageToken string `json:"pageToken,omitempty"` +} + +// GeminiBatchRetrieveRequestSDK represents the SDK format for batch retrieve requests. +type GeminiBatchRetrieveRequestSDK struct { + Name string `json:"name"` +} + +// GeminiBatchCancelRequestSDK represents the SDK format for batch cancel requests. +type GeminiBatchCancelRequestSDK struct { + Name string `json:"name"` +} + +// GeminiBatchDeleteRequestSDK represents the SDK format for batch delete requests. +type GeminiBatchDeleteRequestSDK struct { + Name string `json:"name"` +} + // ==================== FILE TYPES ==================== // GeminiFileUploadRequest represents the request for uploading a file. type GeminiFileUploadRequest struct { - File []byte `json:"-"` // Raw file content (not serialized) - Filename string `json:"filename"` // Original filename - Purpose string `json:"purpose"` // Purpose of the file (e.g., "batch") - Provider schemas.ModelProvider `json:"provider"` + File []byte `json:"-"` // Raw file content (not serialized) + Filename string `json:"filename"` // Original filename + MimeType string `json:"mimeType"` // MIME type of the file + Purpose string `json:"purpose"` // Purpose of the file (e.g., "batch") + ResumableSessionID string `json:"-"` // Session ID for resumable uploads (internal use) } // GeminiFileListRequest represents the request for listing files. diff --git a/core/providers/mistral/batch.go b/core/providers/mistral/batch.go index e368910db..32b86f43f 100644 --- a/core/providers/mistral/batch.go +++ b/core/providers/mistral/batch.go @@ -32,3 +32,8 @@ func (provider *MistralProvider) BatchResults(ctx context.Context, key schemas.K return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by Mistral provider. +func (provider *MistralProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + diff --git a/core/providers/ollama/ollama.go b/core/providers/ollama/ollama.go index de13e9a66..7e696db1b 100644 --- a/core/providers/ollama/ollama.go +++ b/core/providers/ollama/ollama.go @@ -248,6 +248,11 @@ func (provider *OllamaProvider) BatchResults(_ context.Context, _ schemas.Key, _ return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by Ollama provider. +func (provider *OllamaProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + // FileUpload is not supported by Ollama provider. func (provider *OllamaProvider) FileUpload(_ context.Context, _ schemas.Key, _ *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) { return nil, providerUtils.NewUnsupportedOperationError(schemas.FileUploadRequest, provider.GetProviderKey()) diff --git a/core/providers/openrouter/openrouter.go b/core/providers/openrouter/openrouter.go index 5885ef451..0ddb8e332 100644 --- a/core/providers/openrouter/openrouter.go +++ b/core/providers/openrouter/openrouter.go @@ -302,6 +302,11 @@ func (provider *OpenRouterProvider) BatchCancel(_ context.Context, _ schemas.Key return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchCancelRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by OpenRouter provider. +func (provider *OpenRouterProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + // BatchResults is not supported by OpenRouter provider. func (provider *OpenRouterProvider) BatchResults(_ context.Context, _ schemas.Key, _ *schemas.BifrostBatchResultsRequest) (*schemas.BifrostBatchResultsResponse, *schemas.BifrostError) { return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) diff --git a/core/providers/parasail/batch.go b/core/providers/parasail/batch.go index 67c9cacfc..f5358ccc6 100644 --- a/core/providers/parasail/batch.go +++ b/core/providers/parasail/batch.go @@ -32,3 +32,8 @@ func (provider *ParasailProvider) BatchResults(ctx context.Context, key schemas. return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by Parasail provider. +func (provider *ParasailProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + diff --git a/core/providers/sgl/batch.go b/core/providers/sgl/batch.go index c142fa973..3e9d861df 100644 --- a/core/providers/sgl/batch.go +++ b/core/providers/sgl/batch.go @@ -32,3 +32,8 @@ func (provider *SGLProvider) BatchResults(_ context.Context, _ schemas.Key, _ *s return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey()) } +// BatchDelete is not supported by SGL provider. +func (provider *SGLProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) { + return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey()) +} + diff --git a/core/schemas/bifrost.go b/core/schemas/bifrost.go index 1c0cfd614..a73809843 100644 --- a/core/schemas/bifrost.go +++ b/core/schemas/bifrost.go @@ -100,6 +100,7 @@ const ( BatchRetrieveRequest RequestType = "batch_retrieve" BatchCancelRequest RequestType = "batch_cancel" BatchResultsRequest RequestType = "batch_results" + BatchDeleteRequest RequestType = "batch_delete" FileUploadRequest RequestType = "file_upload" FileListRequest RequestType = "file_list" FileRetrieveRequest RequestType = "file_retrieve" diff --git a/core/schemas/provider.go b/core/schemas/provider.go index f4a6a01b1..ef068746a 100644 --- a/core/schemas/provider.go +++ b/core/schemas/provider.go @@ -180,6 +180,7 @@ type AllowedRequests struct { BatchRetrieve bool `json:"batch_retrieve"` BatchCancel bool `json:"batch_cancel"` BatchResults bool `json:"batch_results"` + BatchDelete bool `json:"batch_delete"` FileUpload bool `json:"file_upload"` FileList bool `json:"file_list"` FileRetrieve bool `json:"file_retrieve"` @@ -228,6 +229,8 @@ func (ar *AllowedRequests) IsOperationAllowed(operation RequestType) bool { return ar.BatchCancel case BatchResultsRequest: return ar.BatchResults + case BatchDeleteRequest: + return ar.BatchDelete case FileUploadRequest: return ar.FileUpload case FileListRequest: @@ -346,6 +349,8 @@ type Provider interface { BatchCancel(ctx context.Context, key Key, request *BifrostBatchCancelRequest) (*BifrostBatchCancelResponse, *BifrostError) // BatchResults retrieves results from a completed batch job BatchResults(ctx context.Context, key Key, request *BifrostBatchResultsRequest) (*BifrostBatchResultsResponse, *BifrostError) + // BatchDelete deletes a batch job + BatchDelete(ctx context.Context, key Key, request *BifrostBatchDeleteRequest) (*BifrostBatchDeleteResponse, *BifrostError) // FileUpload uploads a file to the provider FileUpload(ctx context.Context, key Key, request *BifrostFileUploadRequest) (*BifrostFileUploadResponse, *BifrostError) // FileList lists files from the provider diff --git a/transports/bifrost-http/integrations/genai.go b/transports/bifrost-http/integrations/genai.go index a458ee7c1..fd194c320 100644 --- a/transports/bifrost-http/integrations/genai.go +++ b/transports/bifrost-http/integrations/genai.go @@ -2,12 +2,17 @@ package integrations import ( "context" + "crypto/rand" + "encoding/hex" "errors" "fmt" "io" "strconv" "strings" + "sync" + "time" + "github.com/bytedance/sonic" bifrost "github.com/maximhq/bifrost/core" "github.com/maximhq/bifrost/core/providers/gemini" "github.com/maximhq/bifrost/core/schemas" @@ -16,6 +21,52 @@ import ( "github.com/valyala/fasthttp" ) +// uploadSession stores metadata for resumable upload sessions +type uploadSession struct { + Filename string + MimeType string + SizeBytes int64 + Provider schemas.ModelProvider + CreatedAt time.Time +} + +// uploadSessions stores active upload sessions keyed by session ID +var uploadSessions = sync.Map{} + +// ErrResumableUploadInit is a sentinel error indicating the resumable upload init response was sent +var ErrResumableUploadInit = errors.New("resumable upload init handled") + +// Context key for flagging that response was already written +type contextKeyResponseWritten struct{} + +// Context key for storing original filename for resumable uploads +type contextKeyOriginalFilename struct{} + +// generateSessionID creates a unique session ID for resumable uploads +func generateSessionID() string { + bytes := make([]byte, 16) + rand.Read(bytes) + return hex.EncodeToString(bytes) +} + +// cleanupExpiredSessions removes sessions older than 1 hour +func init() { + go func() { + ticker := time.NewTicker(10 * time.Minute) + for range ticker.C { + now := time.Now() + uploadSessions.Range(func(key, value interface{}) bool { + if session, ok := value.(*uploadSession); ok { + if now.Sub(session.CreatedAt) > time.Hour { + uploadSessions.Delete(key) + } + } + return true + }) + } + }() +} + // GenAIRouter holds route registrations for genai endpoints. type GenAIRouter struct { *GenericRouter @@ -123,15 +174,26 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor Path: pathPrefix + "/upload/v1beta/files", Method: "POST", GetRequestTypeInstance: func() interface{} { - return &schemas.BifrostFileUploadRequest{} + return &gemini.GeminiFileUploadRequest{} }, RequestParser: parseGeminiFileUploadRequest, FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { - if uploadReq, ok := req.(*schemas.BifrostFileUploadRequest); ok { - uploadReq.Provider = schemas.Gemini + if geminiReq, ok := req.(*gemini.GeminiFileUploadRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileUploadRequest{ + Provider: provider, + File: geminiReq.File, + Filename: geminiReq.Filename, + Purpose: schemas.FilePurpose(geminiReq.Purpose), + } return &FileRequest{ Type: schemas.FileUploadRequest, - UploadRequest: uploadReq, + UploadRequest: bifrostReq, }, nil } return nil, errors.New("invalid file upload request type") @@ -145,6 +207,116 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { return gemini.ToGeminiError(err) }, + PreCallback: extractGeminiFileUploadParams, + }) + + // Resumable upload continuation endpoint - POST /upload/v1beta/files/resumable/{session_id} + // This handles phase 2 of resumable uploads where actual file content is sent + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/upload/v1beta/files/resumable/{session_id}", + Method: "POST", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiFileUploadRequest{} + }, + RequestParser: parseGeminiResumableUploadPhase2, + FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { + if geminiReq, ok := req.(*gemini.GeminiFileUploadRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileUploadRequest{ + Provider: provider, + File: geminiReq.File, + Filename: geminiReq.Filename, + Purpose: geminiReq.Purpose, + } + return &FileRequest{ + Type: schemas.FileUploadRequest, + UploadRequest: bifrostReq, + }, nil + } + return nil, errors.New("invalid file upload request type") + }, + FileUploadResponseConverter: func(ctx *context.Context, resp *schemas.BifrostFileUploadResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + fmt.Printf("[DEBUG] FileUploadResponseConverter (phase2 POST): using raw response\n") + return resp.ExtraFields.RawResponse, nil + } + result := gemini.ToGeminiFileUploadResponse(resp) + // If displayName is empty, use the original filename from context + if result.File.DisplayName == "" { + if originalFilename := (*ctx).Value(contextKeyOriginalFilename{}); originalFilename != nil { + if filename, ok := originalFilename.(string); ok && filename != "" { + result.File.DisplayName = filename + fmt.Printf("[DEBUG] FileUploadResponseConverter (phase2 POST): set displayName from context=%s\n", filename) + } + } + } + fmt.Printf("[DEBUG] FileUploadResponseConverter (phase2 POST): converted response=%+v\n", result) + return result, nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiResumableUploadParams, + PostCallback: setResumableUploadFinalStatus, + }) + + // Resumable upload continuation endpoint - PUT /upload/v1beta/files/resumable/{session_id} + // Some clients may use PUT instead of POST for resumable uploads + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/upload/v1beta/files/resumable/{session_id}", + Method: "PUT", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiFileUploadRequest{} + }, + RequestParser: parseGeminiResumableUploadPhase2, + FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { + if geminiReq, ok := req.(*gemini.GeminiFileUploadRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileUploadRequest{ + Provider: provider, + File: geminiReq.File, + Filename: geminiReq.Filename, + Purpose: geminiReq.Purpose, + } + return &FileRequest{ + Type: schemas.FileUploadRequest, + UploadRequest: bifrostReq, + }, nil + } + return nil, errors.New("invalid file upload request type") + }, + FileUploadResponseConverter: func(ctx *context.Context, resp *schemas.BifrostFileUploadResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + result := gemini.ToGeminiFileUploadResponse(resp) + // If displayName is empty, use the original filename from context + if result.File.DisplayName == "" { + if originalFilename := (*ctx).Value(contextKeyOriginalFilename{}); originalFilename != nil { + if filename, ok := originalFilename.(string); ok && filename != "" { + result.File.DisplayName = filename + } + } + } + return result, nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiResumableUploadParams, + PostCallback: setResumableUploadFinalStatus, }) // List files endpoint - GET /v1beta/files @@ -153,14 +325,25 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor Path: pathPrefix + "/v1beta/files", Method: "GET", GetRequestTypeInstance: func() interface{} { - return &schemas.BifrostFileListRequest{} + return &gemini.GeminiFileListRequest{} }, FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { - if listReq, ok := req.(*schemas.BifrostFileListRequest); ok { - listReq.Provider = schemas.Gemini + if geminiReq, ok := req.(*gemini.GeminiFileListRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileListRequest{ + Provider: provider, + Limit: geminiReq.Limit, + After: geminiReq.After, + Order: geminiReq.Order, + } return &FileRequest{ Type: schemas.FileListRequest, - ListRequest: listReq, + ListRequest: bifrostReq, }, nil } return nil, errors.New("invalid file list request type") @@ -183,14 +366,23 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor Path: pathPrefix + "/v1beta/files/{file_id}", Method: "GET", GetRequestTypeInstance: func() interface{} { - return &schemas.BifrostFileRetrieveRequest{} + return &gemini.GeminiFileRetrieveRequest{} }, FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { - if retrieveReq, ok := req.(*schemas.BifrostFileRetrieveRequest); ok { - retrieveReq.Provider = schemas.Gemini + if geminiReq, ok := req.(*gemini.GeminiFileRetrieveRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileRetrieveRequest{ + Provider: provider, + FileID: geminiReq.FileID, + } return &FileRequest{ Type: schemas.FileRetrieveRequest, - RetrieveRequest: retrieveReq, + RetrieveRequest: bifrostReq, }, nil } return nil, errors.New("invalid file retrieve request type") @@ -204,7 +396,7 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { return gemini.ToGeminiError(err) }, - PreCallback: extractGeminiFileIDFromPath, + PreCallback: extractGeminiFileRetrieveParams, }) // Delete file endpoint - DELETE /v1beta/files/{file_id} @@ -213,14 +405,23 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor Path: pathPrefix + "/v1beta/files/{file_id}", Method: "DELETE", GetRequestTypeInstance: func() interface{} { - return &schemas.BifrostFileDeleteRequest{} + return &gemini.GeminiFileDeleteRequest{} }, FileRequestConverter: func(ctx *context.Context, req interface{}) (*FileRequest, error) { - if deleteReq, ok := req.(*schemas.BifrostFileDeleteRequest); ok { - deleteReq.Provider = schemas.Gemini + if geminiReq, ok := req.(*gemini.GeminiFileDeleteRequest); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + // Convert Gemini request to Bifrost request + bifrostReq := &schemas.BifrostFileDeleteRequest{ + Provider: provider, + FileID: geminiReq.FileID, + } return &FileRequest{ Type: schemas.FileDeleteRequest, - DeleteRequest: deleteReq, + DeleteRequest: bifrostReq, }, nil } return nil, errors.New("invalid file delete request type") @@ -234,29 +435,529 @@ func CreateGenAIFileRouteConfigs(pathPrefix string, handlerStore lib.HandlerStor ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { return gemini.ToGeminiError(err) }, - PreCallback: extractGeminiFileIDFromPath, + PreCallback: extractGeminiFileDeleteParams, }) return routes } -// parseGeminiFileUploadRequest parses multipart/form-data for Gemini file upload requests +// CreateGenAIBatchRouteConfigs creates route configurations for Gemini Batch API endpoints. +func CreateGenAIBatchRouteConfigs(pathPrefix string, handlerStore lib.HandlerStore) []RouteConfig { + var routes []RouteConfig + + // Create batch endpoint - POST /v1beta/models/{model}:batchGenerateContent + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/v1beta/models/{model}:batchGenerateContent", + Method: "POST", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiBatchCreateRequestSDK{} + }, + BatchCreateRequestConverter: func(ctx *context.Context, req interface{}) (*BatchRequest, error) { + if sdkReq, ok := req.(*gemini.GeminiBatchCreateRequestSDK); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + + bifrostReq := &schemas.BifrostBatchCreateRequest{ + Provider: provider, + Model: sdkReq.Model, + } + + // Handle src field - can be string (file reference) or array (inline requests) + switch src := sdkReq.Src.(type) { + case string: + // File-based input: src="files/display_name" + // TrimPrefix is safe even if prefix doesn't exist + bifrostReq.InputFileID = strings.TrimPrefix(src, "files/") + case []interface{}: + // Inline requests: src=[{contents: [...], config: {...}}] + requests := make([]schemas.BatchRequestItem, 0, len(src)) + for i, item := range src { + if itemMap, ok := item.(map[string]interface{}); ok { + customID := fmt.Sprintf("request-%d", i) + requests = append(requests, schemas.BatchRequestItem{ + CustomID: customID, + Body: itemMap, + }) + } + } + bifrostReq.Requests = requests + } + + return &BatchRequest{ + Type: schemas.BatchCreateRequest, + CreateRequest: bifrostReq, + }, nil + } + return nil, errors.New("invalid batch create request type") + }, + BatchCreateResponseConverter: func(ctx *context.Context, resp *schemas.BifrostBatchCreateResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + return gemini.ToGeminiBatchJobResponse(resp), nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiBatchCreateParams, + }) + + // List batches endpoint - GET /v1beta/batches + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/v1beta/batches", + Method: "GET", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiBatchListRequestSDK{} + }, + BatchCreateRequestConverter: func(ctx *context.Context, req interface{}) (*BatchRequest, error) { + if sdkReq, ok := req.(*gemini.GeminiBatchListRequestSDK); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + + bifrostReq := &schemas.BifrostBatchListRequest{ + Provider: provider, + PageSize: sdkReq.PageSize, + } + if sdkReq.PageToken != "" { + bifrostReq.PageToken = &sdkReq.PageToken + } + return &BatchRequest{ + Type: schemas.BatchListRequest, + ListRequest: bifrostReq, + }, nil + } + return nil, errors.New("invalid batch list request type") + }, + BatchListResponseConverter: func(ctx *context.Context, resp *schemas.BifrostBatchListResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + return gemini.ToGeminiBatchListResponse(resp), nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiBatchListQueryParams, + }) + + // Retrieve batch endpoint - GET /v1beta/batches/{batch_id} + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/v1beta/batches/{batch_id}", + Method: "GET", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiBatchRetrieveRequestSDK{} + }, + BatchCreateRequestConverter: func(ctx *context.Context, req interface{}) (*BatchRequest, error) { + if sdkReq, ok := req.(*gemini.GeminiBatchRetrieveRequestSDK); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + + return &BatchRequest{ + Type: schemas.BatchRetrieveRequest, + RetrieveRequest: &schemas.BifrostBatchRetrieveRequest{ + Provider: provider, + BatchID: sdkReq.Name, + }, + }, nil + } + return nil, errors.New("invalid batch retrieve request type") + }, + BatchRetrieveResponseConverter: func(ctx *context.Context, resp *schemas.BifrostBatchRetrieveResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + return gemini.ToGeminiBatchRetrieveResponse(resp), nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiBatchIDFromPath, + }) + + // Cancel batch endpoint - POST /v1beta/batches/{batch_id}:cancel + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/v1beta/batches/{batch_id}:cancel", + Method: "POST", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiBatchCancelRequestSDK{} + }, + BatchCreateRequestConverter: func(ctx *context.Context, req interface{}) (*BatchRequest, error) { + if sdkReq, ok := req.(*gemini.GeminiBatchCancelRequestSDK); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + + return &BatchRequest{ + Type: schemas.BatchCancelRequest, + CancelRequest: &schemas.BifrostBatchCancelRequest{ + Provider: provider, + BatchID: sdkReq.Name, + }, + }, nil + } + return nil, errors.New("invalid batch cancel request type") + }, + BatchCancelResponseConverter: func(ctx *context.Context, resp *schemas.BifrostBatchCancelResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + return gemini.ToGeminiBatchCancelResponse(resp), nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiBatchIDFromPathCancel, + }) + + // Delete batch endpoint - DELETE /v1beta/batches/{batch_id} + routes = append(routes, RouteConfig{ + Type: RouteConfigTypeGenAI, + Path: pathPrefix + "/v1beta/batches/{batch_id}", + Method: "DELETE", + GetRequestTypeInstance: func() interface{} { + return &gemini.GeminiBatchDeleteRequestSDK{} + }, + BatchCreateRequestConverter: func(ctx *context.Context, req interface{}) (*BatchRequest, error) { + if sdkReq, ok := req.(*gemini.GeminiBatchDeleteRequestSDK); ok { + // Get provider from context + provider := schemas.Gemini + if p := (*ctx).Value(bifrostContextKeyProvider); p != nil { + provider = p.(schemas.ModelProvider) + } + + return &BatchRequest{ + Type: schemas.BatchDeleteRequest, + DeleteRequest: &schemas.BifrostBatchDeleteRequest{ + Provider: provider, + BatchID: sdkReq.Name, + }, + }, nil + } + return nil, errors.New("invalid batch delete request type") + }, + BatchDeleteResponseConverter: func(ctx *context.Context, resp *schemas.BifrostBatchDeleteResponse) (interface{}, error) { + if resp.ExtraFields.RawResponse != nil { + return resp.ExtraFields.RawResponse, nil + } + // Return empty object on successful delete + return map[string]interface{}{}, nil + }, + ErrorConverter: func(ctx *context.Context, err *schemas.BifrostError) interface{} { + return gemini.ToGeminiError(err) + }, + PreCallback: extractGeminiBatchIDFromPath, + }) + + return routes +} + +// extractGeminiBatchCreateParams extracts provider from header and model from URL for batch create +func extractGeminiBatchCreateParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + // Extract model from URL path + model := ctx.UserValue("model") + if model != nil { + modelStr := model.(string) + // Remove :batchGenerateContent suffix if present + modelStr = strings.TrimSuffix(modelStr, ":batchGenerateContent") + if sdkReq, ok := req.(*gemini.GeminiBatchCreateRequestSDK); ok { + sdkReq.Model = modelStr + } + } + + return nil +} + +// extractGeminiBatchListQueryParams extracts query parameters for batch list requests +func extractGeminiBatchListQueryParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + if listReq, ok := req.(*gemini.GeminiBatchListRequestSDK); ok { + // Extract pageSize from query parameters + if pageSizeStr := string(ctx.QueryArgs().Peek("pageSize")); pageSizeStr != "" { + if pageSize, err := strconv.Atoi(pageSizeStr); err == nil { + listReq.PageSize = pageSize + } + } + + // Extract pageToken from query parameters + if pageToken := string(ctx.QueryArgs().Peek("pageToken")); pageToken != "" { + listReq.PageToken = pageToken + } + } + + return nil +} + +// extractGeminiBatchIDFromPath extracts batch_id from path parameters +func extractGeminiBatchIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + batchID := ctx.UserValue("batch_id") + if batchID == nil { + return errors.New("batch_id is required") + } + + batchIDStr, ok := batchID.(string) + if !ok || batchIDStr == "" { + return errors.New("batch_id must be a non-empty string") + } + + // Ensure batch ID has proper format (batches/xxx) + if !strings.HasPrefix(batchIDStr, "batches/") { + batchIDStr = "batches/" + batchIDStr + } + + switch r := req.(type) { + case *gemini.GeminiBatchRetrieveRequestSDK: + r.Name = batchIDStr + case *gemini.GeminiBatchDeleteRequestSDK: + r.Name = batchIDStr + } + + return nil +} + +// extractGeminiBatchIDFromPathCancel extracts batch_id from path for cancel requests +func extractGeminiBatchIDFromPathCancel(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + batchID := ctx.UserValue("batch_id") + if batchID == nil { + return errors.New("batch_id is required") + } + + batchIDStr, ok := batchID.(string) + if !ok || batchIDStr == "" { + return errors.New("batch_id must be a non-empty string") + } + + // Remove :cancel suffix if present (from URL pattern matching) + batchIDStr = strings.TrimSuffix(batchIDStr, ":cancel") + + // Ensure batch ID has proper format (batches/xxx) + if !strings.HasPrefix(batchIDStr, "batches/") { + batchIDStr = "batches/" + batchIDStr + } + + if cancelReq, ok := req.(*gemini.GeminiBatchCancelRequestSDK); ok { + cancelReq.Name = batchIDStr + } + + return nil +} + +// parseGeminiFileUploadRequest parses file upload requests from the Google GenAI SDK. +// It handles both standard multipart uploads and resumable uploads by intercepting +// and converting them into a standard in-memory payload. func parseGeminiFileUploadRequest(ctx *fasthttp.RequestCtx, req interface{}) error { - uploadReq, ok := req.(*schemas.BifrostFileUploadRequest) + uploadReq, ok := req.(*gemini.GeminiFileUploadRequest) if !ok { return errors.New("invalid request type for file upload") } + contentType := string(ctx.Request.Header.ContentType()) + // Check for resumable upload protocol (Google GenAI SDK uses this) + uploadProtocol := string(ctx.Request.Header.Peek("X-Goog-Upload-Protocol")) + + fmt.Printf("[DEBUG] parseGeminiFileUploadRequest: contentType=%s, uploadProtocol=%s, path=%s\n", contentType, uploadProtocol, string(ctx.Path())) + + if uploadProtocol == "resumable" || uploadProtocol == "multipart" { + // Handle Google GenAI SDK resumable/multipart upload format + return parseGeminiResumableUpload(ctx, uploadReq, contentType) + } + + // Standard multipart/form-data upload + if strings.HasPrefix(contentType, "multipart/") { + return parseGeminiMultipartUpload(ctx, uploadReq) + } + + // Raw body upload (single file content) + return parseGeminiRawUpload(ctx, uploadReq) +} + +// parseGeminiResumableUpload handles Google GenAI SDK resumable upload format. +// The SDK sends requests with X-Goog-Upload-Protocol header and may include +// metadata and file content in a multipart related format. +func parseGeminiResumableUpload(ctx *fasthttp.RequestCtx, uploadReq *gemini.GeminiFileUploadRequest, contentType string) error { + body := ctx.Request.Body() + + fmt.Printf("[DEBUG] parseGeminiResumableUpload: contentType=%s, bodyLen=%d\n", contentType, len(body)) + + // Check if it's multipart/related (metadata + file content) + if strings.HasPrefix(contentType, "multipart/related") { + fmt.Printf("[DEBUG] parseGeminiResumableUpload: handling multipart/related\n") + return parseGeminiMultipartRelated(ctx, uploadReq, body, contentType) + } + + // Check if this is just metadata (start of resumable upload) + if strings.HasPrefix(contentType, "application/json") { + fmt.Printf("[DEBUG] parseGeminiResumableUpload: handling JSON metadata, body=%s\n", string(body)) + // This is the initial request with just metadata + // Parse the metadata - Google GenAI SDK sends snake_case fields + var metadata struct { + File struct { + DisplayName string `json:"display_name"` + MimeType string `json:"mime_type"` + SizeBytes int64 `json:"size_bytes"` + } `json:"file"` + } + if err := sonic.Unmarshal(body, &metadata); err == nil { + fmt.Printf("[DEBUG] parseGeminiResumableUpload: parsed metadata - displayName=%s, mimeType=%s, sizeBytes=%d\n", metadata.File.DisplayName, metadata.File.MimeType, metadata.File.SizeBytes) + uploadReq.Filename = metadata.File.DisplayName + uploadReq.MimeType = metadata.File.MimeType + + // Create a session to store metadata for the second request + sessionID := generateSessionID() + fmt.Printf("[DEBUG] parseGeminiResumableUpload: created session ID=%s\n", sessionID) + + session := &uploadSession{ + Filename: metadata.File.DisplayName, + MimeType: metadata.File.MimeType, + SizeBytes: metadata.File.SizeBytes, + CreatedAt: time.Now(), + } + uploadSessions.Store(sessionID, session) + + // Store session ID on request to signal special response handling in PreCallback + uploadReq.ResumableSessionID = sessionID + } else { + fmt.Printf("[DEBUG] parseGeminiResumableUpload: failed to parse metadata: %v\n", err) + } + // For initial metadata-only request, file content will come in subsequent request + return nil + } + + fmt.Printf("[DEBUG] parseGeminiResumableUpload: handling raw file content\n") + // Assume raw file content + uploadReq.File = make([]byte, len(body)) + copy(uploadReq.File, body) + return nil +} + +// parseGeminiMultipartRelated parses multipart/related format used by Google GenAI SDK. +// Format: boundary-separated parts with metadata JSON and file content. +func parseGeminiMultipartRelated(ctx *fasthttp.RequestCtx, uploadReq *gemini.GeminiFileUploadRequest, body []byte, contentType string) error { + // Extract boundary from content type + boundary := "" + for _, param := range strings.Split(contentType, ";") { + param = strings.TrimSpace(param) + if strings.HasPrefix(param, "boundary=") { + boundary = strings.TrimPrefix(param, "boundary=") + boundary = strings.Trim(boundary, "\"") + break + } + } + + if boundary == "" { + return errors.New("missing boundary in multipart/related content type") + } + + // Split body by boundary + delimiter := "--" + boundary + parts := strings.Split(string(body), delimiter) + + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" || part == "--" { + continue + } + + // Split headers from content + headerEnd := strings.Index(part, "\r\n\r\n") + if headerEnd == -1 { + headerEnd = strings.Index(part, "\n\n") + if headerEnd == -1 { + continue + } + } + + headers := part[:headerEnd] + content := part[headerEnd:] + content = strings.TrimPrefix(content, "\r\n\r\n") + content = strings.TrimPrefix(content, "\n\n") + + // Check content type of this part + headersLower := strings.ToLower(headers) + if strings.Contains(headersLower, "application/json") { + // This is metadata - Google GenAI SDK sends snake_case fields + var metadata struct { + File struct { + DisplayName string `json:"display_name"` + MimeType string `json:"mime_type"` + } `json:"file"` + } + if err := sonic.Unmarshal([]byte(content), &metadata); err == nil { + if metadata.File.DisplayName != "" { + uploadReq.Filename = metadata.File.DisplayName + } + if metadata.File.MimeType != "" { + uploadReq.MimeType = metadata.File.MimeType + } + } + } else { + // This is file content + uploadReq.File = []byte(content) + } + } + + return nil +} - // Parse multipart form +// parseGeminiMultipartUpload handles standard multipart/form-data uploads. +func parseGeminiMultipartUpload(ctx *fasthttp.RequestCtx, uploadReq *gemini.GeminiFileUploadRequest) error { form, err := ctx.MultipartForm() if err != nil { return err } - // Extract metadata (optional JSON with displayName) - if metadataValues := form.Value["metadata"]; len(metadataValues) > 0 && metadataValues[0] != "" { - // Could parse JSON metadata to extract displayName - // For now, just use filename from file header + // Parse metadata field if present (JSON with displayName) + if metadataValues := form.Value["metadata"]; len(metadataValues) > 0 { + var metadata struct { + File struct { + DisplayName string `json:"displayName"` + } `json:"file"` + } + if err := sonic.Unmarshal([]byte(metadataValues[0]), &metadata); err == nil { + if metadata.File.DisplayName != "" { + uploadReq.Filename = metadata.File.DisplayName + } + } } // Extract file (required) @@ -279,16 +980,208 @@ func parseGeminiFileUploadRequest(ctx *fasthttp.RequestCtx, req interface{}) err } uploadReq.File = fileData - uploadReq.Filename = fileHeader.Filename + if uploadReq.Filename == "" { + uploadReq.Filename = fileHeader.Filename + } + + return nil +} + +// parseGeminiRawUpload handles raw body uploads (file content only). +func parseGeminiRawUpload(ctx *fasthttp.RequestCtx, uploadReq *gemini.GeminiFileUploadRequest) error { + body := ctx.Request.Body() + if len(body) == 0 { + return errors.New("file content is required") + } + + uploadReq.File = make([]byte, len(body)) + copy(uploadReq.File, body) + + // Try to get filename from Content-Disposition header + contentDisposition := string(ctx.Request.Header.Peek("Content-Disposition")) + if contentDisposition != "" { + for _, param := range strings.Split(contentDisposition, ";") { + param = strings.TrimSpace(param) + if strings.HasPrefix(param, "filename=") { + filename := strings.TrimPrefix(param, "filename=") + filename = strings.Trim(filename, "\"") + uploadReq.Filename = filename + break + } + } + } + + return nil +} + +// parseGeminiResumableUploadPhase2 handles phase 2 of resumable uploads where actual file content is sent +func parseGeminiResumableUploadPhase2(ctx *fasthttp.RequestCtx, req interface{}) error { + fmt.Printf("[DEBUG] parseGeminiResumableUploadPhase2: called, path=%s\n", string(ctx.Path())) + + uploadReq, ok := req.(*gemini.GeminiFileUploadRequest) + if !ok { + return errors.New("invalid request type for file upload") + } + + // Get session ID from URL path + sessionID := ctx.UserValue("session_id") + fmt.Printf("[DEBUG] parseGeminiResumableUploadPhase2: sessionID from path=%v\n", sessionID) + if sessionID == nil { + return errors.New("session_id is required") + } + + sessionIDStr, ok := sessionID.(string) + if !ok || sessionIDStr == "" { + return errors.New("session_id must be a non-empty string") + } + + // Retrieve session metadata + sessionVal, ok := uploadSessions.Load(sessionIDStr) + fmt.Printf("[DEBUG] parseGeminiResumableUploadPhase2: session found=%v\n", ok) + if !ok { + return errors.New("upload session not found or expired") + } + + session, ok := sessionVal.(*uploadSession) + if !ok { + return errors.New("invalid session data") + } + + // Get file content from request body + body := ctx.Request.Body() + fmt.Printf("[DEBUG] parseGeminiResumableUploadPhase2: bodyLen=%d, filename=%s, provider=%s\n", len(body), session.Filename, session.Provider) + if len(body) == 0 { + return errors.New("file content is required") + } + + // Populate the upload request with session metadata and file content + uploadReq.File = make([]byte, len(body)) + copy(uploadReq.File, body) + uploadReq.Filename = session.Filename + uploadReq.MimeType = session.MimeType + uploadReq.Purpose = "batch" // Default purpose for file uploads via GenAI API + + // Store session ID for provider extraction in PreCallback + // NOTE: Don't delete session here - PreCallback needs to read provider from it + uploadReq.ResumableSessionID = sessionIDStr + + fmt.Printf("[DEBUG] parseGeminiResumableUploadPhase2: successfully prepared upload request\n") + return nil +} + +// setResumableUploadFinalStatus sets the X-Goog-Upload-Status header to "final" for phase 2 responses +func setResumableUploadFinalStatus(ctx *fasthttp.RequestCtx, req interface{}, resp interface{}) error { + // Set the upload status to final to signal completion of resumable upload + ctx.Response.Header.Set("X-Goog-Upload-Status", "final") + + // Log the response for debugging + respJSON, _ := sonic.Marshal(resp) + fmt.Printf("[DEBUG] setResumableUploadFinalStatus: set X-Goog-Upload-Status=final, response body=%s\n", string(respJSON)) + + // Also log the full response headers for debugging + fmt.Printf("[DEBUG] setResumableUploadFinalStatus: status code=%d\n", ctx.Response.StatusCode()) + + return nil +} + +// extractGeminiResumableUploadParams extracts provider from session for resumable upload phase 2 +func extractGeminiResumableUploadParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Get session ID from URL path + sessionID := ctx.UserValue("session_id") + if sessionID == nil { + return errors.New("session_id is required") + } + + sessionIDStr, ok := sessionID.(string) + if !ok || sessionIDStr == "" { + return errors.New("session_id must be a non-empty string") + } + + // Get provider and filename from session (stored during phase 1) + provider := schemas.Gemini + var originalFilename string + if sessionVal, ok := uploadSessions.Load(sessionIDStr); ok { + if session, ok := sessionVal.(*uploadSession); ok { + if session.Provider != "" { + provider = session.Provider + } + originalFilename = session.Filename + } + // Clean up the session now that we've extracted the data + uploadSessions.Delete(sessionIDStr) + } + + fmt.Printf("[DEBUG] extractGeminiResumableUploadParams: sessionID=%s, provider=%s, filename=%s\n", sessionIDStr, provider, originalFilename) + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, provider) + // Store original filename in context for response converter + *bifrostCtx = context.WithValue(*bifrostCtx, contextKeyOriginalFilename{}, originalFilename) + return nil +} + +// extractGeminiFileUploadParams extracts provider from header for file upload requests +// and handles resumable upload init by returning the upload URL +func extractGeminiFileUploadParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + fmt.Printf("[DEBUG] extractGeminiFileUploadParams: provider=%s\n", provider) + + // Check if this is a resumable upload init (metadata-only request) + if uploadReq, ok := req.(*gemini.GeminiFileUploadRequest); ok { + fmt.Printf("[DEBUG] extractGeminiFileUploadParams: resumableSessionID=%s, fileLen=%d\n", uploadReq.ResumableSessionID, len(uploadReq.File)) + if uploadReq.ResumableSessionID != "" { + // Update the session with the provider + if sessionVal, ok := uploadSessions.Load(uploadReq.ResumableSessionID); ok { + if session, ok := sessionVal.(*uploadSession); ok { + session.Provider = schemas.ModelProvider(provider) + } + } + + // Build the upload URL for phase 2 + // Use the request's host and scheme to build the URL + scheme := "http" + if ctx.IsTLS() { + scheme = "https" + } + host := string(ctx.Host()) + uploadURL := fmt.Sprintf("%s://%s/genai/upload/v1beta/files/resumable/%s", scheme, host, uploadReq.ResumableSessionID) + + fmt.Printf("[DEBUG] extractGeminiFileUploadParams: returning upload URL=%s\n", uploadURL) + + // Send the upload URL response + ctx.Response.Header.Set("X-Goog-Upload-URL", uploadURL) + ctx.Response.Header.Set("X-Goog-Upload-Status", "active") + ctx.Response.Header.SetContentType("application/json") + ctx.SetStatusCode(200) + + // Return empty JSON object as response body + ctx.SetBody([]byte("{}")) + + // Mark that response was written + *bifrostCtx = context.WithValue(*bifrostCtx, contextKeyResponseWritten{}, true) + + // Return sentinel error to signal router to skip further processing + return ErrResumableUploadInit + } + } return nil } // extractGeminiFileListQueryParams extracts query parameters for Gemini file list requests func extractGeminiFileListQueryParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { - if listReq, ok := req.(*schemas.BifrostFileListRequest); ok { - listReq.Provider = schemas.Gemini + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + if listReq, ok := req.(*gemini.GeminiFileListRequest); ok { // Extract pageSize from query parameters if pageSizeStr := string(ctx.QueryArgs().Peek("pageSize")); pageSizeStr != "" { if pageSize, err := strconv.Atoi(pageSizeStr); err == nil { @@ -305,8 +1198,15 @@ func extractGeminiFileListQueryParams(ctx *fasthttp.RequestCtx, bifrostCtx *cont return nil } -// extractGeminiFileIDFromPath extracts file_id from path parameters for Gemini -func extractGeminiFileIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { +// extractGeminiFileRetrieveParams extracts file_id and provider for file retrieve requests +func extractGeminiFileRetrieveParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + fileID := ctx.UserValue("file_id") if fileID == nil { return errors.New("file_id is required") @@ -317,13 +1217,34 @@ func extractGeminiFileIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *context.C return errors.New("file_id must be a non-empty string") } - switch r := req.(type) { - case *schemas.BifrostFileRetrieveRequest: - r.FileID = fileIDStr - r.Provider = schemas.Gemini - case *schemas.BifrostFileDeleteRequest: - r.FileID = fileIDStr - r.Provider = schemas.Gemini + if retrieveReq, ok := req.(*gemini.GeminiFileRetrieveRequest); ok { + retrieveReq.FileID = fileIDStr + } + + return nil +} + +// extractGeminiFileDeleteParams extracts file_id and provider for file delete requests +func extractGeminiFileDeleteParams(ctx *fasthttp.RequestCtx, bifrostCtx *context.Context, req interface{}) error { + // Extract provider from header, default to Gemini + provider := string(ctx.Request.Header.Peek("x-model-provider")) + if provider == "" { + provider = string(schemas.Gemini) + } + *bifrostCtx = context.WithValue(*bifrostCtx, bifrostContextKeyProvider, schemas.ModelProvider(provider)) + + fileID := ctx.UserValue("file_id") + if fileID == nil { + return errors.New("file_id is required") + } + + fileIDStr, ok := fileID.(string) + if !ok || fileIDStr == "" { + return errors.New("file_id must be a non-empty string") + } + + if deleteReq, ok := req.(*gemini.GeminiFileDeleteRequest); ok { + deleteReq.FileID = fileIDStr } return nil @@ -333,6 +1254,7 @@ func extractGeminiFileIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *context.C func NewGenAIRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, logger schemas.Logger) *GenAIRouter { routes := CreateGenAIRouteConfigs("/genai") routes = append(routes, CreateGenAIFileRouteConfigs("/genai", handlerStore)...) + routes = append(routes, CreateGenAIBatchRouteConfigs("/genai", handlerStore)...) return &GenAIRouter{ GenericRouter: NewGenericRouter(client, handlerStore, routes, logger), diff --git a/transports/bifrost-http/integrations/router.go b/transports/bifrost-http/integrations/router.go index 8eff01453..bc059c393 100644 --- a/transports/bifrost-http/integrations/router.go +++ b/transports/bifrost-http/integrations/router.go @@ -85,6 +85,7 @@ type BatchRequest struct { RetrieveRequest *schemas.BifrostBatchRetrieveRequest CancelRequest *schemas.BifrostBatchCancelRequest ResultsRequest *schemas.BifrostBatchResultsRequest + DeleteRequest *schemas.BifrostBatchDeleteRequest } // FileRequest wraps a Bifrost file request with its type information. @@ -155,6 +156,10 @@ type BatchCancelResponseConverter func(ctx *context.Context, resp *schemas.Bifro // It takes a BifrostBatchResultsResponse and returns the format expected by the specific integration. type BatchResultsResponseConverter func(ctx *context.Context, resp *schemas.BifrostBatchResultsResponse) (interface{}, error) +// BatchDeleteResponseConverter is a function that converts BifrostBatchDeleteResponse to integration-specific format. +// It takes a BifrostBatchDeleteResponse and returns the format expected by the specific integration. +type BatchDeleteResponseConverter func(ctx *context.Context, resp *schemas.BifrostBatchDeleteResponse) (interface{}, error) + // FileUploadResponseConverter is a function that converts BifrostFileUploadResponse to integration-specific format. // It takes a BifrostFileUploadResponse and returns the format expected by the specific integration. type FileUploadResponseConverter func(ctx *context.Context, resp *schemas.BifrostFileUploadResponse) (interface{}, error) @@ -283,6 +288,7 @@ type RouteConfig struct { BatchRetrieveResponseConverter BatchRetrieveResponseConverter // Function to convert BifrostBatchRetrieveResponse to integration format BatchCancelResponseConverter BatchCancelResponseConverter // Function to convert BifrostBatchCancelResponse to integration format BatchResultsResponseConverter BatchResultsResponseConverter // Function to convert BifrostBatchResultsResponse to integration format + BatchDeleteResponseConverter BatchDeleteResponseConverter // Function to convert BifrostBatchDeleteResponse to integration format FileUploadResponseConverter FileUploadResponseConverter // Function to convert BifrostFileUploadResponse to integration format FileListResponseConverter FileListResponseConverter // Function to convert BifrostFileListResponse to integration format FileRetrieveResponseConverter FileRetrieveResponseConverter // Function to convert BifrostFileRetrieveResponse to integration format @@ -418,6 +424,11 @@ func (g *GenericRouter) createHandler(config RouteConfig) fasthttp.RequestHandle // or performing request validation after parsing if config.PreCallback != nil { if err := config.PreCallback(ctx, bifrostCtx, req); err != nil { + // Check if this is a resumable upload init that was already handled + if err == ErrResumableUploadInit { + // Response was already written by the PreCallback, just return + return + } g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute pre-request callback: "+err.Error())) return } @@ -811,6 +822,28 @@ func (g *GenericRouter) handleBatchRequest(ctx *fasthttp.RequestCtx, config Rout response = batchResponse } + case schemas.BatchDeleteRequest: + if batchReq.DeleteRequest == nil { + g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Invalid batch delete request")) + return + } + batchResponse, bifrostErr := g.client.BatchDeleteRequest(requestCtx, batchReq.DeleteRequest) + if bifrostErr != nil { + g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) + return + } + if config.PostCallback != nil { + if err := config.PostCallback(ctx, req, batchResponse); err != nil { + g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) + return + } + } + if config.BatchDeleteResponseConverter != nil { + response, err = config.BatchDeleteResponseConverter(bifrostCtx, batchResponse) + } else { + response = batchResponse + } + default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Unknown batch request type")) return @@ -837,19 +870,31 @@ func (g *GenericRouter) handleFileRequest(ctx *fasthttp.RequestCtx, config Route g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Invalid file upload request")) return } + fmt.Printf("[DEBUG] router: calling FileUploadRequest for provider=%s, purpose=%s, filename=%s\n", fileReq.UploadRequest.Provider, fileReq.UploadRequest.Purpose, fileReq.UploadRequest.Filename) fileResponse, bifrostErr := g.client.FileUploadRequest(requestCtx, fileReq.UploadRequest) if bifrostErr != nil { + errMsg := "unknown error" + if bifrostErr.Error != nil { + errMsg = bifrostErr.Error.Message + } + fmt.Printf("[DEBUG] router: FileUploadRequest error: %s (provider=%s)\n", errMsg, fileReq.UploadRequest.Provider) g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } + fmt.Printf("[DEBUG] router: FileUploadRequest success, response ID=%s\n", fileResponse.ID) if config.PostCallback != nil { + fmt.Printf("[DEBUG] router: calling PostCallback\n") if err := config.PostCallback(ctx, req, fileResponse); err != nil { + fmt.Printf("[DEBUG] router: PostCallback error: %v\n", err) g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } + fmt.Printf("[DEBUG] router: PostCallback success\n") } if config.FileUploadResponseConverter != nil { + fmt.Printf("[DEBUG] router: calling FileUploadResponseConverter\n") response, err = config.FileUploadResponseConverter(bifrostCtx, fileResponse) + fmt.Printf("[DEBUG] router: FileUploadResponseConverter done, err=%v\n", err) } else { response = fileResponse } @@ -974,6 +1019,7 @@ func (g *GenericRouter) handleFileRequest(ctx *fasthttp.RequestCtx, config Route } if err != nil { + fmt.Printf("[DEBUG] router: file response conversion error: %v\n", err) g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file response")) return } diff --git a/transports/bifrost-http/integrations/utils.go b/transports/bifrost-http/integrations/utils.go index 02ffa8b63..32de98b4e 100644 --- a/transports/bifrost-http/integrations/utils.go +++ b/transports/bifrost-http/integrations/utils.go @@ -191,7 +191,9 @@ func (g *GenericRouter) sendSuccess(ctx *fasthttp.RequestCtx, bifrostCtx *contex return } + ctx.Response.Header.Set("Content-Length", fmt.Sprintf("%d", len(responseBody))) ctx.SetBody(responseBody) + fmt.Printf("[DEBUG] sendSuccess: status=200, contentLen=%d, body=%s\n", len(responseBody), string(responseBody)) } // extractAndParseFallbacks extracts fallbacks from the integration request and adds them to the BifrostRequest