diff --git a/controllers/provider/provider.go b/controllers/provider/provider.go index 7bb03f87..f6b3c107 100644 --- a/controllers/provider/provider.go +++ b/controllers/provider/provider.go @@ -448,9 +448,6 @@ func (ctrl *ProviderController) handleExportPaymentOrders(ctx *gin.Context, prov WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). Limit(maxExportLimit). Order(ent.Desc(paymentorder.FieldCreatedAt), ent.Desc(paymentorder.FieldID)). All(reqCtx) @@ -524,10 +521,7 @@ func (ctrl *ProviderController) handleExportPaymentOrders(ctx *gin.Context, prov institutionName = name } - var currencyCode string - if paymentOrder.Edges.ProvisionBucket != nil && paymentOrder.Edges.ProvisionBucket.Edges.Currency != nil { - currencyCode = paymentOrder.Edges.ProvisionBucket.Edges.Currency.Code - } + currencyCode, _ := u.GetInstitutionCurrencyCode(reqCtx, paymentOrder.Institution, true) row := []string{ paymentOrder.ID.String(), @@ -590,7 +584,6 @@ func (ctrl *ProviderController) AcceptOrder(ctx *gin.Context) { WithProvider(). WithSenderProfile(). WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { pbq.WithCurrency() }). Only(reqCtx) if fallbackErr != nil || fallbackOrder == nil { if fallbackErr != nil && !ent.IsNotFound(fallbackErr) { @@ -628,12 +621,9 @@ func (ctrl *ProviderController) AcceptOrder(ctx *gin.Context) { } if fallbackOrder.Direction == paymentorder.DirectionOnramp { result["amount"] = fallbackOrder.Amount.Add(fallbackOrder.SenderFee).Mul(fallbackOrder.Rate).RoundBank(0).String() - if fallbackOrder.Edges.ProvisionBucket != nil && fallbackOrder.Edges.ProvisionBucket.Edges.Currency != nil { - result["currency"] = fallbackOrder.Edges.ProvisionBucket.Edges.Currency.Code - } else if fallbackOrder.Institution != "" { - inst, instErr := u.GetInstitutionByCode(reqCtx, fallbackOrder.Institution, true) - if instErr == nil && inst != nil && inst.Edges.FiatCurrency != nil { - result["currency"] = inst.Edges.FiatCurrency.Code + if fallbackOrder.Institution != "" { + if currencyCode, curErr := u.GetInstitutionCurrencyCode(reqCtx, fallbackOrder.Institution, true); curErr == nil { + result["currency"] = currencyCode } } } @@ -1407,16 +1397,7 @@ func (ctrl *ProviderController) handlePayoutFulfillment(ctx *gin.Context, orderI return } providerID := fulfillment.Edges.Order.Edges.Provider.ID - currency := "" - if fulfillment.Edges.Order.Edges.ProvisionBucket != nil && fulfillment.Edges.Order.Edges.ProvisionBucket.Edges.Currency != nil { - currency = fulfillment.Edges.Order.Edges.ProvisionBucket.Edges.Currency.Code - } - if currency == "" && fulfillment.Edges.Order.Institution != "" { - inst, instErr := u.GetInstitutionByCode(reqCtx, fulfillment.Edges.Order.Institution, true) - if instErr == nil && inst != nil && inst.Edges.FiatCurrency != nil { - currency = inst.Edges.FiatCurrency.Code - } - } + currency, _ := u.GetInstitutionCurrencyCode(reqCtx, fulfillment.Edges.Order.Institution, true) if currency == "" { logger.WithFields(logger.Fields{ "OrderID": orderID.String(), @@ -1548,16 +1529,7 @@ func (ctrl *ProviderController) handlePayoutFulfillment(ctx *gin.Context, orderI return } providerID := fulfillment.Edges.Order.Edges.Provider.ID - currency := "" - if fulfillment.Edges.Order.Edges.ProvisionBucket != nil && fulfillment.Edges.Order.Edges.ProvisionBucket.Edges.Currency != nil { - currency = fulfillment.Edges.Order.Edges.ProvisionBucket.Edges.Currency.Code - } - if currency == "" && fulfillment.Edges.Order.Institution != "" { - inst, instErr := u.GetInstitutionByCode(reqCtx, fulfillment.Edges.Order.Institution, true) - if instErr == nil && inst != nil && inst.Edges.FiatCurrency != nil { - currency = inst.Edges.FiatCurrency.Code - } - } + currency, _ := u.GetInstitutionCurrencyCode(reqCtx, fulfillment.Edges.Order.Institution, true) if currency == "" { logger.WithFields(logger.Fields{ "OrderID": orderID.String(), @@ -2358,9 +2330,6 @@ func (ctrl *ProviderController) CancelOrder(ctx *gin.Context) { tq.WithNetwork() }). WithProvider(). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). Only(reqCtx) if err != nil { logger.WithFields(logger.Fields{ @@ -2381,49 +2350,8 @@ func (ctrl *ProviderController) CancelOrder(ctx *gin.Context) { } else if payload.Reason != "Insufficient funds" { cancellationCount += 1 orderUpdate.AppendCancellationReasons([]string{payload.Reason}) - } else if payload.Reason == "Insufficient funds" && order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { - // Search for the specific provider in the queue using a Redis list (private orders with nil bucket are not in the queue) - redisKey := fmt.Sprintf("bucket_%s_%s_%s", order.Edges.ProvisionBucket.Edges.Currency.Code, order.Edges.ProvisionBucket.MinAmount, order.Edges.ProvisionBucket.MaxAmount) - - // Check if the provider ID exists in the list - for index := -1; ; index-- { - providerData, err := storage.RedisClient.LIndex(reqCtx, redisKey, int64(index)).Result() - if err != nil { - break - } - - // Extract the id from the data (format "providerID:token:network:rate:minAmount:maxAmount") - parts := strings.Split(providerData, ":") - if len(parts) != 6 { - logger.WithFields(logger.Fields{ - "Provider Data": providerData, - }).Error("Invalid provider data format") - continue // Skip this entry due to invalid format - } - - if parts[0] == provider.ID { - // Remove the provider from the list - placeholder := "DELETED_PROVIDER" // Define a placeholder value - _, err := storage.RedisClient.LSet(reqCtx, redisKey, int64(index), placeholder).Result() - if err != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Index": index, - }).Errorf("Failed to set placeholder at index %d: %v", index, err) - } - - // Remove all occurences of the placeholder from the list - _, err = storage.RedisClient.LRem(reqCtx, redisKey, 0, placeholder).Result() - if err != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Placeholder": placeholder, - }).Errorf("Failed to remove placeholder from circular queue: %v", err) - } - - break - } - } + } else if payload.Reason == "Insufficient funds" { + // DB-driven provider selection no longer removes providers from Redis bucket queues here. } // Update order status to cancelled @@ -2446,16 +2374,7 @@ func (ctrl *ProviderController) CancelOrder(ctx *gin.Context) { // Release reserved balance for this cancelled order providerID := order.Edges.Provider.ID - currency := "" - if order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { - currency = order.Edges.ProvisionBucket.Edges.Currency.Code - } - if currency == "" && order.Institution != "" { - inst, instErr := u.GetInstitutionByCode(reqCtx, order.Institution, true) - if instErr == nil && inst != nil && inst.Edges.FiatCurrency != nil { - currency = inst.Edges.FiatCurrency.Code - } - } + currency, _ := u.GetInstitutionCurrencyCode(reqCtx, order.Institution, true) if currency != "" { amount := order.Amount.Mul(order.Rate).RoundBank(0) err = ctrl.balanceService.ReleaseFiatBalance(reqCtx, providerID, currency, amount, nil) diff --git a/services/common/indexer.go b/services/common/indexer.go index f03bedaf..778765d2 100644 --- a/services/common/indexer.go +++ b/services/common/indexer.go @@ -504,13 +504,11 @@ func GetProviderAddressFromOrder(ctx context.Context, order *ent.PaymentOrder) ( return "", fmt.Errorf("payment order has no provider") } - // Get the currency from the provision bucket - if order.Edges.ProvisionBucket == nil { - return "", fmt.Errorf("payment order has no provision bucket") + currencyCode, err := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if err != nil { + return "", fmt.Errorf("payment order institution currency lookup failed: %w", err) } - currencyCode := order.Edges.ProvisionBucket.Edges.Currency.Code - // Get provider order token for this provider, token, and currency providerOrderToken, err := storage.Client.ProviderOrderToken. Query(). diff --git a/services/common/order.go b/services/common/order.go index 8b0ff207..42cabce7 100644 --- a/services/common/order.go +++ b/services/common/order.go @@ -140,7 +140,7 @@ func ProcessPaymentOrderFromBlockchain( } // Validate and prepare payment order data (pass existing order so it can be used if cancellation is needed) - paymentOrderFields, _, _, _, _, err := validateAndPreparePaymentOrderData(ctx, network, event, refundOrder, existingOrderWithMessageHash) + paymentOrderFields, _, _, _, err := validateAndPreparePaymentOrderData(ctx, network, event, refundOrder, existingOrderWithMessageHash) if err != nil { return err } @@ -181,11 +181,6 @@ func ProcessPaymentOrderFromBlockchain( updateBuilder = updateBuilder.SetSender(paymentOrderFields.Sender) } - // Update provision bucket if available (should always be set for regular orders) - if paymentOrderFields.ProvisionBucket != nil { - updateBuilder = updateBuilder.SetProvisionBucket(paymentOrderFields.ProvisionBucket) - } - _, err = updateBuilder.Save(ctx) if err != nil { _ = tx.Rollback() @@ -270,7 +265,6 @@ func ProcessPaymentOrderFromBlockchain( SetMessageHash(paymentOrderFields.MessageHash). SetMemo(paymentOrderFields.Memo). SetMetadata(paymentOrderFields.Metadata). - SetProvisionBucket(paymentOrderFields.ProvisionBucket). SetOrderType(paymentorder.OrderType(paymentOrderFields.OrderType)). SetStatus(paymentorder.StatusPending). SetIndexerCreatedAt(time.Now()) @@ -283,7 +277,6 @@ func ProcessPaymentOrderFromBlockchain( } } - orderCreated, err := orderBuilder.Save(ctx) if err != nil { return fmt.Errorf("%s - failed to create payment order: %w", paymentOrderFields.GatewayID, err) @@ -430,7 +423,6 @@ func UpdateOrderStatusRefunded(ctx context.Context, network *ent.Network, event SetTxHash(event.TxHash). SetStatus(paymentorder.StatusRefunded) - updatedOrderRows, err := paymentOrderUpdate.Save(ctx) if err != nil { return fmt.Errorf("UpdateOrderStatusRefunded.aggregator: %v", err) @@ -452,21 +444,9 @@ func UpdateOrderStatusRefunded(ctx context.Context, network *ent.Network, event ), ). WithProvider(). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). Only(ctx) if err == nil && paymentOrder != nil && paymentOrder.Edges.Provider != nil { - currency := "" - if paymentOrder.Edges.ProvisionBucket != nil && paymentOrder.Edges.ProvisionBucket.Edges.Currency != nil { - currency = paymentOrder.Edges.ProvisionBucket.Edges.Currency.Code - } - if currency == "" && paymentOrder.Institution != "" { - inst, instErr := utils.GetInstitutionByCode(ctx, paymentOrder.Institution, true) - if instErr == nil && inst != nil && inst.Edges.FiatCurrency != nil { - currency = inst.Edges.FiatCurrency.Code - } - } + currency, _ := utils.GetInstitutionCurrencyCode(ctx, paymentOrder.Institution, true) if currency != "" { balanceService := balance.New() providerID := paymentOrder.Edges.Provider.ID @@ -596,7 +576,6 @@ func UpdateOrderStatusSettleOut(ctx context.Context, network *ent.Network, event paymentOrderUpdate = paymentOrderUpdate.AddPercentSettled(event.SettlePercent.Div(decimal.NewFromInt(1000))) } - updatedOrderRows, err := paymentOrderUpdate.Save(ctx) if err != nil { return fmt.Errorf("UpdateOrderStatusSettleOut.aggregator: %v", err) @@ -618,17 +597,17 @@ func UpdateOrderStatusSettleOut(ctx context.Context, network *ent.Network, event ), ). WithProvider(). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). Only(ctx) - if err == nil && paymentOrder != nil && paymentOrder.Edges.Provider != nil && paymentOrder.Edges.ProvisionBucket != nil && paymentOrder.Edges.ProvisionBucket.Edges.Currency != nil { + if err == nil && paymentOrder != nil && paymentOrder.Edges.Provider != nil { // Only attempt balance operations if we have the required edge data // Create a new balance service instance for this transaction balanceService := balance.New() providerID := paymentOrder.Edges.Provider.ID - currency := paymentOrder.Edges.ProvisionBucket.Edges.Currency.Code + currency, curErr := utils.GetInstitutionCurrencyCode(ctx, paymentOrder.Institution, true) + if curErr != nil || currency == "" { + goto commitSettleOut + } amount := paymentOrder.Amount.Mul(paymentOrder.Rate).RoundBank(0) currentBalance, err := balanceService.GetProviderFiatBalance(ctx, providerID, currency) @@ -664,6 +643,7 @@ func UpdateOrderStatusSettleOut(ctx context.Context, network *ent.Network, event } } +commitSettleOut: // Commit the transaction if err := tx.Commit(); err != nil { return fmt.Errorf("UpdateOrderStatusSettleOut commit failed: %w", err) @@ -755,7 +735,6 @@ func UpdateOrderStatusSettleIn(ctx context.Context, network *ent.Network, event SetTxHash(event.TxHash). SetStatus(paymentorder.StatusSettled) - updatedOrderRows, err := paymentOrderUpdate.Save(ctx) if err != nil { return fmt.Errorf("UpdateOrderStatusSettleIn.aggregator: %v", err) @@ -904,11 +883,6 @@ func HandleCancellation(ctx context.Context, createdPaymentOrder *ent.PaymentOrd SetCancellationReasons([]string{cancellationReason}). SetStatus(paymentorder.StatusCancelled) - // Only set ProvisionBucket if it's not nil - if paymentOrderFields.ProvisionBucket != nil { - orderBuilder = orderBuilder.SetProvisionBucket(paymentOrderFields.ProvisionBucket) - } - // Set provider if ProviderID exists if paymentOrderFields.ProviderID != "" { provider, err := db.Client.ProviderProfile.Query().Where(providerprofile.IDEQ(paymentOrderFields.ProviderID)).Only(ctx) @@ -1258,29 +1232,7 @@ func processPaymentOrderPostCreation( } } - // Fill empty provision bucket so AssignPaymentOrder can run (same as stale_ops nil-bucket resolution) - if paymentOrderFields.ProvisionBucket == nil { - institution, instErr := utils.GetInstitutionByCode(ctx, paymentOrder.Institution, true) - if instErr == nil && institution != nil && institution.Edges.FiatCurrency != nil { - fiatAmount := paymentOrder.Amount.Mul(paymentOrder.Rate) - bucket, bErr := db.Client.ProvisionBucket. - Query(). - Where( - provisionbucket.MaxAmountGTE(fiatAmount), - provisionbucket.MinAmountLTE(fiatAmount), - provisionbucket.HasCurrencyWith(fiatcurrency.IDEQ(institution.Edges.FiatCurrency.ID)), - ). - WithCurrency(). - First(ctx) - if bErr == nil && bucket != nil { - if _, upErr := db.Client.PaymentOrder.UpdateOneID(paymentOrder.ID).SetProvisionBucket(bucket).Save(ctx); upErr == nil { - paymentOrderFields.ProvisionBucket = bucket - } - } - } - } - - // Assign when we have a bucket, or when order is private; private orders don't require buckets. + // Assign when the order is private or can be matched by the priority queue. isPrivate := false if paymentOrderFields.ProviderID != "" { provider, pErr := db.Client.ProviderProfile.Query().Where(providerprofile.IDEQ(paymentOrderFields.ProviderID)).Only(ctx) @@ -1289,7 +1241,7 @@ func processPaymentOrderPostCreation( } } paymentOrderFields.ID = paymentOrder.ID - if paymentOrderFields.ProvisionBucket != nil || isPrivate { + if isPrivate || paymentOrderFields.Institution != "" { // Run assignment in a goroutine with its own 60s timeout so one slow order doesn't // consume the indexing task's context and cancel other work. fieldsCopy := *paymentOrderFields @@ -1298,17 +1250,17 @@ func processPaymentOrderPostCreation( defer cancel() _ = assignPaymentOrder(assignCtx, fieldsCopy) }() - } else if paymentOrderFields.ProvisionBucket == nil { + } else { logger.WithFields(logger.Fields{ "OrderID": paymentOrder.ID.String(), - }).Errorf("processPaymentOrderPostCreation: could not resolve provision bucket for order; skipping provider assignment") + }).Errorf("processPaymentOrderPostCreation: missing institution currency context for provider assignment") } return nil } // validateAndPreparePaymentOrderData validates the blockchain event data and prepares payment order fields. -// Returns the prepared fields, token, institution, currency, provision bucket, and any error. +// Returns the prepared fields, token, institution, currency, and any error. // existingOrder is an optional existing payment order that should be used for cancellation instead of creating a new one. func validateAndPreparePaymentOrderData( ctx context.Context, @@ -1316,7 +1268,7 @@ func validateAndPreparePaymentOrderData( event *types.OrderCreatedEvent, refundOrder func(context.Context, *ent.Network, string) error, existingOrder *ent.PaymentOrder, -) (*types.PaymentOrderFields, *ent.Token, *ent.Institution, *ent.FiatCurrency, *ent.ProvisionBucket, error) { +) (*types.PaymentOrderFields, *ent.Token, *ent.Institution, *ent.FiatCurrency, error) { // Get token from db token, err := db.Client.Token. Query(). @@ -1340,16 +1292,16 @@ func validateAndPreparePaymentOrderData( }).Errorf("token lookup failed and refund failed") refundErr := refundOrder(ctx, network, event.OrderId) if refundErr != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("token lookup failed and refund failed: %w", refundErr) + return nil, nil, nil, nil, fmt.Errorf("token lookup failed and refund failed: %w", refundErr) } } - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } // Get order recipient from message hash recipient, err := cryptoUtils.GetOrderRecipientFromMessageHash(event.MessageHash) if err != nil { - return nil, nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, nil, fmt.Sprintf("Message hash decryption failed %v", err), refundOrder, existingOrder) + return nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, nil, fmt.Sprintf("Message hash decryption failed %v", err), refundOrder, existingOrder) } if recipient.Metadata != nil { @@ -1367,7 +1319,7 @@ func validateAndPreparePaymentOrderData( // TODO: Enforce KYB status checks here in the future if needed // Example enforcement: // if serverConf.Environment == "production" && kybStatus != user.KybVerificationStatusApproved { - // return nil, nil, nil, nil, nil, createBasicPaymentOrderAndCancel( + // return nil, nil, nil, nil, createBasicPaymentOrderAndCancel( // ctx, event, network, token, recipient, // "Sender KYB verification not approved", // refundOrder, existingOrder, @@ -1379,7 +1331,7 @@ func validateAndPreparePaymentOrderData( // Get institution institution, err := utils.GetInstitutionByCode(ctx, recipient.Institution, true) if err != nil { - return nil, nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, recipient, "Institution lookup failed", refundOrder, existingOrder) + return nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, recipient, "Institution lookup failed", refundOrder, existingOrder) } // Get currency @@ -1391,31 +1343,13 @@ func validateAndPreparePaymentOrderData( ). Only(ctx) if err != nil { - return nil, nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, recipient, "Currency lookup failed", refundOrder, existingOrder) + return nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, recipient, "Currency lookup failed", refundOrder, existingOrder) } // Adjust amounts for token decimals event.Amount = event.Amount.Div(decimal.NewFromInt(10).Pow(decimal.NewFromInt(int64(token.Decimals)))) event.ProtocolFee = event.ProtocolFee.Div(decimal.NewFromInt(10).Pow(decimal.NewFromInt(int64(token.Decimals)))) - // Get provision bucket - var provisionBucket *ent.ProvisionBucket - var isLessThanMin bool - provisionBucket, isLessThanMin, err = GetProvisionBucket(ctx, event.Amount.Mul(event.Rate), currency) - if err != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Amount": event.Amount, - "Currency": currency, - }).Errorf("failed to fetch provision bucket when creating payment order") - - cancellationReason := "Provision bucket lookup failed" - if errors.Is(err, ErrNoProvisionBucketForAmount) { - cancellationReason = "No provision bucket for this amount" - } - return nil, nil, nil, nil, nil, createBasicPaymentOrderAndCancel(ctx, event, network, token, recipient, cancellationReason, refundOrder, existingOrder) - } - // Normalize mobile money account identifier (same as sender API path) accountIdentifier := recipient.AccountIdentifier if institution.Type == institutionent.TypeMobileMoney && institution.Edges.FiatCurrency != nil { @@ -1441,18 +1375,9 @@ func validateAndPreparePaymentOrderData( Memo: recipient.Memo, MessageHash: event.MessageHash, Metadata: recipient.Metadata, - ProvisionBucket: provisionBucket, OrderType: "regular", } - if isLessThanMin { - err := HandleCancellation(ctx, existingOrder, nil, "Amount is less than the minimum bucket", refundOrder) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) - } - return nil, nil, nil, nil, nil, nil - } - // Validate rate rateResult, rateErr := utils.ValidateRate( ctx, @@ -1467,17 +1392,17 @@ func validateAndPreparePaymentOrderData( if rateResult.Rate == decimal.NewFromInt(1) && paymentOrderFields.Rate != decimal.NewFromInt(1) { err := HandleCancellation(ctx, nil, paymentOrderFields, "Rate validation failed", refundOrder) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) } - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } if rateErr != nil { err := HandleCancellation(ctx, nil, paymentOrderFields, fmt.Sprintf("Rate validation failed: %s", rateErr.Error()), refundOrder) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) } - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } // Check if event rate is within 0.1% tolerance of validated rate @@ -1487,9 +1412,9 @@ func validateAndPreparePaymentOrderData( if rateDiff.GreaterThan(tolerance) { err := HandleCancellation(ctx, nil, paymentOrderFields, "Rate validation failed", refundOrder) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) } - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } // Use order type from ValidateRate result @@ -1532,9 +1457,9 @@ func validateAndPreparePaymentOrderData( // 4. Provider does not support the currency // 5. Provider have not configured a settlement address for the network _ = HandleCancellation(ctx, nil, paymentOrderFields, "Provider not available", refundOrder) - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } else { - return nil, nil, nil, nil, nil, fmt.Errorf("%s - failed to fetch provider: %w", paymentOrderFields.GatewayID, err) + return nil, nil, nil, nil, fmt.Errorf("%s - failed to fetch provider: %w", paymentOrderFields.GatewayID, err) } } @@ -1547,9 +1472,9 @@ func validateAndPreparePaymentOrderData( if provisionBucket == nil && !isPrivate { err := HandleCancellation(ctx, nil, paymentOrderFields, "Amount is larger than the maximum bucket", refundOrder) if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) + return nil, nil, nil, nil, fmt.Errorf("failed to handle cancellation: %w", err) } - return nil, nil, nil, nil, nil, nil + return nil, nil, nil, nil, nil } return paymentOrderFields, token, institution, currency, provisionBucket, nil diff --git a/services/priority_queue.go b/services/priority_queue.go index 363e4513..99cb0c42 100644 --- a/services/priority_queue.go +++ b/services/priority_queue.go @@ -2,9 +2,9 @@ package services import ( "context" - "errors" "fmt" "math/rand" + "sort" "strings" "time" @@ -340,6 +340,30 @@ func (s *PriorityQueueService) GetProviderRate(ctx context.Context, provider *en return rate, nil } +func providerOrderTokenRate(orderToken *ent.ProviderOrderToken, side RateSide) decimal.Decimal { + if orderToken == nil || orderToken.Edges.Currency == nil { + return decimal.Zero + } + switch side { + case RateSideBuy: + if !orderToken.FixedBuyRate.IsZero() { + return orderToken.FixedBuyRate + } + if orderToken.Edges.Currency.MarketBuyRate.IsZero() { + return decimal.Zero + } + return orderToken.Edges.Currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) + default: + if !orderToken.FixedSellRate.IsZero() { + return orderToken.FixedSellRate + } + if orderToken.Edges.Currency.MarketSellRate.IsZero() { + return decimal.Zero + } + return orderToken.Edges.Currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) + } +} + // deleteQueue deletes existing circular queue func (s *PriorityQueueService) deleteQueue(ctx context.Context, key string) error { _, err := storage.RedisClient.Del(ctx, key).Result() @@ -699,35 +723,8 @@ func (s *PriorityQueueService) tryUsePreSetProvider(ctx context.Context, order t // AssignPaymentOrder assigns payment orders to providers func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order types.PaymentOrderFields) error { - orderIDPrefix := strings.Split(order.ID.String(), "-")[0] orderConf := config.OrderConfig() - // Both regular and OTC orders must have a provision bucket, unless order has a pre-set private provider (private orders don't require buckets). - allowNilBucketForPrivate := false - if order.ProvisionBucket == nil { - if order.ProviderID != "" { - providerForCheck, pErr := storage.Client.ProviderProfile.Query().Where(providerprofile.IDEQ(order.ProviderID)).Only(ctx) - if pErr == nil && providerForCheck != nil && providerForCheck.VisibilityMode == providerprofile.VisibilityModePrivate { - allowNilBucketForPrivate = true - } - } - if !allowNilBucketForPrivate { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "OrderType": order.OrderType, - "Reason": "internal: Order missing provision bucket", - }).Errorf("AssignPaymentOrder.MissingProvisionBucket") - return fmt.Errorf("order %s (type: %s) is missing provision bucket", order.ID.String(), order.OrderType) - } - } else if order.ProvisionBucket.Edges.Currency == nil { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "OrderType": order.OrderType, - "Reason": "internal: Provision bucket missing currency", - }).Errorf("AssignPaymentOrder.MissingCurrency") - return fmt.Errorf("provision bucket for order %s (type: %s) is missing currency", order.ID.String(), order.OrderType) - } - // Defensive check: Verify order is in a valid state for assignment // This prevents duplicate assignments from concurrent sources currentOrder, err := storage.Client.PaymentOrder.Get(ctx, order.ID) @@ -834,47 +831,153 @@ func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order typ } } - // Private orders with nil bucket only use the pre-set path above; do not run queue matching. - if order.ProvisionBucket == nil { - return nil + currency, err := resolveOrderFieldsCurrency(ctx, order) + if err != nil { + return err + } + if order.Token == nil { + return fmt.Errorf("order %s has no token", order.ID) + } + network := order.Network + if network == nil && order.Token.Edges.Network != nil { + network = order.Token.Edges.Network + } + if network == nil { + network, err = order.Token.QueryNetwork().Only(ctx) + if err != nil { + return err + } } - // Use same side-suffixed key as buildQueueForSide so reads/writes use identical keys (rateSide computed above) - baseRedisKey := fmt.Sprintf("bucket_%s_%s_%s", order.ProvisionBucket.Edges.Currency.Code, order.ProvisionBucket.MinAmount, order.ProvisionBucket.MaxAmount) - redisKey := baseRedisKey + "_" + string(rateSide) + requiredBalance := order.Amount + if rateSide == RateSideSell { + requiredBalance = order.Amount.Mul(order.Rate).RoundBank(0) + } - err = s.matchRate(ctx, redisKey, orderIDPrefix, order, excludeList) + candidates, err := storage.Client.ProviderOrderToken.Query(). + Where( + providerordertoken.NetworkEQ(network.Identifier), + providerordertoken.HasTokenWith(token.IDEQ(order.Token.ID)), + providerordertoken.HasCurrencyWith(fiatcurrency.IDEQ(currency.ID)), + providerordertoken.SettlementAddressNEQ(""), + providerordertoken.HasProviderWith( + providerprofile.IDNEQ(orderConf.FallbackProviderID), + providerprofile.IsActive(true), + providerprofile.VisibilityModeEQ(providerprofile.VisibilityModePublic), + ), + ). + WithProvider(func(pq *ent.ProviderProfileQuery) { + pq.WithProviderRating() + }). + WithCurrency(). + All(ctx) if err != nil { - prevRedisKey := redisKey + "_prev" - err = s.matchRate(ctx, prevRedisKey, orderIDPrefix, order, excludeList) - if err != nil { - // Both matchRate attempts failed; try fallback once for regular orders - matchRateErr := err - if order.OrderType != "otc" { - orderEnt, loadErr := storage.Client.PaymentOrder.Query(). - Where(paymentorder.IDEQ(order.ID)). - WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). - Only(ctx) - if loadErr == nil { - if fallbackErr := s.TryFallbackAssignment(ctx, orderEnt); fallbackErr == nil { - return nil - } else { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "Error": fallbackErr.Error(), - }).Errorf("AssignPaymentOrder: TryFallbackAssignment failed after no provider in queue") - var errStuck *types.ErrNoProviderDueToStuck - if errors.As(fallbackErr, &errStuck) { - return fallbackErr - } - } - } + return err + } + if len(candidates) == 0 { + if order.OrderType != "otc" { + orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) + if loadErr == nil { + return s.TryFallbackAssignment(ctx, orderEnt) } - return fmt.Errorf("no provider matched for order: %w", matchRateErr) + } + return fmt.Errorf("no eligible provider order tokens found") + } + providerIDs := make([]string, 0, len(candidates)) + for _, candidate := range candidates { + if candidate.Edges.Provider != nil { + providerIDs = append(providerIDs, candidate.Edges.Provider.ID) } } + recentVolumes, _ := getRecentSuccessfulFiatVolumeByProvider(ctx, providerIDs) + sort.SliceStable(candidates, func(i, j int) bool { + leftScore := decimal.Zero + rightScore := decimal.Zero + if candidates[i].Edges.Provider != nil && candidates[i].Edges.Provider.Edges.ProviderRating != nil { + leftScore = candidates[i].Edges.Provider.Edges.ProviderRating.TrustScore + } + if candidates[j].Edges.Provider != nil && candidates[j].Edges.Provider.Edges.ProviderRating != nil { + rightScore = candidates[j].Edges.Provider.Edges.ProviderRating.TrustScore + } + if !leftScore.Equal(rightScore) { + return leftScore.GreaterThan(rightScore) + } + leftVol := recentVolumes[candidates[i].Edges.Provider.ID] + rightVol := recentVolumes[candidates[j].Edges.Provider.ID] + if !leftVol.Equal(rightVol) { + return leftVol.LessThan(rightVol) + } + return candidates[i].Edges.Provider.ID < candidates[j].Edges.Provider.ID + }) - return nil + for _, candidate := range candidates { + providerID := candidate.Edges.Provider.ID + hasBalance := false + if rateSide == RateSideBuy { + hasBalance, err = storage.Client.ProviderBalances.Query(). + Where( + providerbalances.HasProviderWith(providerprofile.IDEQ(providerID)), + providerbalances.HasTokenWith(token.IDEQ(order.Token.ID)), + providerbalances.IsAvailableEQ(true), + providerbalances.AvailableBalanceGTE(requiredBalance), + ). + Exist(ctx) + } else { + hasBalance, err = storage.Client.ProviderBalances.Query(). + Where( + providerbalances.HasProviderWith(providerprofile.IDEQ(providerID)), + providerbalances.HasFiatCurrencyWith(fiatcurrency.IDEQ(currency.ID)), + providerbalances.IsAvailableEQ(true), + providerbalances.AvailableBalanceGTE(requiredBalance), + ). + Exist(ctx) + } + if err != nil || !hasBalance { + continue + } + excludeCount := s.countProviderInExcludeList(excludeList, providerID) + if order.OrderType == "otc" { + if excludeCount > 0 { + continue + } + } else if excludeCount >= orderConf.ProviderMaxRetryAttempts { + continue + } + if order.OrderType != "otc" && orderConf.ProviderStuckFulfillmentThreshold > 0 { + stuckCount, errStuck := utils.GetProviderStuckOrderCount(ctx, providerID) + if errStuck == nil && stuckCount >= orderConf.ProviderStuckFulfillmentThreshold { + continue + } + } + if order.Amount.LessThan(candidate.MinOrderAmount) { + continue + } + if order.Amount.GreaterThan(candidate.MaxOrderAmount) { + if candidate.MinOrderAmountOtc.IsZero() || candidate.MaxOrderAmountOtc.IsZero() { + continue + } + if order.Amount.LessThan(candidate.MinOrderAmountOtc) || order.Amount.GreaterThan(candidate.MaxOrderAmountOtc) { + continue + } + } + rate := providerOrderTokenRate(candidate, rateSide) + allowedDeviation := order.Rate.Mul(candidate.RateSlippage.Div(decimal.NewFromInt(100))) + if rate.Sub(order.Rate).Abs().GreaterThan(allowedDeviation) { + continue + } + order.ProviderID = providerID + if order.OrderType == "otc" { + return s.assignOtcOrder(ctx, order) + } + return s.sendOrderRequest(ctx, order) + } + if order.OrderType != "otc" { + orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) + if loadErr == nil { + return s.TryFallbackAssignment(ctx, orderEnt) + } + } + return fmt.Errorf("no provider matched for order") } // TryFallbackAssignment attempts to assign the order to the configured fallback provider using only rate and balance checks. @@ -902,12 +1005,8 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order } // Verify order is still in a state that allows assignment; DB-level idempotency for fallback. - // Eagerly load ProvisionBucket+Currency so we never need a separate fallback query for them. currentOrder, err := storage.Client.PaymentOrder.Query(). Where(paymentorder.IDEQ(order.ID)). - WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { - pb.WithCurrency() - }). Only(ctx) if err != nil { return fmt.Errorf("fallback: failed to load order: %w", err) @@ -966,7 +1065,6 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order AccountIdentifier: order.AccountIdentifier, AccountName: order.AccountName, ProviderID: "", - ProvisionBucket: currentOrder.Edges.ProvisionBucket, MessageHash: order.MessageHash, Memo: order.Memo, UpdatedAt: order.UpdatedAt, @@ -980,39 +1078,9 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order return fmt.Errorf("fallback: order %s has no token", order.ID.String()) } - // If order has no bucket yet, resolve one from institution currency + fiat amount. - if fields.ProvisionBucket == nil { - institution, instErr := utils.GetInstitutionByCode(ctx, order.Institution, true) - if instErr != nil { - return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution lookup failed: %w", fields.ID.String(), instErr) - } - if institution.Edges.FiatCurrency == nil { - return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution %s has no fiat currency", fields.ID.String(), order.Institution) - } - fiatAmount := order.Amount.Mul(order.Rate) - bucket, bErr := storage.Client.ProvisionBucket. - Query(). - Where( - provisionbucket.MaxAmountGTE(fiatAmount), - provisionbucket.MinAmountLTE(fiatAmount), - provisionbucket.HasCurrencyWith(fiatcurrency.IDEQ(institution.Edges.FiatCurrency.ID)), - ). - WithCurrency(). - Only(ctx) - if bErr != nil { - return fmt.Errorf("fallback: no matching provision bucket for order %s (fiat %s %s): %w", - fields.ID.String(), fiatAmount.String(), institution.Edges.FiatCurrency.Code, bErr) - } - fields.ProvisionBucket = bucket - // Persist so later flows (e.g. FulfillOrder) see the bucket and do not panic on nil ProvisionBucket - if _, upErr := storage.Client.PaymentOrder.UpdateOneID(fields.ID).SetProvisionBucket(bucket).Save(ctx); upErr != nil { - return fmt.Errorf("fallback: failed to set provision bucket on order %s: %w", fields.ID.String(), upErr) - } - } - - bucketCurrency := fields.ProvisionBucket.Edges.Currency - if bucketCurrency == nil { - return fmt.Errorf("fallback: provision bucket %d missing currency", fields.ProvisionBucket.ID) + bucketCurrency, err := resolveOrderCurrency(ctx, currentOrder) + if err != nil { + return fmt.Errorf("fallback: resolve order currency: %w", err) } // Skip fallback provider if at or over stuck fulfillment threshold @@ -1358,7 +1426,11 @@ func (s *PriorityQueueService) addProviderToExcludeList(ctx context.Context, ord func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types.PaymentOrderFields) error { orderConf := config.OrderConfig() // Reserve balance for this order - currency := order.ProvisionBucket.Edges.Currency.Code + currencyEnt, err := resolveOrderFieldsCurrency(ctx, order) + if err != nil { + return err + } + currency := currencyEnt.Code amount := order.Amount.Mul(order.Rate).RoundBank(0) // Start a transaction for the entire operation @@ -1444,7 +1516,7 @@ func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types orderRequestData := map[string]interface{}{ "amount": order.Amount.Mul(order.Rate).RoundBank(0).String(), "institution": order.Institution, - "currency": order.ProvisionBucket.Edges.Currency.Code, + "currency": currency, "providerId": order.ProviderID, } diff --git a/services/provider_selection_constants.go b/services/provider_selection_constants.go new file mode 100644 index 00000000..ce8b95f3 --- /dev/null +++ b/services/provider_selection_constants.go @@ -0,0 +1,34 @@ +package services + +import ( + "strings" + "time" + + "github.com/shopspring/decimal" +) + +var ( + RewardFulfilledValidated = decimal.NewFromFloat(1.0) + PenaltyCancelInsufficientFunds = decimal.NewFromFloat(-1.5) + PenaltyCancelProviderFault = decimal.NewFromFloat(-1.0) + PenaltyValidationFailed = decimal.NewFromFloat(-2.0) + PenaltyOrderRequestExpired = decimal.NewFromFloat(-0.5) + RecentProcessedVolumeWindow = 24 * time.Hour + ProviderFaultCancelReasons = []string{ + "out of stock", + "declined", + "rate expired", + "unable to fulfill", + "capacity limit", + } +) + +func isProviderFaultCancelReason(reason string) bool { + normalized := strings.TrimSpace(strings.ToLower(reason)) + for _, allowed := range ProviderFaultCancelReasons { + if normalized == allowed { + return true + } + } + return false +} diff --git a/services/provider_selection_helpers.go b/services/provider_selection_helpers.go new file mode 100644 index 00000000..a9031711 --- /dev/null +++ b/services/provider_selection_helpers.go @@ -0,0 +1,63 @@ +package services + +import ( + "context" + "fmt" + "time" + + "github.com/paycrest/aggregator/ent" + "github.com/paycrest/aggregator/ent/paymentorder" + "github.com/paycrest/aggregator/ent/providerprofile" + "github.com/paycrest/aggregator/storage" + "github.com/paycrest/aggregator/types" + "github.com/paycrest/aggregator/utils" + "github.com/shopspring/decimal" +) + +func resolveInstitutionCurrency(ctx context.Context, institutionCode string) (*ent.FiatCurrency, error) { + institution, err := utils.GetInstitutionByCode(ctx, institutionCode, true) + if err != nil { + return nil, err + } + if institution == nil || institution.Edges.FiatCurrency == nil { + return nil, fmt.Errorf("institution %s has no fiat currency", institutionCode) + } + return institution.Edges.FiatCurrency, nil +} + +func resolveOrderCurrency(ctx context.Context, order *ent.PaymentOrder) (*ent.FiatCurrency, error) { + if order == nil { + return nil, fmt.Errorf("payment order is nil") + } + return resolveInstitutionCurrency(ctx, order.Institution) +} + +func resolveOrderFieldsCurrency(ctx context.Context, order types.PaymentOrderFields) (*ent.FiatCurrency, error) { + return resolveInstitutionCurrency(ctx, order.Institution) +} + +func getRecentSuccessfulFiatVolumeByProvider(ctx context.Context, providerIDs []string) (map[string]decimal.Decimal, error) { + volumes := make(map[string]decimal.Decimal, len(providerIDs)) + if len(providerIDs) == 0 { + return volumes, nil + } + since := time.Now().Add(-RecentProcessedVolumeWindow) + orders, err := storage.Client.PaymentOrder.Query(). + Where( + paymentorder.HasProviderWith(providerprofile.IDIn(providerIDs...)), + paymentorder.UpdatedAtGTE(since), + paymentorder.StatusIn(paymentorder.StatusValidated, paymentorder.StatusSettled), + ). + WithProvider(). + All(ctx) + if err != nil { + return nil, err + } + for _, order := range orders { + if order.Edges.Provider == nil { + continue + } + volumes[order.Edges.Provider.ID] = volumes[order.Edges.Provider.ID].Add(order.Amount.Mul(order.Rate)) + } + return volumes, nil +} diff --git a/services/provider_selection_helpers_test.go b/services/provider_selection_helpers_test.go new file mode 100644 index 00000000..1ae4797a --- /dev/null +++ b/services/provider_selection_helpers_test.go @@ -0,0 +1,103 @@ +package services + +import ( + "context" + "testing" + + "database/sql" + "entgo.io/ent/dialect" + entsql "entgo.io/ent/dialect/sql" + _ "github.com/mattn/go-sqlite3" + "github.com/paycrest/aggregator/ent" + db "github.com/paycrest/aggregator/storage" + "github.com/paycrest/aggregator/types" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/require" +) + +func setupCurrencyResolutionTestDB(t *testing.T) context.Context { + t.Helper() + + dbConn, err := sql.Open("sqlite3", "file:currency_resolution?mode=memory&cache=shared&_fk=1") + require.NoError(t, err) + t.Cleanup(func() { _ = dbConn.Close() }) + + drv := entsql.OpenDB(dialect.SQLite, dbConn) + client := ent.NewClient(ent.Driver(drv)) + t.Cleanup(func() { _ = client.Close() }) + + db.Client = client + require.NoError(t, client.Schema.Create(context.Background())) + + ctx := context.Background() + + kes, err := client.FiatCurrency.Create(). + SetCode("KES"). + SetShortName("Kenyan Shilling"). + SetDecimals(2). + SetSymbol("KSh"). + SetName("Kenyan Shilling"). + SetIsEnabled(true). + Save(ctx) + require.NoError(t, err) + + ngn, err := client.FiatCurrency.Create(). + SetCode("NGN"). + SetShortName("Naira"). + SetDecimals(2). + SetSymbol("₦"). + SetName("Nigerian Naira"). + SetIsEnabled(true). + Save(ctx) + require.NoError(t, err) + + _, err = client.Institution.Create(). + SetCode("MPESAKES"). + SetName("M-Pesa Kenya"). + SetType("mobile_money"). + SetFiatCurrency(kes). + Save(ctx) + require.NoError(t, err) + + bucket, err := client.ProvisionBucket.Create(). + SetMinAmount(decimal.NewFromInt(1)). + SetMaxAmount(decimal.NewFromInt(1000)). + SetCurrency(ngn). + Save(ctx) + require.NoError(t, err) + + return ctx +} + +func getTestBucket(t *testing.T, ctx context.Context) *ent.ProvisionBucket { + t.Helper() + bucket, err := db.Client.ProvisionBucket.Query().WithCurrency().Only(ctx) + require.NoError(t, err) + return bucket +} + +func TestResolveOrderCurrencyPrefersInstitutionCurrency(t *testing.T) { + ctx := setupCurrencyResolutionTestDB(t) + bucket := getTestBucket(t, ctx) + + currency, err := resolveOrderCurrency(ctx, &ent.PaymentOrder{ + Institution: "MPESAKES", + Edges: ent.PaymentOrderEdges{ + ProvisionBucket: bucket, + }, + }) + require.NoError(t, err) + require.Equal(t, "KES", currency.Code) +} + +func TestResolveOrderFieldsCurrencyPrefersInstitutionCurrency(t *testing.T) { + ctx := setupCurrencyResolutionTestDB(t) + bucket := getTestBucket(t, ctx) + + currency, err := resolveOrderFieldsCurrency(ctx, types.PaymentOrderFields{ + Institution: "MPESAKES", + ProvisionBucket: bucket, + }) + require.NoError(t, err) + require.Equal(t, "KES", currency.Code) +} diff --git a/tasks/fulfillments_webhooks.go b/tasks/fulfillments_webhooks.go index 691e165d..70d1a352 100644 --- a/tasks/fulfillments_webhooks.go +++ b/tasks/fulfillments_webhooks.go @@ -119,9 +119,6 @@ func SyncPaymentOrderFulfillments() { pq.WithAPIKey() }). WithFulfillments(). - WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { - pb.WithCurrency() - }). All(ctx) if err != nil { return @@ -139,9 +136,6 @@ func SyncPaymentOrderFulfillments() { pq.WithAPIKey() }). WithFulfillments(). - WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { - pb.WithCurrency() - }). Only(ctx) if err != nil { if ent.IsNotFound(err) { @@ -180,26 +174,19 @@ func SyncPaymentOrderFulfillments() { continue } - if order.Edges.ProvisionBucket == nil { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "ProviderID": order.Edges.Provider.ID, - "Reason": "internal: ProvisionBucket is nil", - }).Errorf("SyncPaymentOrderFulfillments.MissingProvisionBucket") - continue - } - if order.Edges.ProvisionBucket.Edges.Currency == nil { + currencyCode, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if curErr != nil || currencyCode == "" { logger.WithFields(logger.Fields{ "OrderID": order.ID.String(), "ProviderID": order.Edges.Provider.ID, - "Reason": "internal: ProvisionBucket Currency is nil", + "Reason": "internal: institution currency lookup failed", }).Errorf("SyncPaymentOrderFulfillments.MissingCurrency") continue } payload := map[string]interface{}{ "reference": getTxStatusReferenceForVA(order), - "currency": order.Edges.ProvisionBucket.Edges.Currency.Code, + "currency": currencyCode, } data, err := utils.CallProviderWithHMAC(ctx, order.Edges.Provider.ID, "POST", "/tx_status", payload) if err != nil { @@ -326,19 +313,12 @@ func SyncPaymentOrderFulfillments() { } } } else { - if order.Edges.ProvisionBucket == nil { + currencyCode, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if curErr != nil || currencyCode == "" { logger.WithFields(logger.Fields{ "OrderID": order.ID.String(), "ProviderID": order.Edges.Provider.ID, - "Reason": "internal: ProvisionBucket is nil", - }).Errorf("SyncPaymentOrderFulfillments.MissingProvisionBucket") - continue - } - if order.Edges.ProvisionBucket.Edges.Currency == nil { - logger.WithFields(logger.Fields{ - "OrderID": order.ID.String(), - "ProviderID": order.Edges.Provider.ID, - "Reason": "internal: ProvisionBucket Currency is nil", + "Reason": "internal: institution currency lookup failed", }).Errorf("SyncPaymentOrderFulfillments.MissingCurrency") continue } @@ -347,7 +327,7 @@ func SyncPaymentOrderFulfillments() { if fulfillment.ValidationStatus == paymentorderfulfillment.ValidationStatusPending { payload := map[string]interface{}{ "reference": getTxStatusReferenceForVA(order), - "currency": order.Edges.ProvisionBucket.Edges.Currency.Code, + "currency": currencyCode, "psp": fulfillment.Psp, "txId": fulfillment.TxID, } @@ -491,9 +471,13 @@ func SyncPaymentOrderFulfillments() { } else if fulfillment.ValidationStatus == paymentorderfulfillment.ValidationStatusFailed && shouldRetryTxStatusFiveMinFailure(fulfillment) { + currencyCode, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if curErr != nil || currencyCode == "" { + continue + } payload := map[string]interface{}{ "orderId": order.ID.String(), - "currency": order.Edges.ProvisionBucket.Edges.Currency.Code, + "currency": currencyCode, "psp": fulfillment.Psp, "txId": fulfillment.TxID, } @@ -758,11 +742,12 @@ func RetryFailedWebhookNotifications() error { // syncRefundingOrder calls the provider /tx_status for an onramp order in Refunding and updates order/fulfillment to refunded/failed/pending. func syncRefundingOrder(ctx context.Context, order *ent.PaymentOrder) { - if order.Edges.ProvisionBucket == nil || order.Edges.ProvisionBucket.Edges.Currency == nil { + currencyCode, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if curErr != nil || currencyCode == "" { logger.WithFields(logger.Fields{ "OrderID": order.ID.String(), "ProviderID": order.Edges.Provider.ID, - }).Errorf("SyncPaymentOrderFulfillments.syncRefundingOrder: missing ProvisionBucket or Currency") + }).Errorf("SyncPaymentOrderFulfillments.syncRefundingOrder: missing institution currency") return } @@ -778,7 +763,7 @@ func syncRefundingOrder(ctx context.Context, order *ent.PaymentOrder) { payload := map[string]interface{}{ "reference": refundReference, - "currency": order.Edges.ProvisionBucket.Edges.Currency.Code, + "currency": currencyCode, } data, err := utils.CallProviderWithHMAC(ctx, order.Edges.Provider.ID, "POST", "/tx_status", payload) if err != nil { @@ -953,6 +938,10 @@ func callRequestAuthorization(ctx context.Context, order *ent.PaymentOrder, psp, // callTxRefundAndStore calls POST /tx_refund via CallProviderWithHMAC; on 200 stores refundReference in order metadata and returns it. func callTxRefundAndStore(ctx context.Context, order *ent.PaymentOrder) (refundReference string, err error) { + currencyCode, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if curErr != nil || currencyCode == "" { + return "", fmt.Errorf("institution currency lookup failed") + } fiatAmount := order.Amount.Add(order.SenderFee).Mul(order.Rate).RoundBank(0).String() refundAccount := map[string]interface{}{ "accountIdentifier": order.AccountIdentifier, @@ -966,7 +955,7 @@ func callTxRefundAndStore(ctx context.Context, order *ent.PaymentOrder) (refundR } body := map[string]interface{}{ "orderId": order.ID.String(), - "currency": order.Edges.ProvisionBucket.Edges.Currency.Code, + "currency": currencyCode, "amount": fiatAmount, "refundAccount": refundAccount, } diff --git a/tasks/indexing.go b/tasks/indexing.go index f8194f56..45a119e7 100644 --- a/tasks/indexing.go +++ b/tasks/indexing.go @@ -432,15 +432,11 @@ func ProcessStuckValidatedOrders() error { ), ), paymentorder.HasProvider(), - paymentorder.HasProvisionBucket(), ). WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). WithProvider(). - WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { - pb.WithCurrency() - }). All(runCtx) if err != nil { logger.WithFields(logger.Fields{ diff --git a/tasks/order_requests.go b/tasks/order_requests.go index eb8406db..1504e6c5 100644 --- a/tasks/order_requests.go +++ b/tasks/order_requests.go @@ -14,6 +14,7 @@ import ( "github.com/paycrest/aggregator/services/balance" "github.com/paycrest/aggregator/storage" "github.com/paycrest/aggregator/types" + "github.com/paycrest/aggregator/utils" "github.com/paycrest/aggregator/utils/logger" "github.com/redis/go-redis/v9" "github.com/shopspring/decimal" @@ -30,8 +31,11 @@ func canReassignCancelledOrder(order *ent.PaymentOrder) bool { // cleanupStuckFulfilledFailedOrder clears provider and sets status to Pending for orders stuck in Fulfilled+failed outside refund window (state cleanup only). func cleanupStuckFulfilledFailedOrder(ctx context.Context, order *ent.PaymentOrder) { // Release reserved fiat balance for the provider before clearing (same logic as reassignCancelledOrder). - if order.Edges.Provider != nil && order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { - currency := order.Edges.ProvisionBucket.Edges.Currency.Code + if order.Edges.Provider != nil { + currency, err := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true) + if err != nil || currency == "" { + goto clearOrder + } amount := order.Amount.Mul(order.Rate).RoundBank(0) balanceSvc := balance.New() if relErr := balanceSvc.ReleaseFiatBalance(ctx, order.Edges.Provider.ID, currency, amount, nil); relErr != nil { @@ -45,6 +49,7 @@ func cleanupStuckFulfilledFailedOrder(ctx context.Context, order *ent.PaymentOrd } } +clearOrder: _, err := storage.Client.PaymentOrder. Update(). Where( @@ -127,8 +132,7 @@ func reassignCancelledOrder(ctx context.Context, order *ent.PaymentOrder, fulfil // Best-effort: release any reserved balance held by this provider for the order. // This prevents "stuck" reserved balances from blocking future assignments. - if order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { - currency := order.Edges.ProvisionBucket.Edges.Currency.Code + if currency, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true); curErr == nil && currency != "" { amount := order.Amount.Mul(order.Rate).RoundBank(0) balanceSvc := balance.New() if relErr := balanceSvc.ReleaseFiatBalance(ctx, order.Edges.Provider.ID, currency, amount, nil); relErr != nil { @@ -174,7 +178,6 @@ func reassignCancelledOrder(ctx context.Context, order *ent.PaymentOrder, fulfil AccountName: order.AccountName, ProviderID: "", Memo: order.Memo, - ProvisionBucket: order.Edges.ProvisionBucket, } err = services.NewPriorityQueueService().AssignPaymentOrder(ctx, paymentOrder) @@ -250,9 +253,6 @@ func ReassignStaleOrderRequest(ctx context.Context, orderRequestChan <-chan *red Where( paymentorder.IDEQ(orderUUID), ). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). WithProvider(). WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() @@ -310,19 +310,20 @@ func ReassignStaleOrderRequest(ctx context.Context, orderRequestChan <-chan *red // Cleanup metadata key regardless of success to avoid stale entries. _, _ = storage.RedisClient.Del(ctx, metaKey).Result() - } else if order.Edges.Provider != nil && order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { + } else if order.Edges.Provider != nil { // Fallback: no meta (e.g. key missing/expired) but order has provider and bucket (e.g. private/pre-set). // Release so reserved balance is not left stuck. - currency := order.Edges.ProvisionBucket.Edges.Currency.Code - amount := order.Amount.Mul(order.Rate).RoundBank(0) - if relErr := balanceSvc.ReleaseFiatBalance(ctx, order.Edges.Provider.ID, currency, amount, nil); relErr != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", relErr), - "OrderID": order.ID.String(), - "ProviderID": order.Edges.Provider.ID, - "Currency": currency, - "Amount": amount.String(), - }).Warnf("ReassignStaleOrderRequest: failed to release reserved balance from order (best effort)") + if currency, curErr := utils.GetInstitutionCurrencyCode(ctx, order.Institution, true); curErr == nil && currency != "" { + amount := order.Amount.Mul(order.Rate).RoundBank(0) + if relErr := balanceSvc.ReleaseFiatBalance(ctx, order.Edges.Provider.ID, currency, amount, nil); relErr != nil { + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", relErr), + "OrderID": order.ID.String(), + "ProviderID": order.Edges.Provider.ID, + "Currency": currency, + "Amount": amount.String(), + }).Warnf("ReassignStaleOrderRequest: failed to release reserved balance from order (best effort)") + } } } @@ -406,7 +407,6 @@ func ReassignStaleOrderRequest(ctx context.Context, orderRequestChan <-chan *red Memo: order.Memo, ProviderID: providerID, MessageHash: order.MessageHash, - ProvisionBucket: order.Edges.ProvisionBucket, } // Include token and network if available diff --git a/tasks/stale_ops.go b/tasks/stale_ops.go index d87241ca..ccbe1fb9 100644 --- a/tasks/stale_ops.go +++ b/tasks/stale_ops.go @@ -421,9 +421,6 @@ func RetryStaleUserOperations() error { WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). - WithProvisionBucket(func(pbq *ent.ProvisionBucketQuery) { - pbq.WithCurrency() - }). WithProvider(). All(ctx) if err != nil { @@ -466,9 +463,7 @@ func RetryStaleUserOperations() error { tryFullQueue := !fallbackAlreadyTried // No provider or public provider: can try full-queue (reassign to public). Private provider: skip full-queue. - // Allow nil bucket so we can try to resolve and persist it before assignment (another node can then pick the order). - canTryFullQueue := (order.Edges.ProvisionBucket == nil || (order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil)) && - (order.Edges.Provider == nil || order.Edges.Provider.VisibilityMode != providerprofile.VisibilityModePrivate) + canTryFullQueue := order.Edges.Provider == nil || order.Edges.Provider.VisibilityMode != providerprofile.VisibilityModePrivate if tryFullQueue && canTryFullQueue { // AssignPaymentOrder expects StatusPending; if Cancelled (e.g. from HandleCancellation), set to Pending first. @@ -481,66 +476,38 @@ func RetryStaleUserOperations() error { Save(ctx) } - // Resolve and persist nil provision bucket so AssignPaymentOrder can run and another node can pick the order. - if order.Edges.ProvisionBucket == nil { - institution, instErr := utils.GetInstitutionByCode(ctx, order.Institution, true) - if instErr == nil && institution != nil && institution.Edges.FiatCurrency != nil { - fiatAmount := order.Amount.Mul(order.Rate) - bucket, bErr := storage.Client.ProvisionBucket. - Query(). - Where( - provisionbucket.MaxAmountGTE(fiatAmount), - provisionbucket.MinAmountLTE(fiatAmount), - provisionbucket.HasCurrencyWith(fiatcurrency.IDEQ(institution.Edges.FiatCurrency.ID)), - ). - WithCurrency(). - First(ctx) - if bErr == nil && bucket != nil { - if _, upErr := storage.Client.PaymentOrder.UpdateOneID(order.ID).SetProvisionBucket(bucket).Save(ctx); upErr == nil { - order.Edges.ProvisionBucket = bucket - } - } - } - if order.Edges.ProvisionBucket == nil { - logger.WithFields(logger.Fields{"OrderID": order.ID.String()}).Warnf("stale_ops: could not resolve provision bucket for order; skipping full-queue assignment") - } + orderFields := types.PaymentOrderFields{ + ID: order.ID, + OrderType: order.OrderType.String(), + Token: order.Edges.Token, + GatewayID: order.GatewayID, + Amount: order.Amount, + Rate: order.Rate, + Institution: order.Institution, + AccountIdentifier: order.AccountIdentifier, + AccountName: order.AccountName, + ProviderID: "", + MessageHash: order.MessageHash, + Memo: order.Memo, + UpdatedAt: order.UpdatedAt, + CreatedAt: order.CreatedAt, + } + if order.Edges.Token != nil && order.Edges.Token.Edges.Network != nil { + orderFields.Network = order.Edges.Token.Edges.Network } - if order.Edges.ProvisionBucket != nil && order.Edges.ProvisionBucket.Edges.Currency != nil { - orderFields := types.PaymentOrderFields{ - ID: order.ID, - OrderType: order.OrderType.String(), - Token: order.Edges.Token, - GatewayID: order.GatewayID, - Amount: order.Amount, - Rate: order.Rate, - Institution: order.Institution, - AccountIdentifier: order.AccountIdentifier, - AccountName: order.AccountName, - ProviderID: "", - ProvisionBucket: order.Edges.ProvisionBucket, - MessageHash: order.MessageHash, - Memo: order.Memo, - UpdatedAt: order.UpdatedAt, - CreatedAt: order.CreatedAt, - } - if order.Edges.Token != nil && order.Edges.Token.Edges.Network != nil { - orderFields.Network = order.Edges.Token.Edges.Network - } - - err := pq.AssignPaymentOrder(ctx, orderFields) - if err == nil { - logger.WithFields(logger.Fields{"OrderID": order.ID.String()}).Infof("order assigned to provider during refund process; skipping refund") - continue - } - // We tried public reassignment and it failed; set cancellation count to threshold immediately so we can refund. - // Any failure should proceed to try fallback, else proceed with refund - _, _ = storage.Client.PaymentOrder. - Update(). - Where(paymentorder.IDEQ(order.ID)). - SetCancellationCount(orderConf.RefundCancellationCount). - Save(ctx) + err := pq.AssignPaymentOrder(ctx, orderFields) + if err == nil { + logger.WithFields(logger.Fields{"OrderID": order.ID.String()}).Infof("order assigned to provider during refund process; skipping refund") + continue } + // We tried public reassignment and it failed; set cancellation count to threshold immediately so we can refund. + // Any failure should proceed to try fallback, else proceed with refund + _, _ = storage.Client.PaymentOrder. + Update(). + Where(paymentorder.IDEQ(order.ID)). + SetCancellationCount(orderConf.RefundCancellationCount). + Save(ctx) } // Private orders: try assignment to pre-set provider (AssignPaymentOrder accepts nil bucket for pre-set provider). @@ -563,7 +530,6 @@ func RetryStaleUserOperations() error { AccountIdentifier: order.AccountIdentifier, AccountName: order.AccountName, ProviderID: order.Edges.Provider.ID, - ProvisionBucket: order.Edges.ProvisionBucket, MessageHash: order.MessageHash, Memo: order.Memo, UpdatedAt: order.UpdatedAt, diff --git a/tasks/startup.go b/tasks/startup.go index 3766df34..fb46406e 100644 --- a/tasks/startup.go +++ b/tasks/startup.go @@ -5,7 +5,6 @@ import ( "time" "github.com/go-co-op/gocron" - "github.com/paycrest/aggregator/services" "github.com/paycrest/aggregator/storage" "github.com/paycrest/aggregator/utils/logger" ) @@ -40,32 +39,17 @@ func SubscribeToRedisKeyspaceEvents() func() { func StartCronJobs() { // Use the system's local timezone instead of hardcoded UTC to prevent timezone conflicts scheduler := gocron.NewScheduler(time.Local) - priorityQueue := services.NewPriorityQueueService() - err := ComputeMarketRate() if err != nil { logger.Errorf("StartCronJobs for ComputeMarketRate: %v", err) } - if serverConf.Environment != "production" { - err = priorityQueue.ProcessBucketQueues() - if err != nil { - logger.Errorf("StartCronJobs for ProcessBucketQueues: %v", err) - } - } - // Compute market rate every 9 minutes _, err = scheduler.Every(9).Minutes().Do(ComputeMarketRate) if err != nil { logger.Errorf("StartCronJobs for ComputeMarketRate: %v", err) } - // Refresh provision bucket priority queues every X minutes - _, err = scheduler.Every(orderConf.BucketQueueRebuildInterval).Minutes().Do(priorityQueue.ProcessBucketQueues) - if err != nil { - logger.Errorf("StartCronJobs for ProcessBucketQueues: %v", err) - } - // Retry failed webhook notifications every 13 minutes _, err = scheduler.Every(13).Minutes().Do(RetryFailedWebhookNotifications) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index dc44c8c9..b3a67d5f 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "math/big" @@ -976,81 +975,31 @@ func IsBase64(s string) bool { // GetTokenRateFromQueue gets the rate of a token from the priority queue for the given side (buy/sell). // Bucket keys are side-suffixed: bucket_{currency}_{min}_{max}_{side}. Scanning without side would mix buy and sell rates. func GetTokenRateFromQueue(tokenSymbol string, orderAmount decimal.Decimal, fiatCurrency string, marketRate decimal.Decimal, side RateSide) (decimal.Decimal, error) { - ctx := context.Background() - - // Scan for side-specific bucket keys: bucket_{currency}_{min}_{max}_{side} - keys, _, err := storage.RedisClient.Scan(ctx, uint64(0), "bucket_"+fiatCurrency+"_*_*_"+string(side), 100).Result() + tokenEntities, err := storage.Client.Token.Query().Where(tokenEnt.SymbolEQ(tokenSymbol)).All(context.Background()) if err != nil { return decimal.Decimal{}, err } - - rateResponse := marketRate - highestMaxAmount := decimal.NewFromInt(0) - - // Scan through the buckets to find a suitable rate - for _, key := range keys { - bd, err := parseBucketKey(key) - if err != nil { + if len(tokenEntities) == 0 { + return decimal.Decimal{}, fmt.Errorf("token not found") + } + currency, err := storage.Client.FiatCurrency.Query().Where(fiatcurrency.CodeEQ(fiatCurrency)).Only(context.Background()) + if err != nil { + return decimal.Decimal{}, err + } + bestRate := decimal.Zero + for _, tokenEntity := range tokenEntities { + result, rateErr := validateBucketRate(context.Background(), tokenEntity, currency, orderAmount, "", side) + if rateErr != nil { continue } - minAmount, maxAmount := bd.MinAmount, bd.MaxAmount - - for index := 0; ; index++ { - // Get the topmost provider in the priority queue of the bucket - providerData, err := storage.RedisClient.LIndex(ctx, key, int64(index)).Result() - if err != nil { - break - } - parts := strings.Split(providerData, ":") - if len(parts) != 6 { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "ProviderData": providerData, - "Token": tokenSymbol, - "Currency": fiatCurrency, - "MinAmount": minAmount, - "MaxAmount": maxAmount, - }).Errorf("GetTokenRate.InvalidProviderData: %v", providerData) - continue - } - - // Skip entry if token doesn't match - if parts[1] != tokenSymbol { - continue - } - - // Skip entry if order amount is not within provider's min and max order amount - minOrderAmount, err := decimal.NewFromString(parts[4]) - if err != nil { - continue - } - - maxOrderAmount, err := decimal.NewFromString(parts[5]) - if err != nil { - continue - } - - if orderAmount.LessThan(minOrderAmount) || orderAmount.GreaterThan(maxOrderAmount) { - continue - } - - // Get fiat equivalent of the token amount - rate, _ := decimal.NewFromString(parts[3]) - fiatAmount := orderAmount.Mul(rate) - - // Check if fiat amount is within the bucket range and set the rate - if fiatAmount.GreaterThanOrEqual(minAmount) && fiatAmount.LessThanOrEqual(maxAmount) { - rateResponse = rate - break - } else if maxAmount.GreaterThan(highestMaxAmount) { - // Get the highest max amount - highestMaxAmount = maxAmount - rateResponse = rate - } + if bestRate.IsZero() || result.Rate.GreaterThan(bestRate) { + bestRate = result.Rate } } - - return rateResponse, nil + if bestRate.IsZero() { + return decimal.Decimal{}, fmt.Errorf("no provider available for this token/currency pair") + } + return bestRate, nil } // GetInstitutionByCode returns the institution for a given institution code @@ -1076,6 +1025,17 @@ func GetInstitutionByCode(ctx context.Context, institutionCode string, enabledFi return institution, nil } +func GetInstitutionCurrencyCode(ctx context.Context, institutionCode string, enabledFiatCurrency bool) (string, error) { + institution, err := GetInstitutionByCode(ctx, institutionCode, enabledFiatCurrency) + if err != nil { + return "", err + } + if institution == nil || institution.Edges.FiatCurrency == nil { + return "", fmt.Errorf("institution %s has no fiat currency", institutionCode) + } + return institution.Edges.FiatCurrency.Code, nil +} + // Helper function to validate HTTPS URL func IsValidHttpsUrl(urlStr string) bool { // Check if URL starts with https:// @@ -1349,116 +1309,102 @@ func getProviderRateFromRedis(ctx context.Context, providerID, tokenSymbol, curr // validateBucketRate handles bucket-based rate validation func validateBucketRate(ctx context.Context, token *ent.Token, currency *ent.FiatCurrency, amount decimal.Decimal, networkIdentifier string, side RateSide) (RateValidationResult, error) { - // Get redis keys for provision buckets for the specific side - // Scan for side-specific bucket keys: bucket_{currency}_{min}_{max}_{side} - keys, _, err := storage.RedisClient.Scan(ctx, uint64(0), "bucket_"+currency.Code+"_*_*_"+string(side), 100).Result() + q := storage.Client.ProviderOrderToken.Query(). + Where( + providerordertoken.HasTokenWith(tokenEnt.IDEQ(token.ID)), + providerordertoken.HasCurrencyWith(fiatcurrency.CodeEQ(currency.Code)), + providerordertoken.SettlementAddressNEQ(""), + providerordertoken.HasProviderWith( + providerprofile.IsActive(true), + providerprofile.VisibilityModeEQ(providerprofile.VisibilityModePublic), + ), + ). + WithProvider(func(pq *ent.ProviderProfileQuery) { pq.WithProviderRating() }). + WithCurrency() + if networkIdentifier != "" { + q = q.Where(providerordertoken.NetworkEQ(networkIdentifier)) + } + orderTokens, err := q.All(ctx) if err != nil { - logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Currency": currency.Code, - "Network": networkIdentifier, - }).Errorf("Failed to scan Redis buckets for bucket rate") return RateValidationResult{}, fmt.Errorf("internal server error") } - - // Track the best available rate and reason for logging - var bestRate decimal.Decimal - var foundExactMatch bool - var selectedProviderID string - var selectedOrderType paymentorder.OrderType - var anySkippedDueToStuck bool - var currencyForStuck string - - // Scan through the buckets to find a matching rate - for _, key := range keys { - bucketData, err := parseBucketKey(key) - if err != nil { - logger.WithFields(logger.Fields{ - "Key": key, - "Error": err, - }).Errorf("ValidateRate.InvalidBucketKey: failed to parse bucket key") - continue + var best RateValidationResult + for _, orderToken := range orderTokens { + providerID := "" + if orderToken.Edges.Provider != nil { + providerID = orderToken.Edges.Provider.ID } - - // Get all providers in this bucket to find the first suitable one (priority queue order) - providers, err := storage.RedisClient.LRange(ctx, key, 0, -1).Result() - if err != nil { - logger.WithFields(logger.Fields{ - "Key": key, - "Error": err, - }).Errorf("ValidateRate.FailedToGetProviders: failed to get providers from bucket") + if amount.LessThan(orderToken.MinOrderAmount) { continue } - - // Find the first provider at the top of the queue that matches our criteria - result := findSuitableProviderRate(ctx, providers, token, networkIdentifier, amount, bucketData, side) - if result.Found { - foundExactMatch = true - bestRate = result.Rate - selectedProviderID = result.ProviderID - selectedOrderType = result.OrderType - break // Found exact match, no need to continue + orderType := DetermineOrderType(orderToken, amount) + if orderType == paymentorder.OrderTypeRegular && amount.GreaterThan(orderToken.MaxOrderAmount) { + continue } - if result.AllSkippedDueToStuck && result.CurrencyCode != "" { - anySkippedDueToStuck = true - currencyForStuck = result.CurrencyCode + if orderType == paymentorder.OrderTypeOtc { + if orderToken.MinOrderAmountOtc.IsZero() || orderToken.MaxOrderAmountOtc.IsZero() { + continue + } + if amount.LessThan(orderToken.MinOrderAmountOtc) || amount.GreaterThan(orderToken.MaxOrderAmountOtc) { + continue + } } - - // Track the best available rate for logging purposes - if result.Rate.GreaterThan(bestRate) { - bestRate = result.Rate + if orderType == paymentorder.OrderTypeRegular && config.OrderConfig().ProviderStuckFulfillmentThreshold > 0 && providerID != "" { + stuckCount, errStuck := GetProviderStuckOrderCount(ctx, providerID) + if errStuck == nil && stuckCount >= config.OrderConfig().ProviderStuckFulfillmentThreshold { + continue + } } - } - - // If no exact match found, try fallback provider (if configured) even though it's not on the bucket queue - if !foundExactMatch { - if fallbackID := config.OrderConfig().FallbackProviderID; fallbackID != "" { - fallbackResult, fallbackErr := validateProviderRate(ctx, token, currency, amount, fallbackID, networkIdentifier, side) - if fallbackErr == nil { - // Quote the queue's best rate when amount was below/above queue providers' min/max but fallback accepts it - if bestRate.GreaterThan(decimal.Zero) { - return RateValidationResult{ - Rate: bestRate, - ProviderID: fallbackResult.ProviderID, - OrderType: fallbackResult.OrderType, - }, nil - } - return fallbackResult, nil + rate := decimal.Zero + if side == RateSideBuy { + if !orderToken.FixedBuyRate.IsZero() { + rate = orderToken.FixedBuyRate + } else if currency.MarketBuyRate.IsZero() { + continue + } else { + rate = currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) } - var errStuck *types.ErrNoProviderDueToStuck - if errors.As(fallbackErr, &errStuck) { - return RateValidationResult{}, fallbackErr + } else { + if !orderToken.FixedSellRate.IsZero() { + rate = orderToken.FixedSellRate + } else if currency.MarketSellRate.IsZero() { + continue + } else { + rate = currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) } } - if anySkippedDueToStuck && currencyForStuck != "" { - return RateValidationResult{}, &types.ErrNoProviderDueToStuck{CurrencyCode: currencyForStuck} + if rate.IsZero() { + continue } - logger.WithFields(logger.Fields{ - "Token": token.Symbol, - "Currency": currency.Code, - "Amount": amount, - "NetworkFilter": networkIdentifier, - "BestRate": bestRate, - }).Warnf("ValidateRate.NoSuitableProvider: no provider found for the given parameters") - - // Provide more specific error message (buy = fiat to crypto, sell = crypto to fiat) - networkMsg := networkIdentifier - if networkMsg == "" { - networkMsg = "any network" - } - from, to := token.Symbol, currency.Code + hasBalance := false if side == RateSideBuy { - from, to = currency.Code, token.Symbol + hasBalance, err = storage.Client.ProviderBalances.Query(). + Where( + providerbalances.HasProviderWith(providerprofile.IDEQ(providerID)), + providerbalances.HasTokenWith(tokenEnt.IDEQ(token.ID)), + providerbalances.AvailableBalanceGTE(amount), + providerbalances.IsAvailableEQ(true), + ).Exist(ctx) + } else { + hasBalance, err = storage.Client.ProviderBalances.Query(). + Where( + providerbalances.HasProviderWith(providerprofile.IDEQ(providerID)), + providerbalances.HasFiatCurrencyWith(fiatcurrency.CodeEQ(currency.Code)), + providerbalances.AvailableBalanceGTE(amount.Mul(rate).RoundBank(0)), + providerbalances.IsAvailableEQ(true), + ).Exist(ctx) + } + if err != nil || !hasBalance { + continue + } + if best.Rate.IsZero() || rate.GreaterThan(best.Rate) { + best = RateValidationResult{Rate: rate, ProviderID: providerID, OrderType: orderType} } - return RateValidationResult{}, fmt.Errorf("no provider available for %s to %s conversion with amount %s on %s", - from, to, amount, networkMsg) } - - return RateValidationResult{ - Rate: bestRate, - ProviderID: selectedProviderID, - OrderType: selectedOrderType, - }, nil + if best.Rate.IsZero() { + return RateValidationResult{}, fmt.Errorf("no provider available for this token/currency pair") + } + return best, nil } // parseBucketKey parses and validates bucket key format