Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)
* [#3727](https://github.com/livepeer/go-livepeer/pull/3727) BYOC: add streaming for BYOC pipelines using trickle (@ad-astra-video)

#### CLI
1 change: 1 addition & 0 deletions common/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func IgnoreRoutines() []goleak.Option {
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup",
"internal/synctest.Run",
"testing/synctest.testingSynctestTest",
"github.com/livepeer/go-livepeer/server.startTrickleSubscribe.func2",
}
ignoreAnywhereFuncs := []string{
// glog’s file flusher often has syscall/os.* on top
Expand Down
48 changes: 45 additions & 3 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ func (b *Balance) Balance() *big.Rat {

// AddressBalances holds credit balances for ETH addresses
type AddressBalances struct {
balances map[ethcommon.Address]*Balances
mtx sync.Mutex
ttl time.Duration
balances map[ethcommon.Address]*Balances
mtx sync.Mutex
sharedBalMtx sync.Mutex
ttl time.Duration
}

// NewAddressBalances creates a new AddressBalances instance
Expand Down Expand Up @@ -99,6 +100,47 @@ func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Ra
return a.balancesForAddr(addr).Balance(id)
}

// compares expected balance with current balance and updates accordingly with the expected balance being the target
// returns the difference and if minimum balance was covered
// also returns if balance was reset to zero because expected was zero
func (a *AddressBalances) CompareAndUpdateBalance(addr ethcommon.Address, id ManifestID, expected *big.Rat, minimumBal *big.Rat) (*big.Rat, *big.Rat, bool, bool) {
a.sharedBalMtx.Lock()
defer a.sharedBalMtx.Unlock()
current := a.balancesForAddr(addr).Balance(id)
if current == nil {
//create a balance of 1 to start tracking
a.Debit(addr, id, big.NewRat(0, 1))
current = a.balancesForAddr(addr).Balance(id)
}
if expected == nil {
expected = big.NewRat(0, 1)
}
diff := new(big.Rat).Sub(expected, current)

if diff.Sign() > 0 {
a.Credit(addr, id, diff)
} else {
a.Debit(addr, id, new(big.Rat).Abs(diff))
}

var resetToZero bool
if expected.Sign() == 0 {
a.Debit(addr, id, current)

resetToZero = true
}

//get updated balance after changes
current = a.balancesForAddr(addr).Balance(id)

var minimumBalCovered bool
if current.Cmp(minimumBal) >= 0 {
minimumBalCovered = true
}

return current, diff, minimumBalCovered, resetToZero
}

// StopCleanup stops the cleanup loop for all balances
func (a *AddressBalances) StopCleanup() {
a.mtx.Lock()
Expand Down
94 changes: 94 additions & 0 deletions core/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,97 @@ func TestBalancesCleanup(t *testing.T) {
// Now balance for mid1 should be cleaned as well
assert.Nil(b.Balance(mid1))
}

func TestAddressBalances_CompareAndUpdateBalance(t *testing.T) {
addr := ethcommon.BytesToAddress([]byte("foo"))
mid := ManifestID("some manifestID")
balances := NewAddressBalances(1 * time.Minute)
defer balances.StopCleanup()

assert := assert.New(t)

// Test 1: Balance doesn't exist - should initialize to 1 and then update to expected
expected := big.NewRat(10, 1)
minimumBal := big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero := balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be expected - initial (10 - 1)")
assert.True(minimumBalCovered, "Minimum balance should be covered when going from 1 to 10")
assert.False(resetToZero, "Should not be reset to zero")

// Test 2: Expected > Current (Credit scenario)
expected = big.NewRat(20, 1)
minimumBal = big.NewRat(15, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be 20 - 10 = 10")
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing threshold")
assert.False(resetToZero, "Should not be reset to zero")

// Test 3: Expected < Current (Debit scenario)
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(-15, 1).Cmp(diff), "Diff should be 5 - 20 = -15")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 4: Expected == Current (No change)
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should remain the same")
assert.Zero(big.NewRat(0, 1).Cmp(diff), "Diff should be 0")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 5: Reset to zero (current > 0, expected = 0)
balances.Credit(addr, mid, big.NewRat(5, 1)) // Set current to 10
expected = big.NewRat(0, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be reset to zero")
assert.Zero(big.NewRat(-10, 1).Cmp(diff), "Diff should be 0 - 10 = -10")
assert.False(minimumBalCovered, "Minimum balance should not be covered when resetting to zero")
assert.True(resetToZero, "Should be marked as reset to zero")

// Test 6: Minimum balance covered threshold - just below to just above
expected = big.NewRat(2, 1)
minimumBal = big.NewRat(5, 1)
balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal) // Set to 2

expected = big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to 5")
assert.Zero(big.NewRat(3, 1).Cmp(diff), "Diff should be 5 - 2 = 3")
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing from below to at threshold")
assert.False(resetToZero, "Should not be reset to zero")

// Test 7: Minimum balance not covered - already above threshold
expected = big.NewRat(10, 1)
minimumBal = big.NewRat(5, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to 10")
assert.Zero(big.NewRat(5, 1).Cmp(diff), "Diff should be 10 - 5 = 5")
assert.True(minimumBalCovered, "Minimum balance should still be covered")
assert.False(resetToZero, "Should not be reset to zero")

// Test 8: Negative balance handling
balances.Debit(addr, mid, big.NewRat(20, 1)) // Force negative: 10 - 20 = -10
expected = big.NewRat(5, 1)
minimumBal = big.NewRat(3, 1)
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)

assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
assert.Zero(big.NewRat(15, 1).Cmp(diff), "Diff should be 5 - (-10) = 15")
assert.True(minimumBalCovered, "Minimum balance should be covered when going from negative to positive above minimum")
assert.False(resetToZero, "Should not be reset to zero")
}
14 changes: 10 additions & 4 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,8 +1163,8 @@ func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string)
func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string) error {
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
if ok {
cap.mu.Lock()
defer cap.mu.Unlock()
cap.Mu.Lock()
defer cap.Mu.Unlock()

cap.Load++
return nil
Expand All @@ -1176,8 +1176,8 @@ func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string
func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) error {
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
if ok {
cap.mu.Lock()
defer cap.mu.Unlock()
cap.Mu.Lock()
defer cap.Mu.Unlock()

cap.Load--
return nil
Expand All @@ -1200,6 +1200,12 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
return nil, err
}

//ensure price numerator and denominator can be int64
jobPrice, err = common.PriceToInt64(jobPrice)
if err != nil {
return nil, fmt.Errorf("invalid job price: %w", err)
}

return &net.PriceInfo{
PricePerUnit: jobPrice.Num().Int64(),
PixelsPerUnit: jobPrice.Denom().Int64(),
Expand Down
Loading
Loading