Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bidengine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
types "github.com/akash-network/akash-api/go/node/types/v1beta3"
)

// Config represents the configuration parameters for the bid engine.
// It controls pricing, deposits, timeouts and provider capabilities and attributes
type Config struct {
PricingStrategy BidPricingStrategy
Deposit sdk.Coin
Expand Down
74 changes: 49 additions & 25 deletions bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,36 @@
"github.com/akash-network/provider/session"
)

// order manages bidding and general lifecycle handling of an order.
// order manages bidding and general lifecycle handling of an order. The lifecycle includes:
type order struct {
// orderID is the unique identifier for this order from the blockchain
orderID mtypes.OrderID
cfg Config

session session.Session
cluster cluster.Cluster
bus pubsub.Bus
sub pubsub.Subscriber
// cfg holds configuraiton parameters for bid engine (pricing, deposits, timeouts etc).

Check failure on line 35 in bidengine/order.go

View workflow job for this annotation

GitHub Actions / lint

`configuraiton` is a misspelling of `configuration` (misspell)
cfg Config

// session provides blockchain client and provider info.
session session.Session

// cluster interface to the provider's compute cluster for resource management.
cluster cluster.Cluster

// bus is the event bus for publishing lease events.
bus pubsub.Bus

// sub is the subscriber for receiving blockchain events.
sub pubsub.Subscriber

// reservationFulfilledNotify is the channel to notify when resources are reserved.
reservationFulfilledNotify chan<- int

log log.Logger
lc lifecycle.Lifecycle
// log is the logger instance
log log.Logger

// lc contains the lifecycle management for graceful startup/shutdown
lc lifecycle.Lifecycle

// pass is the service for validating provider attributes and signatures
pass ProviderAttrSignatureService
}

Expand Down Expand Up @@ -156,15 +173,22 @@
ctx, cancel := context.WithCancel(context.Background())

var (
// channels for async operations.
groupch <-chan runner.Result
// Channel for receiving group details query result.
groupch <-chan runner.Result
// Channel for storing group details result while checking existing bid.
storedGroupCh <-chan runner.Result
clusterch <-chan runner.Result
bidch <-chan runner.Result
pricech <-chan runner.Result
queryBidCh <-chan runner.Result
shouldBidCh <-chan runner.Result
bidTimeout <-chan time.Time
// Channel for receiving cluster reservation result.
clusterch <-chan runner.Result
// Channel for receiving bid creation transaction result.
bidch <-chan runner.Result
// Channel for receiving bid price calculation result.
pricech <-chan runner.Result
// Channel for receiving existing bid query result.
queryBidCh <-chan runner.Result
// Channel for receiving result of bid eligibility check.
shouldBidCh <-chan runner.Result
// Channel that triggers when bid timeout is reached.
bidTimeout <-chan time.Time

group *dtypes.Group
reservation ctypes.Reservation
Expand Down Expand Up @@ -209,7 +233,7 @@
if matchBidNotFound.MatchString(err.Error()) {
bidFound = false
} else {
o.session.Log().Error("could not get existing bid", "err", err, "errtype", fmt.Sprintf("%T", err))
o.session.Log().Error("could not get existing bid", "error", err, "errtype", fmt.Sprintf("%T", err))
break loop
}
}
Expand Down Expand Up @@ -307,7 +331,7 @@
o.log.Info("group fetched")

if result.Error() != nil {
o.log.Error("fetching group", "err", result.Error())
o.log.Error("fetching group", "error", result.Error())
break loop
}

Expand All @@ -323,7 +347,7 @@

if result.Error() != nil {
shouldBidCounter.WithLabelValues(metricsutils.FailLabel).Inc()
o.log.Error("failure during checking should bid", "err", result.Error())
o.log.Error("failure during checking should bid", "error", result.Error())
break loop
}

Expand All @@ -346,7 +370,7 @@

if result.Error() != nil {
reservationCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel)
o.log.Error("reserving resources", "err", result.Error())
o.log.Error("reserving resources", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -382,7 +406,7 @@
case result := <-pricech:
pricech = nil
if result.Error() != nil {
o.log.Error("error calculating price", "err", result.Error())
o.log.Error("error calculating price", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -413,7 +437,7 @@
bidch = nil
if result.Error() != nil {
bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel).Inc()
o.log.Error("bid failed", "err", result.Error())
o.log.Error("bid failed", "error", result.Error())
break loop
}

Expand Down Expand Up @@ -448,7 +472,7 @@
if reservation != nil {
o.log.Debug("unreserving reservation")
if err := o.cluster.Unreserve(reservation.OrderID()); err != nil {
o.log.Error("error unreserving reservation", "err", err)
o.log.Error("error unreserving reservation", "error", err)
reservationCounter.WithLabelValues("close", metricsutils.FailLabel)
} else {
reservationCounter.WithLabelValues("close", metricsutils.SuccessLabel)
Expand All @@ -464,7 +488,7 @@

_, err := o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
if err != nil {
o.log.Error("closing bid", "err", err)
o.log.Error("closing bid", "error", err)
bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc()
} else {
o.log.Info("bid closed", "order-id", o.orderID)
Expand Down Expand Up @@ -556,7 +580,7 @@

if err := group.GroupSpec.ValidateBasic(); err != nil {
o.log.Error("unable to fulfill: group validation error",
"err", err)
"error", err)
return false, nil
}
return true, nil
Expand Down
32 changes: 23 additions & 9 deletions bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,38 @@ func NewService(
return s, nil
}

// service manages the lifecycle and coordination of order processing and bid creation.
type service struct {
// session provides blockchain client and provider info.
session session.Session
// cluster interface to the provider's compute cluster for resource management.
cluster cluster.Cluster
cfg Config
// cfg holds configuration parameters for bid engine (pricing, deposits, timeouts etc).
cfg Config

// bus is the event bus for publishing lease events.
bus pubsub.Bus
// sub is the subscriber for receiving blockchain events.
sub pubsub.Subscriber

// statusch receives requests for bid engine status.
statusch chan chan<- *apclient.BidEngineStatus
orders map[string]*order
drainch chan *order
// orders tracks all active order processing.
orders map[string]*order
// drainch receives completed orders for cleanup.
drainch chan *order
// ordersch receives new orders to process from the blockchain.
ordersch chan []mtypes.OrderID

group *errgroup.Group
group *errgroup.Group
// cancel holds the cancel function of the service context.
cancel context.CancelFunc
lc lifecycle.Lifecycle
pass *providerAttrSignatureService
// lc manages graceful startup/shutdown.
lc lifecycle.Lifecycle
// pass validates provider attributes and signatures.
pass *providerAttrSignatureService

// waiter coordinates operator startup dependencies.
waiter waiter.OperatorWaiter
}

Expand Down Expand Up @@ -247,7 +261,7 @@ loop:
for {
select {
case shutdownErr := <-s.lc.ShutdownRequest():
s.session.Log().Debug("received shutdown request", "err", shutdownErr)
s.session.Log().Debug("received shutdown request", "error", shutdownErr)
s.lc.ShutdownInitiated(nil)
break loop
case orders := <-s.ordersch:
Expand All @@ -256,7 +270,7 @@ loop:
s.session.Log().Debug("creating catchup order", "order", key)
order, err := newOrder(s, orderID, s.cfg, s.pass, true)
if err != nil {
s.session.Log().Error("creating catchup order", "order", key, "err", err)
s.session.Log().Error("creating catchup order", "order", key, "error", err)
continue
}
s.orders[key] = order
Expand All @@ -278,7 +292,7 @@ loop:
// create an order object for managing the bid process and order lifecycle
order, err := newOrder(s, ev.ID, s.cfg, s.pass, false)
if err != nil {
s.session.Log().Error("handling order", "order", key, "err", err)
s.session.Log().Error("handling order", "order", key, "error", err)
break
}

Expand Down
5 changes: 2 additions & 3 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
state.reservations = append(state.reservations, reservation)
req.ch <- inventoryResponse{value: reservation}
inventoryRequestsCounter.WithLabelValues("reserve", "create").Inc()

}

func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservation) {
Expand Down Expand Up @@ -519,7 +518,7 @@ loop:
for {
select {
case err := <-is.lc.ShutdownRequest():
is.log.Debug("received shutdown request", "err", err)
is.log.Debug("received shutdown request", "error", err)
is.lc.ShutdownInitiated(err)
break loop
case ev := <-is.sub.Events():
Expand Down Expand Up @@ -687,7 +686,7 @@ loop:
}
}
} else {
is.log.Error("checking IP addresses", "err", err)
is.log.Error("checking IP addresses", "error", err)
}

resumeProcessingReservations()
Expand Down
Loading