Skip to content

Commit daf4126

Browse files
authored
Revert "BYOC: add streaming" (#3804)
This commit reverts #3804 because it was merged with a failing test and without review from members of the other core teams. The plan is to resubmit it for proper review and cross-team discussion.
1 parent 88807fa commit daf4126

15 files changed

+292
-4508
lines changed

CHANGELOG_PENDING.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,5 @@
2323
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
2424
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
2525
* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)
26-
* [#3727](https://github.com/livepeer/go-livepeer/pull/3727) BYOC: add streaming for BYOC pipelines using trickle (@ad-astra-video)
2726

2827
#### CLI

common/testutil.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,6 @@ func (s *StubServerStream) Send(n *net.NotifySegment) error {
8282
func IgnoreRoutines() []goleak.Option {
8383
// goleak works by making list of all running goroutines and reporting error if it finds any
8484
// this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines
85-
// following added for job_stream_tests, believe related to open connections on trickle server that are cleaned up periodically
86-
// net/http.(*persistConn).mapRoundTripError
87-
// net/http.(*persistConn).readLoop
88-
// net/http.(*persistConn).writeLoop
89-
// io.(*pipe).read
9085
funcs2ignore := []string{"github.com/golang/glog.(*loggingT).flushDaemon", "go.opencensus.io/stats/view.(*worker).start",
9186
"github.com/rjeczalik/notify.(*recursiveTree).dispatch", "github.com/rjeczalik/notify._Cfunc_CFRunLoopRun", "github.com/ethereum/go-ethereum/metrics.(*meterArbiter).tick",
9287
"github.com/ethereum/go-ethereum/consensus/ethash.(*Ethash).remote", "github.com/ethereum/go-ethereum/core.(*txSenderCacher).cache",
@@ -98,12 +93,6 @@ func IgnoreRoutines() []goleak.Option {
9893
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup",
9994
"internal/synctest.Run",
10095
"testing/synctest.testingSynctestTest",
101-
"github.com/livepeer/go-livepeer/server.startTrickleSubscribe.func2",
102-
"net/http.(*persistConn).mapRoundTripError",
103-
"net/http.(*persistConn).readLoop",
104-
"net/http.(*persistConn).writeLoop",
105-
"io.(*pipe).read",
106-
"github.com/livepeer/go-livepeer/media.gatherIncomingTracks",
10796
}
10897
ignoreAnywhereFuncs := []string{
10998
// glog’s file flusher often has syscall/os.* on top
@@ -115,6 +104,7 @@ func IgnoreRoutines() []goleak.Option {
115104
res = append(res, goleak.IgnoreTopFunction(f))
116105
}
117106
for _, f := range ignoreAnywhereFuncs {
107+
// ignore if these function signatures appear anywhere in the call stack
118108
res = append(res, goleak.IgnoreAnyFunction(f))
119109
}
120110
return res

core/accounting.go

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,9 @@ func (b *Balance) Balance() *big.Rat {
6666

6767
// AddressBalances holds credit balances for ETH addresses
6868
type AddressBalances struct {
69-
balances map[ethcommon.Address]*Balances
70-
mtx sync.Mutex
71-
sharedBalMtx sync.Mutex
72-
ttl time.Duration
69+
balances map[ethcommon.Address]*Balances
70+
mtx sync.Mutex
71+
ttl time.Duration
7372
}
7473

7574
// NewAddressBalances creates a new AddressBalances instance
@@ -100,47 +99,6 @@ func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Ra
10099
return a.balancesForAddr(addr).Balance(id)
101100
}
102101

103-
// compares expected balance with current balance and updates accordingly with the expected balance being the target
104-
// returns the difference and if minimum balance was covered
105-
// also returns if balance was reset to zero because expected was zero
106-
func (a *AddressBalances) CompareAndUpdateBalance(addr ethcommon.Address, id ManifestID, expected *big.Rat, minimumBal *big.Rat) (*big.Rat, *big.Rat, bool, bool) {
107-
a.sharedBalMtx.Lock()
108-
defer a.sharedBalMtx.Unlock()
109-
current := a.balancesForAddr(addr).Balance(id)
110-
if current == nil {
111-
//create a balance of 1 to start tracking
112-
a.Debit(addr, id, big.NewRat(0, 1))
113-
current = a.balancesForAddr(addr).Balance(id)
114-
}
115-
if expected == nil {
116-
expected = big.NewRat(0, 1)
117-
}
118-
diff := new(big.Rat).Sub(expected, current)
119-
120-
if diff.Sign() > 0 {
121-
a.Credit(addr, id, diff)
122-
} else {
123-
a.Debit(addr, id, new(big.Rat).Abs(diff))
124-
}
125-
126-
var resetToZero bool
127-
if expected.Sign() == 0 {
128-
a.Debit(addr, id, current)
129-
130-
resetToZero = true
131-
}
132-
133-
//get updated balance after changes
134-
current = a.balancesForAddr(addr).Balance(id)
135-
136-
var minimumBalCovered bool
137-
if current.Cmp(minimumBal) >= 0 {
138-
minimumBalCovered = true
139-
}
140-
141-
return current, diff, minimumBalCovered, resetToZero
142-
}
143-
144102
// StopCleanup stops the cleanup loop for all balances
145103
func (a *AddressBalances) StopCleanup() {
146104
a.mtx.Lock()

core/accounting_test.go

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -265,97 +265,3 @@ func TestBalancesCleanup(t *testing.T) {
265265
// Now balance for mid1 should be cleaned as well
266266
assert.Nil(b.Balance(mid1))
267267
}
268-
269-
func TestAddressBalances_CompareAndUpdateBalance(t *testing.T) {
270-
addr := ethcommon.BytesToAddress([]byte("foo"))
271-
mid := ManifestID("some manifestID")
272-
balances := NewAddressBalances(1 * time.Minute)
273-
defer balances.StopCleanup()
274-
275-
assert := assert.New(t)
276-
277-
// Test 1: Balance doesn't exist - should initialize to 1 and then update to expected
278-
expected := big.NewRat(10, 1)
279-
minimumBal := big.NewRat(5, 1)
280-
current, diff, minimumBalCovered, resetToZero := balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
281-
282-
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
283-
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be expected - initial (10 - 1)")
284-
assert.True(minimumBalCovered, "Minimum balance should be covered when going from 1 to 10")
285-
assert.False(resetToZero, "Should not be reset to zero")
286-
287-
// Test 2: Expected > Current (Credit scenario)
288-
expected = big.NewRat(20, 1)
289-
minimumBal = big.NewRat(15, 1)
290-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
291-
292-
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
293-
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be 20 - 10 = 10")
294-
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing threshold")
295-
assert.False(resetToZero, "Should not be reset to zero")
296-
297-
// Test 3: Expected < Current (Debit scenario)
298-
expected = big.NewRat(5, 1)
299-
minimumBal = big.NewRat(3, 1)
300-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
301-
302-
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
303-
assert.Zero(big.NewRat(-15, 1).Cmp(diff), "Diff should be 5 - 20 = -15")
304-
assert.True(minimumBalCovered, "Minimum balance should still be covered")
305-
assert.False(resetToZero, "Should not be reset to zero")
306-
307-
// Test 4: Expected == Current (No change)
308-
expected = big.NewRat(5, 1)
309-
minimumBal = big.NewRat(3, 1)
310-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
311-
312-
assert.Zero(expected.Cmp(current), "Balance should remain the same")
313-
assert.Zero(big.NewRat(0, 1).Cmp(diff), "Diff should be 0")
314-
assert.True(minimumBalCovered, "Minimum balance should still be covered")
315-
assert.False(resetToZero, "Should not be reset to zero")
316-
317-
// Test 5: Reset to zero (current > 0, expected = 0)
318-
balances.Credit(addr, mid, big.NewRat(5, 1)) // Set current to 10
319-
expected = big.NewRat(0, 1)
320-
minimumBal = big.NewRat(3, 1)
321-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
322-
323-
assert.Zero(expected.Cmp(current), "Balance should be reset to zero")
324-
assert.Zero(big.NewRat(-10, 1).Cmp(diff), "Diff should be 0 - 10 = -10")
325-
assert.False(minimumBalCovered, "Minimum balance should not be covered when resetting to zero")
326-
assert.True(resetToZero, "Should be marked as reset to zero")
327-
328-
// Test 6: Minimum balance covered threshold - just below to just above
329-
expected = big.NewRat(2, 1)
330-
minimumBal = big.NewRat(5, 1)
331-
balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal) // Set to 2
332-
333-
expected = big.NewRat(5, 1)
334-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
335-
336-
assert.Zero(expected.Cmp(current), "Balance should be updated to 5")
337-
assert.Zero(big.NewRat(3, 1).Cmp(diff), "Diff should be 5 - 2 = 3")
338-
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing from below to at threshold")
339-
assert.False(resetToZero, "Should not be reset to zero")
340-
341-
// Test 7: Minimum balance not covered - already above threshold
342-
expected = big.NewRat(10, 1)
343-
minimumBal = big.NewRat(5, 1)
344-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
345-
346-
assert.Zero(expected.Cmp(current), "Balance should be updated to 10")
347-
assert.Zero(big.NewRat(5, 1).Cmp(diff), "Diff should be 10 - 5 = 5")
348-
assert.True(minimumBalCovered, "Minimum balance should still be covered")
349-
assert.False(resetToZero, "Should not be reset to zero")
350-
351-
// Test 8: Negative balance handling
352-
balances.Debit(addr, mid, big.NewRat(20, 1)) // Force negative: 10 - 20 = -10
353-
expected = big.NewRat(5, 1)
354-
minimumBal = big.NewRat(3, 1)
355-
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
356-
357-
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
358-
assert.Zero(big.NewRat(15, 1).Cmp(diff), "Diff should be 5 - (-10) = 15")
359-
assert.True(minimumBalCovered, "Minimum balance should be covered when going from negative to positive above minimum")
360-
assert.False(resetToZero, "Should not be reset to zero")
361-
}

core/ai_orchestrator.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,8 +1163,8 @@ func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string)
11631163
func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string) error {
11641164
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
11651165
if ok {
1166-
cap.Mu.Lock()
1167-
defer cap.Mu.Unlock()
1166+
cap.mu.Lock()
1167+
defer cap.mu.Unlock()
11681168

11691169
cap.Load++
11701170
return nil
@@ -1176,8 +1176,8 @@ func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string
11761176
func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) error {
11771177
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
11781178
if ok {
1179-
cap.Mu.Lock()
1180-
defer cap.Mu.Unlock()
1179+
cap.mu.Lock()
1180+
defer cap.mu.Unlock()
11811181

11821182
cap.Load--
11831183
return nil
@@ -1200,12 +1200,6 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
12001200
return nil, err
12011201
}
12021202

1203-
//ensure price numerator and denominator can be int64
1204-
jobPrice, err = common.PriceToInt64(jobPrice)
1205-
if err != nil {
1206-
return nil, fmt.Errorf("invalid job price: %w", err)
1207-
}
1208-
12091203
return &net.PriceInfo{
12101204
PricePerUnit: jobPrice.Num().Int64(),
12111205
PixelsPerUnit: jobPrice.Denom().Int64(),

0 commit comments

Comments
 (0)