-
Notifications
You must be signed in to change notification settings - Fork 22
Refactor provider selection to use ProviderOrderToken queries and add helper utilities #732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,9 +2,9 @@ package services | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "math/rand" | ||
| "sort" | ||
| "strings" | ||
| "time" | ||
|
|
||
|
|
@@ -340,6 +340,24 @@ func (s *PriorityQueueService) GetProviderRate(ctx context.Context, provider *en | |
| return rate, nil | ||
| } | ||
|
|
||
| func providerOrderTokenRate(orderToken *ent.ProviderOrderToken, side RateSide) decimal.Decimal { | ||
| if orderToken == nil || orderToken.Edges.Currency == nil { | ||
| return decimal.Zero | ||
| } | ||
| switch side { | ||
| case RateSideBuy: | ||
| if !orderToken.FixedBuyRate.IsZero() { | ||
| return orderToken.FixedBuyRate | ||
| } | ||
| return orderToken.Edges.Currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) | ||
| default: | ||
| if !orderToken.FixedSellRate.IsZero() { | ||
| return orderToken.FixedSellRate | ||
| } | ||
| return orderToken.Edges.Currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) | ||
| } | ||
| } | ||
|
|
||
| // deleteQueue deletes existing circular queue | ||
| func (s *PriorityQueueService) deleteQueue(ctx context.Context, key string) error { | ||
| _, err := storage.RedisClient.Del(ctx, key).Result() | ||
|
|
@@ -699,35 +717,8 @@ func (s *PriorityQueueService) tryUsePreSetProvider(ctx context.Context, order t | |
|
|
||
| // AssignPaymentOrder assigns payment orders to providers | ||
| func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order types.PaymentOrderFields) error { | ||
| orderIDPrefix := strings.Split(order.ID.String(), "-")[0] | ||
| orderConf := config.OrderConfig() | ||
|
|
||
| // Both regular and OTC orders must have a provision bucket, unless order has a pre-set private provider (private orders don't require buckets). | ||
| allowNilBucketForPrivate := false | ||
| if order.ProvisionBucket == nil { | ||
| if order.ProviderID != "" { | ||
| providerForCheck, pErr := storage.Client.ProviderProfile.Query().Where(providerprofile.IDEQ(order.ProviderID)).Only(ctx) | ||
| if pErr == nil && providerForCheck != nil && providerForCheck.VisibilityMode == providerprofile.VisibilityModePrivate { | ||
| allowNilBucketForPrivate = true | ||
| } | ||
| } | ||
| if !allowNilBucketForPrivate { | ||
| logger.WithFields(logger.Fields{ | ||
| "OrderID": order.ID.String(), | ||
| "OrderType": order.OrderType, | ||
| "Reason": "internal: Order missing provision bucket", | ||
| }).Errorf("AssignPaymentOrder.MissingProvisionBucket") | ||
| return fmt.Errorf("order %s (type: %s) is missing provision bucket", order.ID.String(), order.OrderType) | ||
| } | ||
| } else if order.ProvisionBucket.Edges.Currency == nil { | ||
| logger.WithFields(logger.Fields{ | ||
| "OrderID": order.ID.String(), | ||
| "OrderType": order.OrderType, | ||
| "Reason": "internal: Provision bucket missing currency", | ||
| }).Errorf("AssignPaymentOrder.MissingCurrency") | ||
| return fmt.Errorf("provision bucket for order %s (type: %s) is missing currency", order.ID.String(), order.OrderType) | ||
| } | ||
|
|
||
| // Defensive check: Verify order is in a valid state for assignment | ||
| // This prevents duplicate assignments from concurrent sources | ||
| currentOrder, err := storage.Client.PaymentOrder.Get(ctx, order.ID) | ||
|
|
@@ -834,47 +825,129 @@ func (s *PriorityQueueService) AssignPaymentOrder(ctx context.Context, order typ | |
| } | ||
| } | ||
|
|
||
| // Private orders with nil bucket only use the pre-set path above; do not run queue matching. | ||
| if order.ProvisionBucket == nil { | ||
| return nil | ||
| currency, err := resolveInstitutionCurrency(ctx, order.Institution) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if order.Token == nil { | ||
| return fmt.Errorf("order %s has no token", order.ID) | ||
| } | ||
| network := order.Network | ||
| if network == nil && order.Token.Edges.Network != nil { | ||
| network = order.Token.Edges.Network | ||
| } | ||
| if network == nil { | ||
| network, err = order.Token.QueryNetwork().Only(ctx) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Use same side-suffixed key as buildQueueForSide so reads/writes use identical keys (rateSide computed above) | ||
| baseRedisKey := fmt.Sprintf("bucket_%s_%s_%s", order.ProvisionBucket.Edges.Currency.Code, order.ProvisionBucket.MinAmount, order.ProvisionBucket.MaxAmount) | ||
| redisKey := baseRedisKey + "_" + string(rateSide) | ||
|
|
||
| err = s.matchRate(ctx, redisKey, orderIDPrefix, order, excludeList) | ||
| candidates, err := storage.Client.ProviderOrderToken.Query(). | ||
| Where( | ||
| providerordertoken.NetworkEQ(network.Identifier), | ||
| providerordertoken.HasTokenWith(token.IDEQ(order.Token.ID)), | ||
| providerordertoken.HasCurrencyWith(fiatcurrency.IDEQ(currency.ID)), | ||
| providerordertoken.SettlementAddressNEQ(""), | ||
| providerordertoken.HasProviderWith( | ||
| providerprofile.IsActive(true), | ||
| providerprofile.VisibilityModeEQ(providerprofile.VisibilityModePublic), | ||
|
Comment on lines
+852
to
+854
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep the fallback provider out of the first-pass candidate set. Lines 433-435 still skip 🤖 Prompt for AI Agents |
||
| providerprofile.HasProviderBalancesWith( | ||
| providerbalances.HasFiatCurrencyWith(fiatcurrency.IDEQ(currency.ID)), | ||
|
Comment on lines
+852
to
+856
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
| providerbalances.IsAvailableEQ(true), | ||
| providerbalances.AvailableBalanceGTE(order.Amount.Mul(order.Rate).RoundBank(0)), | ||
| ), | ||
|
Comment on lines
+855
to
+859
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use side-aware balance filters here. Lines 855-859 always require fiat balance sized as 🤖 Prompt for AI Agents |
||
| ), | ||
| ). | ||
| WithProvider(func(pq *ent.ProviderProfileQuery) { | ||
| pq.WithProviderRating() | ||
| }). | ||
| WithCurrency(). | ||
| All(ctx) | ||
| if err != nil { | ||
| prevRedisKey := redisKey + "_prev" | ||
| err = s.matchRate(ctx, prevRedisKey, orderIDPrefix, order, excludeList) | ||
| if err != nil { | ||
| // Both matchRate attempts failed; try fallback once for regular orders | ||
| matchRateErr := err | ||
| if order.OrderType != "otc" { | ||
| orderEnt, loadErr := storage.Client.PaymentOrder.Query(). | ||
| Where(paymentorder.IDEQ(order.ID)). | ||
| WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }). | ||
| Only(ctx) | ||
| if loadErr == nil { | ||
| if fallbackErr := s.TryFallbackAssignment(ctx, orderEnt); fallbackErr == nil { | ||
| return nil | ||
| } else { | ||
| logger.WithFields(logger.Fields{ | ||
| "OrderID": order.ID.String(), | ||
| "Error": fallbackErr.Error(), | ||
| }).Errorf("AssignPaymentOrder: TryFallbackAssignment failed after no provider in queue") | ||
| var errStuck *types.ErrNoProviderDueToStuck | ||
| if errors.As(fallbackErr, &errStuck) { | ||
| return fallbackErr | ||
| } | ||
| } | ||
| } | ||
| return err | ||
| } | ||
| if len(candidates) == 0 { | ||
| if order.OrderType != "otc" { | ||
| orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) | ||
| if loadErr == nil { | ||
| return s.TryFallbackAssignment(ctx, orderEnt) | ||
| } | ||
| return fmt.Errorf("no provider matched for order: %w", matchRateErr) | ||
| } | ||
| return fmt.Errorf("no eligible provider order tokens found") | ||
| } | ||
| providerIDs := make([]string, 0, len(candidates)) | ||
| for _, candidate := range candidates { | ||
| if candidate.Edges.Provider != nil { | ||
| providerIDs = append(providerIDs, candidate.Edges.Provider.ID) | ||
| } | ||
| } | ||
| recentVolumes, _ := getRecentSuccessfulFiatVolumeByProvider(ctx, providerIDs) | ||
| sort.SliceStable(candidates, func(i, j int) bool { | ||
| leftScore := decimal.Zero | ||
| rightScore := decimal.Zero | ||
| if candidates[i].Edges.Provider != nil && candidates[i].Edges.Provider.Edges.ProviderRating != nil { | ||
| leftScore = candidates[i].Edges.Provider.Edges.ProviderRating.TrustScore | ||
| } | ||
| if candidates[j].Edges.Provider != nil && candidates[j].Edges.Provider.Edges.ProviderRating != nil { | ||
| rightScore = candidates[j].Edges.Provider.Edges.ProviderRating.TrustScore | ||
| } | ||
| if !leftScore.Equal(rightScore) { | ||
| return leftScore.GreaterThan(rightScore) | ||
| } | ||
| leftVol := recentVolumes[candidates[i].Edges.Provider.ID] | ||
| rightVol := recentVolumes[candidates[j].Edges.Provider.ID] | ||
| if !leftVol.Equal(rightVol) { | ||
| return leftVol.LessThan(rightVol) | ||
| } | ||
| return candidates[i].Edges.Provider.ID < candidates[j].Edges.Provider.ID | ||
| }) | ||
|
|
||
| return nil | ||
| for _, candidate := range candidates { | ||
| providerID := candidate.Edges.Provider.ID | ||
| excludeCount := s.countProviderInExcludeList(excludeList, providerID) | ||
| if order.OrderType == "otc" { | ||
| if excludeCount > 0 { | ||
| continue | ||
| } | ||
| } else if excludeCount >= orderConf.ProviderMaxRetryAttempts { | ||
| continue | ||
| } | ||
| if order.OrderType != "otc" && orderConf.ProviderStuckFulfillmentThreshold > 0 { | ||
| stuckCount, errStuck := utils.GetProviderStuckOrderCount(ctx, providerID) | ||
| if errStuck == nil && stuckCount >= orderConf.ProviderStuckFulfillmentThreshold { | ||
| continue | ||
| } | ||
| } | ||
| if order.Amount.LessThan(candidate.MinOrderAmount) { | ||
| continue | ||
| } | ||
| if order.Amount.GreaterThan(candidate.MaxOrderAmount) { | ||
| if candidate.MinOrderAmountOtc.IsZero() || candidate.MaxOrderAmountOtc.IsZero() { | ||
| continue | ||
| } | ||
| if order.Amount.LessThan(candidate.MinOrderAmountOtc) || order.Amount.GreaterThan(candidate.MaxOrderAmountOtc) { | ||
| continue | ||
| } | ||
| } | ||
| rate := providerOrderTokenRate(candidate, rateSide) | ||
| allowedDeviation := order.Rate.Mul(candidate.RateSlippage.Div(decimal.NewFromInt(100))) | ||
| if rate.Sub(order.Rate).Abs().GreaterThan(allowedDeviation) { | ||
| continue | ||
| } | ||
| order.ProviderID = providerID | ||
| if order.OrderType == "otc" { | ||
| return s.assignOtcOrder(ctx, order) | ||
| } | ||
| return s.sendOrderRequest(ctx, order) | ||
| } | ||
| if order.OrderType != "otc" { | ||
| orderEnt, loadErr := storage.Client.PaymentOrder.Query().Where(paymentorder.IDEQ(order.ID)).WithToken(func(tq *ent.TokenQuery) { tq.WithNetwork() }).Only(ctx) | ||
| if loadErr == nil { | ||
| return s.TryFallbackAssignment(ctx, orderEnt) | ||
| } | ||
| } | ||
| return fmt.Errorf("no provider matched for order") | ||
| } | ||
|
|
||
| // TryFallbackAssignment attempts to assign the order to the configured fallback provider using only rate and balance checks. | ||
|
|
@@ -905,9 +978,6 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order | |
| // Eagerly load ProvisionBucket+Currency so we never need a separate fallback query for them. | ||
| currentOrder, err := storage.Client.PaymentOrder.Query(). | ||
| Where(paymentorder.IDEQ(order.ID)). | ||
| WithProvisionBucket(func(pb *ent.ProvisionBucketQuery) { | ||
| pb.WithCurrency() | ||
| }). | ||
| Only(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("fallback: failed to load order: %w", err) | ||
|
|
@@ -966,7 +1036,6 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order | |
| AccountIdentifier: order.AccountIdentifier, | ||
| AccountName: order.AccountName, | ||
| ProviderID: "", | ||
| ProvisionBucket: currentOrder.Edges.ProvisionBucket, | ||
| MessageHash: order.MessageHash, | ||
| Memo: order.Memo, | ||
| UpdatedAt: order.UpdatedAt, | ||
|
|
@@ -980,39 +1049,9 @@ func (s *PriorityQueueService) TryFallbackAssignment(ctx context.Context, order | |
| return fmt.Errorf("fallback: order %s has no token", order.ID.String()) | ||
| } | ||
|
|
||
| // If order has no bucket yet, resolve one from institution currency + fiat amount. | ||
| if fields.ProvisionBucket == nil { | ||
| institution, instErr := utils.GetInstitutionByCode(ctx, order.Institution, true) | ||
| if instErr != nil { | ||
| return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution lookup failed: %w", fields.ID.String(), instErr) | ||
| } | ||
| if institution.Edges.FiatCurrency == nil { | ||
| return fmt.Errorf("fallback: cannot resolve bucket for order %s: institution %s has no fiat currency", fields.ID.String(), order.Institution) | ||
| } | ||
| fiatAmount := order.Amount.Mul(order.Rate) | ||
| bucket, bErr := storage.Client.ProvisionBucket. | ||
| Query(). | ||
| Where( | ||
| provisionbucket.MaxAmountGTE(fiatAmount), | ||
| provisionbucket.MinAmountLTE(fiatAmount), | ||
| provisionbucket.HasCurrencyWith(fiatcurrency.IDEQ(institution.Edges.FiatCurrency.ID)), | ||
| ). | ||
| WithCurrency(). | ||
| Only(ctx) | ||
| if bErr != nil { | ||
| return fmt.Errorf("fallback: no matching provision bucket for order %s (fiat %s %s): %w", | ||
| fields.ID.String(), fiatAmount.String(), institution.Edges.FiatCurrency.Code, bErr) | ||
| } | ||
| fields.ProvisionBucket = bucket | ||
| // Persist so later flows (e.g. FulfillOrder) see the bucket and do not panic on nil ProvisionBucket | ||
| if _, upErr := storage.Client.PaymentOrder.UpdateOneID(fields.ID).SetProvisionBucket(bucket).Save(ctx); upErr != nil { | ||
| return fmt.Errorf("fallback: failed to set provision bucket on order %s: %w", fields.ID.String(), upErr) | ||
| } | ||
| } | ||
|
|
||
| bucketCurrency := fields.ProvisionBucket.Edges.Currency | ||
| if bucketCurrency == nil { | ||
| return fmt.Errorf("fallback: provision bucket %d missing currency", fields.ProvisionBucket.ID) | ||
| bucketCurrency, err := resolveInstitutionCurrency(ctx, order.Institution) | ||
| if err != nil { | ||
| return fmt.Errorf("fallback: resolve institution currency: %w", err) | ||
| } | ||
|
|
||
| // Skip fallback provider if at or over stuck fulfillment threshold | ||
|
|
@@ -1358,7 +1397,11 @@ func (s *PriorityQueueService) addProviderToExcludeList(ctx context.Context, ord | |
| func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types.PaymentOrderFields) error { | ||
| orderConf := config.OrderConfig() | ||
| // Reserve balance for this order | ||
| currency := order.ProvisionBucket.Edges.Currency.Code | ||
| currencyEnt, err := resolveInstitutionCurrency(ctx, order.Institution) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| currency := currencyEnt.Code | ||
| amount := order.Amount.Mul(order.Rate).RoundBank(0) | ||
|
|
||
| // Start a transaction for the entire operation | ||
|
|
@@ -1444,7 +1487,7 @@ func (s *PriorityQueueService) sendOrderRequest(ctx context.Context, order types | |
| orderRequestData := map[string]interface{}{ | ||
| "amount": order.Amount.Mul(order.Rate).RoundBank(0).String(), | ||
| "institution": order.Institution, | ||
| "currency": order.ProvisionBucket.Edges.Currency.Code, | ||
| "currency": currency, | ||
| "providerId": order.ProviderID, | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package services | ||
|
|
||
| import ( | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/shopspring/decimal" | ||
| ) | ||
|
|
||
| var ( | ||
| RewardFulfilledValidated = decimal.NewFromFloat(1.0) | ||
| PenaltyCancelInsufficientFunds = decimal.NewFromFloat(-1.5) | ||
| PenaltyCancelProviderFault = decimal.NewFromFloat(-1.0) | ||
| PenaltyValidationFailed = decimal.NewFromFloat(-2.0) | ||
| PenaltyOrderRequestExpired = decimal.NewFromFloat(-0.5) | ||
| RecentProcessedVolumeWindow = 24 * time.Hour | ||
| ProviderFaultCancelReasons = []string{ | ||
| "out of stock", | ||
| "declined", | ||
| "rate expired", | ||
| "unable to fulfill", | ||
| "capacity limit", | ||
| } | ||
| ) | ||
|
|
||
| func isProviderFaultCancelReason(reason string) bool { | ||
| normalized := strings.TrimSpace(strings.ToLower(reason)) | ||
| for _, allowed := range ProviderFaultCancelReasons { | ||
| if normalized == allowed { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore the market-rate guard in this helper.
GetProviderRateandtokenRateForBucketboth refuse to derive floating rates when the market rate is zero. This copy doesn't, so a missing market rate turnsFloating*Deltainto the whole quote and can even produce negative rates during candidate matching.Suggested fix
func providerOrderTokenRate(orderToken *ent.ProviderOrderToken, side RateSide) decimal.Decimal { if orderToken == nil || orderToken.Edges.Currency == nil { return decimal.Zero } switch side { case RateSideBuy: if !orderToken.FixedBuyRate.IsZero() { return orderToken.FixedBuyRate } + if orderToken.Edges.Currency.MarketBuyRate.IsZero() { + return decimal.Zero + } return orderToken.Edges.Currency.MarketBuyRate.Add(orderToken.FloatingBuyDelta).RoundBank(2) - default: + case RateSideSell: if !orderToken.FixedSellRate.IsZero() { return orderToken.FixedSellRate } + if orderToken.Edges.Currency.MarketSellRate.IsZero() { + return decimal.Zero + } return orderToken.Edges.Currency.MarketSellRate.Add(orderToken.FloatingSellDelta).RoundBank(2) + default: + return decimal.Zero } }📝 Committable suggestion
🤖 Prompt for AI Agents