From 65163ac2e1978b0352b9f0c337186a93bc179be7 Mon Sep 17 00:00:00 2001 From: chibie Date: Thu, 19 Mar 2026 14:49:50 +0100 Subject: [PATCH] revert(ent): restore generate directive --- services/priority_queue.go | 245 +++++++++++++---------- services/provider_selection_constants.go | 34 ++++ services/provider_selection_helpers.go | 55 +++++ tasks/startup.go | 16 -- utils/utils.go | 218 +++++--------------- 5 files changed, 282 insertions(+), 286 deletions(-) create mode 100644 services/provider_selection_constants.go create mode 100644 services/provider_selection_helpers.go diff --git a/services/priority_queue.go b/services/priority_queue.go index 363e4513..6cff9806 100644 --- a/services/priority_queue.go +++ b/services/priority_queue.go @@ -2,9 +2,9 @@ package services import ( "context" - "errors" "fmt" "math/rand" + "sort" "strings" "time" @@ -340,6 +340,24 @@ func (s *PriorityQueueService) GetProviderRate(ctx context.Context, provider *en return rate, nil } +func providerOrderTokenRate(orderToken *ent.ProviderOrderToken, side RateSide) decimal.Decimal { + if orderToken == nil || orderToken.Edges.Currency == nil { + return decimal.Zero + } + switch side { + case RateSideBuy: + if !orderToken.FixedBuyRate.IsZero() { + return orderToken.FixedBuyRate + } + return orderToken.Edges.Currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) + default: + if !orderToken.FixedSellRate.IsZero() { + return orderToken.FixedSellRate + } + return orderToken.Edges.Currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) + } +} + // deleteQueue deletes existing circular queue func (s *PriorityQueueService) deleteQueue(ctx context.Context, key string) error { _, err := storage.RedisClient.Del(ctx, key).Result() @@ -699,35 +717,8 @@ func (s *PriorityQueueService) tryUsePreSetProvider(ctx context.Context, order t // AssignPaymentOrder assigns payment orders to providers func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order types.PaymentOrderFields) error { - orderIDPrefix := strings.Split(order.ID.String(), "-")[0] orderConf := config.OrderConfig() - // Both regular and OTC orders must have a provision bucket, unless order has a pre-set private provider (private orders don't require buckets). - allowNilBucketForPrivate := false - if order.ProvisionBucket == nil { - if order.ProviderID != "" { - providerForCheck, pErr := storage.Client.ProviderProfile.Query().Where(providerprofile.IDEQ(order.ProviderID)).Only(ctx) - if pErr == nil && providerForCheck != nil && providerForCheck.VisibilityMode == providerprofile.VisibilityModePrivate { - allowNilBucketForPrivate = true - } - } - if !allowNilBucketForPrivate { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "OrderType": order.OrderType, - "Reason": "internal: Order missing provision bucket", - }).Errorf("AssignPaymentOrder.MissingProvisionBucket") - return fmt.Errorf("order %s (type: %s) is missing provision bucket", order.ID.String(), order.OrderType) - } - } else if order.ProvisionBucket.Edges.Currency == nil { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "OrderType": order.OrderType, - "Reason": "internal: Provision bucket missing currency", - }).Errorf("AssignPaymentOrder.MissingCurrency") - return fmt.Errorf("provision bucket for order %s (type: %s) is missing currency", order.ID.String(), order.OrderType) - } - // Defensive check: Verify order is in a valid state for assignment // This prevents duplicate assignments from concurrent sources currentOrder, err := storage.Client.PaymentOrder.Get(ctx, order.ID) @@ -834,47 +825,129 @@ func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order typ } } - // Private orders with nil bucket only use the pre-set path above; do not run queue matching. - if order.ProvisionBucket == nil { - return nil + currency, err := resolveInstitutionCurrency(ctx, order.Institution) + if err != nil { + return err + } + if order.Token == nil { + return fmt.Errorf("order %s has no token", order.ID) + } + network := order.Network + if network == nil && order.Token.Edges.Network != nil { + network = order.Token.Edges.Network + } + if network == nil { + network, err = order.Token.QueryNetwork().Only(ctx) + if err != nil { + return err + } } - // Use same side-suffixed key as buildQueueForSide so reads/writes use identical keys (rateSide computed above) - baseRedisKey := fmt.Sprintf("bucket_%s_%s_%s", order.ProvisionBucket.Edges.Currency.Code, order.ProvisionBucket.MinAmount, order.ProvisionBucket.MaxAmount) - redisKey := baseRedisKey + "_" + string(rateSide) - - err = s.matchRate(ctx, redisKey, orderIDPrefix, order, excludeList) + candidates, err := storage.Client.ProviderOrderToken.Query(). + Where( + providerordertoken.NetworkEQ(network.Identifier), + providerordertoken.HasTokenWith(token.IDEQ(order.Token.ID)), + providerordertoken.HasCurrencyWith(fiatcurrency.IDEQ(currency.ID)), + providerordertoken.SettlementAddressNEQ(""), + providerordertoken.HasProviderWith( + providerprofile.IsActive(true), + providerprofile.VisibilityModeEQ(providerprofile.VisibilityModePublic), + providerprofile.HasProviderBalancesWith( + providerbalances.HasFiatCurrencyWith(fiatcurrency.IDEQ(currency.ID)), + providerbalances.IsAvailableEQ(true), + providerbalances.AvailableBalanceGTE(order.Amount.Mul(order.Rate).RoundBank(0)), + ), + ), + ). + WithProvider(func(pq *ent.ProviderProfileQuery) { + pq.WithProviderRating() + }). + WithCurrency(). + All(ctx) if err != nil { - prevRedisKey := redisKey + "_prev" - err = s.matchRate(ctx, prevRedisKey, orderIDPrefix, order, excludeList) - if err != nil { - // Both matchRate attempts failed; try fallback once for regular orders - matchRateErr := err - if order.OrderType != "otc" { - orderEnt, loadErr := storage.Client.PaymentOrder.Query(). - Where(paymentorder.IDEQ(order.ID)). - WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). - Only(ctx) - if loadErr == nil { - if fallbackErr := s.TryFallbackAssignment(ctx, orderEnt); fallbackErr == nil { - return nil - } else { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "Error": fallbackErr.Error(), - }).Errorf("AssignPaymentOrder: TryFallbackAssignment failed after no provider in queue") - var errStuck *types.ErrNoProviderDueToStuck - if errors.As(fallbackErr, &errStuck) { - return fallbackErr - } - } - } + return err + } + if len(candidates) == 0 { + if order.OrderType != "otc" { + orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) + if loadErr == nil { + return s.TryFallbackAssignment(ctx, orderEnt) } - return fmt.Errorf("no provider matched for order: %w", matchRateErr) + } + return fmt.Errorf("no eligible provider order tokens found") + } + providerIDs := make([]string, 0, len(candidates)) + for _, candidate := range candidates { + if candidate.Edges.Provider != nil { + providerIDs = append(providerIDs, candidate.Edges.Provider.ID) } } + recentVolumes, _ := getRecentSuccessfulFiatVolumeByProvider(ctx, providerIDs) + sort.SliceStable(candidates, func(i, j int) bool { + leftScore := decimal.Zero + rightScore := decimal.Zero + if candidates[i].Edges.Provider != nil && candidates[i].Edges.Provider.Edges.ProviderRating != nil { + leftScore = candidates[i].Edges.Provider.Edges.ProviderRating.TrustScore + } + if candidates[j].Edges.Provider != nil && candidates[j].Edges.Provider.Edges.ProviderRating != nil { + rightScore = candidates[j].Edges.Provider.Edges.ProviderRating.TrustScore + } + if !leftScore.Equal(rightScore) { + return leftScore.GreaterThan(rightScore) + } + leftVol := recentVolumes[candidates[i].Edges.Provider.ID] + rightVol := recentVolumes[candidates[j].Edges.Provider.ID] + if !leftVol.Equal(rightVol) { + return leftVol.LessThan(rightVol) + } + return candidates[i].Edges.Provider.ID < candidates[j].Edges.Provider.ID + }) - return nil + for _, candidate := range candidates { + providerID := candidate.Edges.Provider.ID + excludeCount := s.countProviderInExcludeList(excludeList, providerID) + if order.OrderType == "otc" { + if excludeCount > 0 { + continue + } + } else if excludeCount >= orderConf.ProviderMaxRetryAttempts { + continue + } + if order.OrderType != "otc" && orderConf.ProviderStuckFulfillmentThreshold > 0 { + stuckCount, errStuck := utils.GetProviderStuckOrderCount(ctx, providerID) + if errStuck == nil && stuckCount >= orderConf.ProviderStuckFulfillmentThreshold { + continue + } + } + if order.Amount.LessThan(candidate.MinOrderAmount) { + continue + } + if order.Amount.GreaterThan(candidate.MaxOrderAmount) { + if candidate.MinOrderAmountOtc.IsZero() || candidate.MaxOrderAmountOtc.IsZero() { + continue + } + if order.Amount.LessThan(candidate.MinOrderAmountOtc) || order.Amount.GreaterThan(candidate.MaxOrderAmountOtc) { + continue + } + } + rate := providerOrderTokenRate(candidate, rateSide) + allowedDeviation := order.Rate.Mul(candidate.RateSlippage.Div(decimal.NewFromInt(100))) + if rate.Sub(order.Rate).Abs().GreaterThan(allowedDeviation) { + continue + } + order.ProviderID = providerID + if order.OrderType == "otc" { + return s.assignOtcOrder(ctx, order) + } + return s.sendOrderRequest(ctx, order) + } + if order.OrderType != "otc" { + orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) + if loadErr == nil { + return s.TryFallbackAssignment(ctx, orderEnt) + } + } + return fmt.Errorf("no provider matched for order") } // TryFallbackAssignment attempts to assign the order to the configured fallback provider using only rate and balance checks. @@ -905,9 +978,6 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order // Eagerly load ProvisionBucket+Currency so we never need a separate fallback query for them. currentOrder, err := storage.Client.PaymentOrder.Query(). Where(paymentorder.IDEQ(order.ID)). - WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { - pb.WithCurrency() - }). Only(ctx) if err != nil { return fmt.Errorf("fallback: failed to load order: %w", err) @@ -966,7 +1036,6 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order AccountIdentifier: order.AccountIdentifier, AccountName: order.AccountName, ProviderID: "", - ProvisionBucket: currentOrder.Edges.ProvisionBucket, MessageHash: order.MessageHash, Memo: order.Memo, UpdatedAt: order.UpdatedAt, @@ -980,39 +1049,9 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order return fmt.Errorf("fallback: order %s has no token", order.ID.String()) } - // If order has no bucket yet, resolve one from institution currency + fiat amount. - if fields.ProvisionBucket == nil { - institution, instErr := utils.GetInstitutionByCode(ctx, order.Institution, true) - if instErr != nil { - return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution lookup failed: %w", fields.ID.String(), instErr) - } - if institution.Edges.FiatCurrency == nil { - return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution %s has no fiat currency", fields.ID.String(), order.Institution) - } - fiatAmount := order.Amount.Mul(order.Rate) - bucket, bErr := storage.Client.ProvisionBucket. - Query(). - Where( - provisionbucket.MaxAmountGTE(fiatAmount), - provisionbucket.MinAmountLTE(fiatAmount), - provisionbucket.HasCurrencyWith(fiatcurrency.IDEQ(institution.Edges.FiatCurrency.ID)), - ). - WithCurrency(). - Only(ctx) - if bErr != nil { - return fmt.Errorf("fallback: no matching provision bucket for order %s (fiat %s %s): %w", - fields.ID.String(), fiatAmount.String(), institution.Edges.FiatCurrency.Code, bErr) - } - fields.ProvisionBucket = bucket - // Persist so later flows (e.g. FulfillOrder) see the bucket and do not panic on nil ProvisionBucket - if _, upErr := storage.Client.PaymentOrder.UpdateOneID(fields.ID).SetProvisionBucket(bucket).Save(ctx); upErr != nil { - return fmt.Errorf("fallback: failed to set provision bucket on order %s: %w", fields.ID.String(), upErr) - } - } - - bucketCurrency := fields.ProvisionBucket.Edges.Currency - if bucketCurrency == nil { - return fmt.Errorf("fallback: provision bucket %d missing currency", fields.ProvisionBucket.ID) + bucketCurrency, err := resolveInstitutionCurrency(ctx, order.Institution) + if err != nil { + return fmt.Errorf("fallback: resolve institution currency: %w", err) } // Skip fallback provider if at or over stuck fulfillment threshold @@ -1358,7 +1397,11 @@ func (s *PriorityQueueService) addProviderToExcludeList(ctx context.Context, ord func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types.PaymentOrderFields) error { orderConf := config.OrderConfig() // Reserve balance for this order - currency := order.ProvisionBucket.Edges.Currency.Code + currencyEnt, err := resolveInstitutionCurrency(ctx, order.Institution) + if err != nil { + return err + } + currency := currencyEnt.Code amount := order.Amount.Mul(order.Rate).RoundBank(0) // Start a transaction for the entire operation @@ -1444,7 +1487,7 @@ func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types orderRequestData := map[string]interface{}{ "amount": order.Amount.Mul(order.Rate).RoundBank(0).String(), "institution": order.Institution, - "currency": order.ProvisionBucket.Edges.Currency.Code, + "currency": currency, "providerId": order.ProviderID, } diff --git a/services/provider_selection_constants.go b/services/provider_selection_constants.go new file mode 100644 index 00000000..ce8b95f3 --- /dev/null +++ b/services/provider_selection_constants.go @@ -0,0 +1,34 @@ +package services + +import ( + "strings" + "time" + + "github.com/shopspring/decimal" +) + +var ( + RewardFulfilledValidated = decimal.NewFromFloat(1.0) + PenaltyCancelInsufficientFunds = decimal.NewFromFloat(-1.5) + PenaltyCancelProviderFault = decimal.NewFromFloat(-1.0) + PenaltyValidationFailed = decimal.NewFromFloat(-2.0) + PenaltyOrderRequestExpired = decimal.NewFromFloat(-0.5) + RecentProcessedVolumeWindow = 24 * time.Hour + ProviderFaultCancelReasons = []string{ + "out of stock", + "declined", + "rate expired", + "unable to fulfill", + "capacity limit", + } +) + +func isProviderFaultCancelReason(reason string) bool { + normalized := strings.TrimSpace(strings.ToLower(reason)) + for _, allowed := range ProviderFaultCancelReasons { + if normalized == allowed { + return true + } + } + return false +} diff --git a/services/provider_selection_helpers.go b/services/provider_selection_helpers.go new file mode 100644 index 00000000..182ae352 --- /dev/null +++ b/services/provider_selection_helpers.go @@ -0,0 +1,55 @@ +package services + +import ( + "context" + "fmt" + "time" + + "github.com/paycrest/aggregator/ent" + "github.com/paycrest/aggregator/ent/paymentorder" + "github.com/paycrest/aggregator/ent/providerprofile" + "github.com/paycrest/aggregator/storage" + "github.com/paycrest/aggregator/utils" + "github.com/shopspring/decimal" +) + +func resolveInstitutionCurrency(ctx context.Context, institutionCode string) (*ent.FiatCurrency, error) { + institution, err := utils.GetInstitutionByCode(ctx, institutionCode, true) + if err != nil { + return nil, err + } + if institution == nil || institution.Edges.FiatCurrency == nil { + return nil, fmt.Errorf("institution %s has no fiat currency", institutionCode) + } + return institution.Edges.FiatCurrency, nil +} + +func resolveOrderCurrency(ctx context.Context, order *ent.PaymentOrder) (*ent.FiatCurrency, error) { + return resolveInstitutionCurrency(ctx, order.Institution) +} + +func getRecentSuccessfulFiatVolumeByProvider(ctx context.Context, providerIDs []string) (map[string]decimal.Decimal, error) { + volumes := make(map[string]decimal.Decimal, len(providerIDs)) + if len(providerIDs) == 0 { + return volumes, nil + } + since := time.Now().Add(-RecentProcessedVolumeWindow) + orders, err := storage.Client.PaymentOrder.Query(). + Where( + paymentorder.HasProviderWith(providerprofile.IDIn(providerIDs...)), + paymentorder.UpdatedAtGTE(since), + paymentorder.StatusIn(paymentorder.StatusValidated, paymentorder.StatusSettled), + ). + WithProvider(). + All(ctx) + if err != nil { + return nil, err + } + for _, order := range orders { + if order.Edges.Provider == nil { + continue + } + volumes[order.Edges.Provider.ID] = volumes[order.Edges.Provider.ID].Add(order.Amount.Mul(order.Rate)) + } + return volumes, nil +} diff --git a/tasks/startup.go b/tasks/startup.go index 3766df34..fb46406e 100644 --- a/tasks/startup.go +++ b/tasks/startup.go @@ -5,7 +5,6 @@ import ( "time" "github.com/go-co-op/gocron" - "github.com/paycrest/aggregator/services" "github.com/paycrest/aggregator/storage" "github.com/paycrest/aggregator/utils/logger" ) @@ -40,32 +39,17 @@ func SubscribeToRedisKeyspaceEvents() func() { func StartCronJobs() { // Use the system's local timezone instead of hardcoded UTC to prevent timezone conflicts scheduler := gocron.NewScheduler(time.Local) - priorityQueue := services.NewPriorityQueueService() - err := ComputeMarketRate() if err != nil { logger.Errorf("StartCronJobs for ComputeMarketRate: %v", err) } - if serverConf.Environment != "production" { - err = priorityQueue.ProcessBucketQueues() - if err != nil { - logger.Errorf("StartCronJobs for ProcessBucketQueues: %v", err) - } - } - // Compute market rate every 9 minutes _, err = scheduler.Every(9).Minutes().Do(ComputeMarketRate) if err != nil { logger.Errorf("StartCronJobs for ComputeMarketRate: %v", err) } - // Refresh provision bucket priority queues every X minutes - _, err = scheduler.Every(orderConf.BucketQueueRebuildInterval).Minutes().Do(priorityQueue.ProcessBucketQueues) - if err != nil { - logger.Errorf("StartCronJobs for ProcessBucketQueues: %v", err) - } - // Retry failed webhook notifications every 13 minutes _, err = scheduler.Every(13).Minutes().Do(RetryFailedWebhookNotifications) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index dc44c8c9..a6cebd6b 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "math/big" @@ -976,81 +975,19 @@ func IsBase64(s string) bool { // GetTokenRateFromQueue gets the rate of a token from the priority queue for the given side (buy/sell). // Bucket keys are side-suffixed: bucket_{currency}_{min}_{max}_{side}. Scanning without side would mix buy and sell rates. func GetTokenRateFromQueue(tokenSymbol string, orderAmount decimal.Decimal, fiatCurrency string, marketRate decimal.Decimal, side RateSide) (decimal.Decimal, error) { - ctx := context.Background() - - // Scan for side-specific bucket keys: bucket_{currency}_{min}_{max}_{side} - keys, _, err := storage.RedisClient.Scan(ctx, uint64(0), "bucket_"+fiatCurrency+"_*_*_"+string(side), 100).Result() + tokenEntity, err := storage.Client.Token.Query().Where(tokenEnt.SymbolEQ(tokenSymbol)).Only(context.Background()) if err != nil { return decimal.Decimal{}, err } - - rateResponse := marketRate - highestMaxAmount := decimal.NewFromInt(0) - - // Scan through the buckets to find a suitable rate - for _, key := range keys { - bd, err := parseBucketKey(key) - if err != nil { - continue - } - minAmount, maxAmount := bd.MinAmount, bd.MaxAmount - - for index := 0; ; index++ { - // Get the topmost provider in the priority queue of the bucket - providerData, err := storage.RedisClient.LIndex(ctx, key, int64(index)).Result() - if err != nil { - break - } - parts := strings.Split(providerData, ":") - if len(parts) != 6 { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "ProviderData": providerData, - "Token": tokenSymbol, - "Currency": fiatCurrency, - "MinAmount": minAmount, - "MaxAmount": maxAmount, - }).Errorf("GetTokenRate.InvalidProviderData: %v", providerData) - continue - } - - // Skip entry if token doesn't match - if parts[1] != tokenSymbol { - continue - } - - // Skip entry if order amount is not within provider's min and max order amount - minOrderAmount, err := decimal.NewFromString(parts[4]) - if err != nil { - continue - } - - maxOrderAmount, err := decimal.NewFromString(parts[5]) - if err != nil { - continue - } - - if orderAmount.LessThan(minOrderAmount) || orderAmount.GreaterThan(maxOrderAmount) { - continue - } - - // Get fiat equivalent of the token amount - rate, _ := decimal.NewFromString(parts[3]) - fiatAmount := orderAmount.Mul(rate) - - // Check if fiat amount is within the bucket range and set the rate - if fiatAmount.GreaterThanOrEqual(minAmount) && fiatAmount.LessThanOrEqual(maxAmount) { - rateResponse = rate - break - } else if maxAmount.GreaterThan(highestMaxAmount) { - // Get the highest max amount - highestMaxAmount = maxAmount - rateResponse = rate - } - } + currency, err := storage.Client.FiatCurrency.Query().Where(fiatcurrency.CodeEQ(fiatCurrency)).Only(context.Background()) + if err != nil { + return decimal.Decimal{}, err } - - return rateResponse, nil + result, err := validateBucketRate(context.Background(), tokenEntity, currency, orderAmount, "", side) + if err != nil { + return decimal.Decimal{}, err + } + return result.Rate, nil } // GetInstitutionByCode returns the institution for a given institution code @@ -1349,116 +1286,59 @@ func getProviderRateFromRedis(ctx context.Context, providerID, tokenSymbol, curr // validateBucketRate handles bucket-based rate validation func validateBucketRate(ctx context.Context, token *ent.Token, currency *ent.FiatCurrency, amount decimal.Decimal, networkIdentifier string, side RateSide) (RateValidationResult, error) { - // Get redis keys for provision buckets for the specific side - // Scan for side-specific bucket keys: bucket_{currency}_{min}_{max}_{side} - keys, _, err := storage.RedisClient.Scan(ctx, uint64(0), "bucket_"+currency.Code+"_*_*_"+string(side), 100).Result() + q := storage.Client.ProviderOrderToken.Query(). + Where( + providerordertoken.HasTokenWith(tokenEnt.IDEQ(token.ID)), + providerordertoken.HasCurrencyWith(fiatcurrency.CodeEQ(currency.Code)), + providerordertoken.SettlementAddressNEQ(""), + providerordertoken.HasProviderWith( + providerprofile.IsActive(true), + providerprofile.VisibilityModeEQ(providerprofile.VisibilityModePublic), + ), + ). + WithProvider(func(pq *ent.ProviderProfileQuery) { pq.WithProviderRating() }). + WithCurrency() + if networkIdentifier != "" { + q = q.Where(providerordertoken.NetworkEQ(networkIdentifier)) + } + orderTokens, err := q.All(ctx) if err != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Currency": currency.Code, - "Network": networkIdentifier, - }).Errorf("Failed to scan Redis buckets for bucket rate") return RateValidationResult{}, fmt.Errorf("internal server error") } - - // Track the best available rate and reason for logging - var bestRate decimal.Decimal - var foundExactMatch bool - var selectedProviderID string - var selectedOrderType paymentorder.OrderType - var anySkippedDueToStuck bool - var currencyForStuck string - - // Scan through the buckets to find a matching rate - for _, key := range keys { - bucketData, err := parseBucketKey(key) - if err != nil { - logger.WithFields(logger.Fields{ - "Key": key, - "Error": err, - }).Errorf("ValidateRate.InvalidBucketKey: failed to parse bucket key") + var best RateValidationResult + for _, orderToken := range orderTokens { + if amount.LessThan(orderToken.MinOrderAmount) { continue } - - // Get all providers in this bucket to find the first suitable one (priority queue order) - providers, err := storage.RedisClient.LRange(ctx, key, 0, -1).Result() - if err != nil { - logger.WithFields(logger.Fields{ - "Key": key, - "Error": err, - }).Errorf("ValidateRate.FailedToGetProviders: failed to get providers from bucket") + orderType := DetermineOrderType(orderToken, amount) + if orderType == paymentorder.OrderTypeRegular && amount.GreaterThan(orderToken.MaxOrderAmount) { continue } - - // Find the first provider at the top of the queue that matches our criteria - result := findSuitableProviderRate(ctx, providers, token, networkIdentifier, amount, bucketData, side) - if result.Found { - foundExactMatch = true - bestRate = result.Rate - selectedProviderID = result.ProviderID - selectedOrderType = result.OrderType - break // Found exact match, no need to continue - } - if result.AllSkippedDueToStuck && result.CurrencyCode != "" { - anySkippedDueToStuck = true - currencyForStuck = result.CurrencyCode - } - - // Track the best available rate for logging purposes - if result.Rate.GreaterThan(bestRate) { - bestRate = result.Rate - } - } - - // If no exact match found, try fallback provider (if configured) even though it's not on the bucket queue - if !foundExactMatch { - if fallbackID := config.OrderConfig().FallbackProviderID; fallbackID != "" { - fallbackResult, fallbackErr := validateProviderRate(ctx, token, currency, amount, fallbackID, networkIdentifier, side) - if fallbackErr == nil { - // Quote the queue's best rate when amount was below/above queue providers' min/max but fallback accepts it - if bestRate.GreaterThan(decimal.Zero) { - return RateValidationResult{ - Rate: bestRate, - ProviderID: fallbackResult.ProviderID, - OrderType: fallbackResult.OrderType, - }, nil - } - return fallbackResult, nil + rate := decimal.Zero + if side == RateSideBuy { + if !orderToken.FixedBuyRate.IsZero() { + rate = orderToken.FixedBuyRate + } else { + rate = currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) } - var errStuck *types.ErrNoProviderDueToStuck - if errors.As(fallbackErr, &errStuck) { - return RateValidationResult{}, fallbackErr + } else { + if !orderToken.FixedSellRate.IsZero() { + rate = orderToken.FixedSellRate + } else { + rate = currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) } } - if anySkippedDueToStuck && currencyForStuck != "" { - return RateValidationResult{}, &types.ErrNoProviderDueToStuck{CurrencyCode: currencyForStuck} - } - logger.WithFields(logger.Fields{ - "Token": token.Symbol, - "Currency": currency.Code, - "Amount": amount, - "NetworkFilter": networkIdentifier, - "BestRate": bestRate, - }).Warnf("ValidateRate.NoSuitableProvider: no provider found for the given parameters") - - // Provide more specific error message (buy = fiat to crypto, sell = crypto to fiat) - networkMsg := networkIdentifier - if networkMsg == "" { - networkMsg = "any network" + if rate.IsZero() { + continue } - from, to := token.Symbol, currency.Code - if side == RateSideBuy { - from, to = currency.Code, token.Symbol + if best.Rate.IsZero() || rate.GreaterThan(best.Rate) { + best = RateValidationResult{Rate: rate, ProviderID: orderToken.Edges.Provider.ID, OrderType: orderType} } - return RateValidationResult{}, fmt.Errorf("no provider available for %s to %s conversion with amount %s on %s", - from, to, amount, networkMsg) } - - return RateValidationResult{ - Rate: bestRate, - ProviderID: selectedProviderID, - OrderType: selectedOrderType, - }, nil + if best.Rate.IsZero() { + return RateValidationResult{}, fmt.Errorf("no provider available for this token/currency pair") + } + return best, nil } // parseBucketKey parses and validates bucket key format