Skip to content

Commit

Permalink
✨ [watcher/local] Handle adj events with dispute
Browse files Browse the repository at this point in the history
Signed-off-by: Manoranjith <[email protected]>
  • Loading branch information
Manoranjith committed Sep 13, 2021
1 parent b6d0e23 commit 096646e
Show file tree
Hide file tree
Showing 2 changed files with 300 additions and 14 deletions.
164 changes: 150 additions & 14 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"context"
"sync"
"time"

"github.com/pkg/errors"

Expand All @@ -25,6 +26,12 @@ import (
"perun.network/go-perun/watcher"
)

const (
// Duration for which the watcher will wait for latest transactions
// when an adjudicator event is received.
statesFromClientWaitTime = 1 * time.Millisecond
)

type (
// Watcher implements a local watcher.
Watcher struct {
Expand All @@ -43,6 +50,10 @@ type (
params *channel.Params
parent channel.ID

registeredVersion uint64
requestLatestTx chan struct{}
latestTx chan channel.Transaction

subChsAccess sync.Mutex
}
)
Expand Down Expand Up @@ -92,34 +103,159 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt

w.registry.updateCh(ch)

tx := channel.Transaction{
State: signedState.State,
Sigs: signedState.Sigs,
}
go handleStatesFromClient(tx, statesPubSub, ch.requestLatestTx, ch.latestTx)
go w.handleEventsFromChain(eventsFromChainSub, eventsToClientPubSub, ch)

return statesPubSub, eventsToClientPubSub, nil
}

func newCh(id, parent channel.ID, params *channel.Params) *ch {
return &ch{
id: id,
params: params,
parent: parent,
id: id,
params: params,
parent: parent,
registeredVersion: 0,
requestLatestTx: make(chan struct{}),
latestTx: make(chan channel.Transaction),
}
}

func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSubscription, eventsToClientPubSub adjudicatorPub,
thisCh *ch) {
func (ch *ch) retreiveLatestTx() channel.Transaction {
ch.requestLatestTx <- struct{}{}
return <-ch.latestTx
}

func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTxn chan struct{},
latestTx chan channel.Transaction) {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.Next():
if !ok {
log.WithField("ID", currentTx.State.ID).Info("States sub closed by client. Shutting down handler")
return
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-requestLatestTxn:
currentTx = receiveTxUntil(statesSub, time.NewTimer(statesFromClientWaitTime).C, currentTx)
latestTx <- currentTx
}
}
}

// receiveTxUntil wait for the transactions on statesPub until the timeout channel is closed
// or statesPub is closed and returns the last received transaction.
// If no transaction was received, then returns currentTx itself.
func receiveTxUntil(statesSub statesSub, timeout <-chan time.Time, currentTx channel.Transaction) channel.Transaction {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.Next():
if !ok {
return currentTx // states sub was closed, send the latest event.
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-timeout:
return currentTx // timer expired, send the latest the event.
}
}
}

func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSubscription,
eventsToClientPubSub adjudicatorPub, thisCh *ch) {
parent := thisCh
if thisCh.parent != channel.Zero {
parent, _ = w.ch(thisCh.parent)
}

for e := eventsFromChainSub.Next(); e != nil; e = eventsFromChainSub.Next() {
switch e.(type) {
case *channel.RegisteredEvent:
log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")

err := eventsToClientPubSub.Publish(e)
if err != nil {
log.Errorf("Error publishing to client: %v", err)
} else {
log.Debug("Published to client")
}
parent.subChsAccess.Lock()
func() {
defer parent.subChsAccess.Unlock()

log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")

err := eventsToClientPubSub.Publish(e)
if err != nil {
log.Errorf("Error publishing to client: %v", err)
}

latestTx := thisCh.retreiveLatestTx()
log.Debugf("Latest version is (%d)", latestTx.Version)

if e.Version() < latestTx.Version {
if e.Version() < thisCh.registeredVersion {
log.Debugf("Latest version (%d) already registered ", thisCh.registeredVersion)
return
}
log.Debugf("Registering latest version (%d)", latestTx.Version)
err := registerDispute(w.registry, w.rs, parent)
if err != nil {
log.Error("Error registering dispute")
// TODO: Should the subscription be closed ?
return
}
log.Debug("Registered successfully")
}
}()
default:
}
}
}

// registerDispute collects the latest tx for the parent channel and each of
// its children. It then registers dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx := parentCh.retreiveLatestTx()
subStates := retreiveLatestSubStates(r, parentTx)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := registerer.Register(ctx, makeAdjudicatorReq(parentCh.params, parentTx), subStates)
if err != nil {
return err
}

parentCh.registeredVersion = parentTx.Version
for i := range subStates {
subCh, _ := r.ch(parentTx.Allocation.Locked[i].ID)
subCh.registeredVersion = subStates[i].State.Version
}
return nil
}

func retreiveLatestSubStates(r *registry, parentTx channel.Transaction) []channel.SignedState {
subStates := make([]channel.SignedState, len(parentTx.Allocation.Locked))
for i := range parentTx.Allocation.Locked {
// Can be done concurrently.
subCh, _ := r.ch(parentTx.Allocation.Locked[i].ID)
subChTx := subCh.retreiveLatestTx()
subStates[i] = channel.SignedState{
Params: subCh.params,
State: subChTx.State,
Sigs: subChTx.Sigs,
}
}
return subStates
}

func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.AdjudicatorReq {
return channel.AdjudicatorReq{
Params: params,
Tx: tx,
Secondary: false,
}
}
150 changes: 150 additions & 0 deletions watcher/local/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -106,6 +107,26 @@ func Test_Watcher_Working(t *testing.T) {
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)
rs.AssertExpectations(t)
})
t.Run("older_state_registered", func(t *testing.T) {
params, txs := randomTxsForSingleCh(rng, 3)
adjSub, trigger := setupAdjudicatorSub(makeRegisteredEvent(txs[1], txs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSub, nil)
setupRegistered(t, rs, &channelTree{txs[2], []channel.Transaction{}})
w := newWatcher(t, rs)
// Start watching.
statesPub, eventsForClient := startWatchingForLedgerCh(t, w, makeSignedState(params, txs[0].State))

// Publish states
require.NoError(t, statesPub.Publish(txs[1]))
require.NoError(t, statesPub.Publish(txs[2]))

// Trigger events and assert.
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)
rs.AssertExpectations(t)
})
})

t.Run("ledger_channel_with_sub_channel", func(t *testing.T) {
Expand Down Expand Up @@ -135,6 +156,83 @@ func Test_Watcher_Working(t *testing.T) {
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
rs.AssertExpectations(t)
})
t.Run("older_state_registered", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 3)
parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation.

adjSubParent, triggerParent := setupAdjudicatorSub(makeRegisteredEvent(parentTxs[1], parentTxs[2])...)
adjSubChild, triggerChild := setupAdjudicatorSub(makeRegisteredEvent(childTxs[1], childTxs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubParent, nil).Once()
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubChild, nil).Once()
setupRegistered(t, rs, &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}})

w := newWatcher(t, rs)
parentSignedState := makeSignedState(parentParams, parentTxs[0].State)
parentStatesPub, eventsForClientParent := startWatchingForLedgerCh(t, w, parentSignedState)

// Publish states
require.NoError(t, parentStatesPub.Publish(parentTxs[1]))
require.NoError(t, parentStatesPub.Publish(parentTxs[2]))

// Register and publish states on child channel.
childSignedState := makeSignedState(childParams, childTxs[0].State)
childStatesPub, eventsForClientChild := startWatchingForNonLedgerCh(t, w, childSignedState, parentTxs[0].State.ID)
require.NoError(t, childStatesPub.Publish(childTxs[1]))
require.NoError(t, childStatesPub.Publish(childTxs[2]))

triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
rs.AssertExpectations(t)
})
t.Run("older_state_registered_receive_new_state", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 4)
parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation.

adjSubParent, triggerParent := setupAdjudicatorSub(makeRegisteredEvent(parentTxs[1], parentTxs[2], parentTxs[2])...)
adjSubChild, triggerChild := setupAdjudicatorSub(makeRegisteredEvent(childTxs[1], childTxs[2], childTxs[3])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubParent, nil).Once()
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubChild, nil).Once()
setupRegistered(t, rs,
&channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}},
&channelTree{parentTxs[2], []channel.Transaction{childTxs[3]}})

w := newWatcher(t, rs)
parentSignedState := makeSignedState(parentParams, parentTxs[0].State)
parentStatesPub, eventsForClientParent := startWatchingForLedgerCh(t, w, parentSignedState)

// Publish states
require.NoError(t, parentStatesPub.Publish(parentTxs[1]))
require.NoError(t, parentStatesPub.Publish(parentTxs[2]))

// Register and publish states on child channel.
childSignedState := makeSignedState(childParams, childTxs[0].State)
childStatesPub, eventsForClientChild := startWatchingForNonLedgerCh(t, w, childSignedState, parentTxs[0].State.ID)
require.NoError(t, childStatesPub.Publish(childTxs[1]))
require.NoError(t, childStatesPub.Publish(childTxs[2]))

// Trigger older adjudicator event for and wait for register.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

// Publish newer state, trigger adjudicator event for registered state and wait for re-register.
require.NoError(t, childStatesPub.Publish(childTxs[3]))
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

// Trigger adjudicator event for registered state.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
rs.AssertExpectations(t)
})
})
}

Expand Down Expand Up @@ -211,6 +309,58 @@ func setupAdjudicatorSub(adjEvents ...channel.AdjudicatorEvent) (*mocks.Adjudica
return adjSub, triggers
}

type channelTree struct {
rootTx channel.Transaction
subTxs []channel.Transaction
}

func setupRegistered(t *testing.T, rs *mocks.RegisterSubscriber, channelTrees ...*channelTree) {
limit := len(channelTrees)
mtx := sync.Mutex{}
iChannelTree := 0

rs.On("Register", mock.Anything,
mock.MatchedBy(func(req channel.AdjudicatorReq) bool {
mtx.Lock()
if iChannelTree >= limit {
return false
}
return assertEqualAdjudicatorReq(t, req, channelTrees[iChannelTree].rootTx.State)
}),
mock.MatchedBy(func(subStates []channel.SignedState) bool {
defer func() {
iChannelTree += 1
mtx.Unlock()
}()
if iChannelTree >= limit {
return false
}
return assertEqualSignedStates(t, subStates, channelTrees[iChannelTree].subTxs)
})).Return(nil).Times(limit)
}

func assertEqualAdjudicatorReq(t *testing.T, got channel.AdjudicatorReq, want *channel.State) bool {
if nil != got.Tx.State.Equal(want) {
t.Logf("Got %+v, expected %+v", got.Tx.State, want)
return false
}
return true
}

func assertEqualSignedStates(t *testing.T, got []channel.SignedState, want []channel.Transaction) bool {
if len(got) != len(want) {
t.Logf("Got %d sub states, expected %d sub states", len(got), len(want))
return false
}
for iSubState := range got {
t.Logf("Got %+v, expected %+v", got[iSubState].State, want[iSubState].State)
if nil != got[iSubState].State.Equal(want[iSubState].State) {
return false
}
}
return true
}

func makeRegisteredEvent(txs ...channel.Transaction) []channel.AdjudicatorEvent {
events := make([]channel.AdjudicatorEvent, len(txs))
for i, tx := range txs {
Expand Down

0 comments on commit 096646e

Please sign in to comment.