Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bidengine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
types "github.com/akash-network/akash-api/go/node/types/v1beta3"
)

// Config represents the configuration parameters for the bid engine.
// It controls pricing, deposits, timeouts and provider capabilities and attributes
type Config struct {
PricingStrategy BidPricingStrategy
Deposit sdk.Coin
Expand Down
74 changes: 49 additions & 25 deletions bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,36 @@ import (
"github.com/akash-network/provider/session"
)

// order manages bidding and general lifecycle handling of an order.
// order manages bidding and general lifecycle handling of an order. The lifecycle includes:
type order struct {
// orderID is the unique identifier for this order from the blockchain
orderID mtypes.OrderID
cfg Config

session session.Session
cluster cluster.Cluster
bus pubsub.Bus
sub pubsub.Subscriber
// cfg holds configuration parameters for bid engine (pricing, deposits, timeouts etc).
cfg Config

// session provides blockchain client and provider info.
session session.Session

// cluster interface to the provider's compute cluster for resource management.
cluster cluster.Cluster

// bus is the event bus for publishing lease events.
bus pubsub.Bus

// sub is the subscriber for receiving blockchain events.
sub pubsub.Subscriber

// reservationFulfilledNotify is the channel to notify when resources are reserved.
reservationFulfilledNotify chan<- int

log log.Logger
lc lifecycle.Lifecycle
// log is the logger instance
log log.Logger

// lc contains the lifecycle management for graceful startup/shutdown
lc lifecycle.Lifecycle

// pass is the service for validating provider attributes and signatures
pass ProviderAttrSignatureService
}

Expand Down Expand Up @@ -156,15 +173,22 @@ func (o *order) run(checkForExistingBid bool) {
ctx, cancel := context.WithCancel(context.Background())

var (
// channels for async operations.
groupch <-chan runner.Result
// Channel for receiving group details query result.
groupch <-chan runner.Result
// Channel for storing group details result while checking existing bid.
storedGroupCh <-chan runner.Result
clusterch <-chan runner.Result
bidch <-chan runner.Result
pricech <-chan runner.Result
queryBidCh <-chan runner.Result
shouldBidCh <-chan runner.Result
bidTimeout <-chan time.Time
// Channel for receiving cluster reservation result.
clusterch <-chan runner.Result
// Channel for receiving bid creation transaction result.
bidch <-chan runner.Result
// Channel for receiving bid price calculation result.
pricech <-chan runner.Result
// Channel for receiving existing bid query result.
queryBidCh <-chan runner.Result
// Channel for receiving result of bid eligibility check.
shouldBidCh <-chan runner.Result
// Channel that triggers when bid timeout is reached.
bidTimeout <-chan time.Time

group *dtypes.Group
reservation ctypes.Reservation
Expand Down Expand Up @@ -209,7 +233,7 @@ loop:
if matchBidNotFound.MatchString(err.Error()) {
bidFound = false
} else {
o.session.Log().Error("could not get existing bid", "err", err, "errtype", fmt.Sprintf("%T", err))
o.session.Log().Error("could not get existing bid", "error", err, "errtype", fmt.Sprintf("%T", err))
break loop
}
}
Expand Down Expand Up @@ -307,7 +331,7 @@ loop:
o.log.Info("group fetched")

if result.Error() != nil {
o.log.Error("fetching group", "err", result.Error())
o.log.Error("fetching group", "error", result.Error())
break loop
}

Expand All @@ -323,7 +347,7 @@ loop:

if result.Error() != nil {
shouldBidCounter.WithLabelValues(metricsutils.FailLabel).Inc()
o.log.Error("failure during checking should bid", "err", result.Error())
o.log.Error("failure during checking should bid", "error", result.Error())
break loop
}

Expand All @@ -346,7 +370,7 @@ loop:

if result.Error() != nil {
reservationCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel)
o.log.Error("reserving resources", "err", result.Error())
o.log.Error("reserving resources", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -382,7 +406,7 @@ loop:
case result := <-pricech:
pricech = nil
if result.Error() != nil {
o.log.Error("error calculating price", "err", result.Error())
o.log.Error("error calculating price", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -413,7 +437,7 @@ loop:
bidch = nil
if result.Error() != nil {
bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel).Inc()
o.log.Error("bid failed", "err", result.Error())
o.log.Error("bid failed", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -448,7 +472,7 @@ loop:
if reservation != nil {
o.log.Debug("unreserving reservation")
if err := o.cluster.Unreserve(reservation.OrderID()); err != nil {
o.log.Error("error unreserving reservation", "err", err)
o.log.Error("error unreserving reservation", "error", err)
reservationCounter.WithLabelValues("close", metricsutils.FailLabel)
} else {
reservationCounter.WithLabelValues("close", metricsutils.SuccessLabel)
Expand All @@ -464,7 +488,7 @@ loop:

_, err := o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
if err != nil {
o.log.Error("closing bid", "err", err)
o.log.Error("closing bid", "error", err)
bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc()
} else {
o.log.Info("bid closed", "order-id", o.orderID)
Expand Down Expand Up @@ -556,7 +580,7 @@ func (o *order) shouldBid(group *dtypes.Group) (bool, error) {

if err := group.GroupSpec.ValidateBasic(); err != nil {
o.log.Error("unable to fulfill: group validation error",
"err", err)
"error", err)
return false, nil
}
return true, nil
Expand Down
32 changes: 23 additions & 9 deletions bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,38 @@ func NewService(
return s, nil
}

// service manages the lifecycle and coordination of order processing and bid creation.
type service struct {
// session provides blockchain client and provider info.
session session.Session
// cluster interface to the provider's compute cluster for resource management.
cluster cluster.Cluster
cfg Config
// cfg holds configuration parameters for bid engine (pricing, deposits, timeouts etc).
cfg Config

// bus is the event bus for publishing lease events.
bus pubsub.Bus
// sub is the subscriber for receiving blockchain events.
sub pubsub.Subscriber

// statusch receives requests for bid engine status.
statusch chan chan<- *apclient.BidEngineStatus
orders map[string]*order
drainch chan *order
// orders tracks all active order processing.
orders map[string]*order
// drainch receives completed orders for cleanup.
drainch chan *order
// ordersch receives new orders to process from the blockchain.
ordersch chan []mtypes.OrderID

group *errgroup.Group
group *errgroup.Group
// cancel holds the cancel function of the service context.
cancel context.CancelFunc
lc lifecycle.Lifecycle
pass *providerAttrSignatureService
// lc manages graceful startup/shutdown.
lc lifecycle.Lifecycle
// pass validates provider attributes and signatures.
pass *providerAttrSignatureService

// waiter coordinates operator startup dependencies.
waiter waiter.OperatorWaiter
}

Expand Down Expand Up @@ -247,7 +261,7 @@ loop:
for {
select {
case shutdownErr := <-s.lc.ShutdownRequest():
s.session.Log().Debug("received shutdown request", "err", shutdownErr)
s.session.Log().Debug("received shutdown request", "error", shutdownErr)
s.lc.ShutdownInitiated(nil)
break loop
case orders := <-s.ordersch:
Expand All @@ -256,7 +270,7 @@ loop:
s.session.Log().Debug("creating catchup order", "order", key)
order, err := newOrder(s, orderID, s.cfg, s.pass, true)
if err != nil {
s.session.Log().Error("creating catchup order", "order", key, "err", err)
s.session.Log().Error("creating catchup order", "order", key, "error", err)
continue
}
s.orders[key] = order
Expand All @@ -278,7 +292,7 @@ loop:
// create an order object for managing the bid process and order lifecycle
order, err := newOrder(s, ev.ID, s.cfg, s.pass, false)
if err != nil {
s.session.Log().Error("handling order", "order", key, "err", err)
s.session.Log().Error("handling order", "order", key, "error", err)
break
}

Expand Down
5 changes: 2 additions & 3 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
state.reservations = append(state.reservations, reservation)
req.ch <- inventoryResponse{value: reservation}
inventoryRequestsCounter.WithLabelValues("reserve", "create").Inc()

}

func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservation) {
Expand Down Expand Up @@ -519,7 +518,7 @@ loop:
for {
select {
case err := <-is.lc.ShutdownRequest():
is.log.Debug("received shutdown request", "err", err)
is.log.Debug("received shutdown request", "error", err)
is.lc.ShutdownInitiated(err)
break loop
case ev := <-is.sub.Events():
Expand Down Expand Up @@ -687,7 +686,7 @@ loop:
}
}
} else {
is.log.Error("checking IP addresses", "err", err)
is.log.Error("checking IP addresses", "error", err)
}

resumeProcessingReservations()
Expand Down
Loading