Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement local watcher #182

155 changes: 147 additions & 8 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"context"
"sync"
"time"

"github.com/pkg/errors"

Expand All @@ -25,6 +26,12 @@ import (
"perun.network/go-perun/watcher"
)

const (
// Duration for which the watcher will wait for latest transactions
// when an adjudicator event is received.
statesFromClientWaitTime = 1 * time.Millisecond
)

type (
// Watcher implements a local watcher.
Watcher struct {
Expand All @@ -42,6 +49,10 @@ type (
params *channel.Params
parent channel.ID

registeredVersion uint64
requestLatestTx chan struct{}
latestTx chan channel.Transaction

subChsAccess sync.Mutex
}
)
Expand Down Expand Up @@ -95,28 +106,156 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt

w.registry.addUnsafe(ch)

tx := channel.Transaction{
State: signedState.State,
Sigs: signedState.Sigs,
}
go handleStatesFromClient(tx, statesPubSub, ch.requestLatestTx, ch.latestTx)
go w.handleEventsFromChain(eventsFromChainSub, eventsToClientPubSub, ch)

return statesPubSub, eventsToClientPubSub, nil
}

func newCh(id, parent channel.ID, params *channel.Params) *ch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to the file that defines ch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to #213

return &ch{
id: id,
params: params,
parent: parent,
id: id,
params: params,
parent: parent,
registeredVersion: 0,
requestLatestTx: make(chan struct{}),
latestTx: make(chan channel.Transaction),
}
}

func (ch *ch) retreiveLatestTx() channel.Transaction {
ch.requestLatestTx <- struct{}{}
return <-ch.latestTx
}

func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTxn chan struct{},
latestTx chan channel.Transaction) {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.statesStream():
if !ok {
log.WithField("ID", currentTx.State.ID).Info("States sub closed by client. Shutting down handler")
return
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-requestLatestTxn:
currentTx = receiveTxUntil(statesSub, time.NewTimer(statesFromClientWaitTime).C, currentTx)
latestTx <- currentTx
}
}
}

// receiveTxUntil wait for the transactions on statesPub until the timeout channel is closed
// or statesPub is closed and returns the last received transaction.
// If no transaction was received, then returns currentTx itself.
func receiveTxUntil(statesSub statesSub, timeout <-chan time.Time, currentTx channel.Transaction) channel.Transaction {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.statesStream():
if !ok {
return currentTx // states sub was closed, send the latest event.
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-timeout:
return currentTx // timer expired, send the latest the event.
}
}
}

func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSubscription, eventsToClientPubSub adjudicatorPub,
thisCh *ch) {
func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSubscription,
eventsToClientPubSub adjudicatorPub, thisCh *ch) {
parent := thisCh
if thisCh.parent != channel.Zero {
parent, _ = w.registry.retrieve(thisCh.parent)
}

for e := eventsFromChainSub.Next(); e != nil; e = eventsFromChainSub.Next() {
switch e.(type) {
case *channel.RegisteredEvent:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in #207 , you may need to check also for the Progressed and Concluded events to be sure that you notice channel registration.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in #215.

log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")
eventsToClientPubSub.Publish(e)
parent.subChsAccess.Lock()
func() {
defer parent.subChsAccess.Unlock()

log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")

eventsToClientPubSub.publish(e)

latestTx := thisCh.retreiveLatestTx()
log.Debugf("Latest version is (%d)", latestTx.Version)

if e.Version() < latestTx.Version {
if e.Version() < thisCh.registeredVersion {
log.Debugf("Latest version (%d) already registered ", thisCh.registeredVersion)
return
}
log.Debugf("Registering latest version (%d)", latestTx.Version)
err := registerDispute(w.registry, w.rs, parent)
if err != nil {
log.Error("Error registering dispute")
// TODO: Should the subscription be closed ?
return
}
log.Debug("Registered successfully")
}
}()
default:
}
}
}

// registerDispute collects the latest transaction for the parent channel and
// each of its children. It then registers dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx := parentCh.retreiveLatestTx()
subStates := retreiveLatestSubStates(r, parentTx)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := registerer.Register(ctx, makeAdjudicatorReq(parentCh.params, parentTx), subStates)
if err != nil {
return err
}

parentCh.registeredVersion = parentTx.Version
for i := range subStates {
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subCh.registeredVersion = subStates[i].State.Version
}
return nil
}

func retreiveLatestSubStates(r *registry, parentTx channel.Transaction) []channel.SignedState {
subStates := make([]channel.SignedState, len(parentTx.Allocation.Locked))
for i := range parentTx.Allocation.Locked {
// Can be done concurrently.
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subChTx := subCh.retreiveLatestTx()
subStates[i] = channel.SignedState{
Params: subCh.params,
State: subChTx.State,
Sigs: subChTx.Sigs,
}
}
return subStates
}

func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.AdjudicatorReq {
return channel.AdjudicatorReq{
Params: params,
Tx: tx,
Secondary: false,
}
}
150 changes: 150 additions & 0 deletions watcher/local/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -124,6 +125,26 @@ func Test_Watcher_Working(t *testing.T) {
// Trigger events.
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)

rs.AssertExpectations(t)
})
t.Run("error/older_state_registered", func(t *testing.T) {
params, txs := randomTxsForSingleCh(rng, 3)
adjSub, trigger := setupAdjudicatorSub(makeRegisteredEvent(txs[1], txs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSub, nil)
setupRegistered(t, rs, &channelTree{txs[2], []channel.Transaction{}})
w := newWatcher(t, rs)
// Start watching and publish states.
statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State))
require.NoError(t, statesPub.Publish(txs[1]))
require.NoError(t, statesPub.Publish(txs[2]))

// Trigger events.
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)
// Trigger events for registered state.
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)

rs.AssertExpectations(t)
})
})
Expand Down Expand Up @@ -188,6 +209,83 @@ func Test_Watcher_Working(t *testing.T) {
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
rs.AssertExpectations(t)
})
t.Run("happy/older_state_registered", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 3)
parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation.

adjSubParent, triggerParent := setupAdjudicatorSub(makeRegisteredEvent(parentTxs[1], parentTxs[2])...)
adjSubChild, triggerChild := setupAdjudicatorSub(makeRegisteredEvent(childTxs[1], childTxs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubParent, nil).Once()
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubChild, nil).Once()
setupRegistered(t, rs, &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}})

w := newWatcher(t, rs)
// Parent: Start watching and publish states.
parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State)
parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState)
require.NoError(t, parentStatesPub.Publish(parentTxs[1]))
require.NoError(t, parentStatesPub.Publish(parentTxs[2]))

// Child: Start watching and publish states.
childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State)
childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID)
require.NoError(t, childStatesPub.Publish(childTxs[1]))
require.NoError(t, childStatesPub.Publish(childTxs[2]))

// Parent, Child: Trigger events.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
// Parent, Child: Trigger events for registered state.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)
rs.AssertExpectations(t)
})
t.Run("happy/older_state_registered_then_newer_state_received", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 4)
parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation.

adjSubParent, triggerParent := setupAdjudicatorSub(makeRegisteredEvent(parentTxs[1], parentTxs[2], parentTxs[2])...)
adjSubChild, triggerChild := setupAdjudicatorSub(makeRegisteredEvent(childTxs[1], childTxs[2], childTxs[3])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubParent, nil).Once()
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubChild, nil).Once()
setupRegistered(t, rs,
&channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}},
&channelTree{parentTxs[2], []channel.Transaction{childTxs[3]}})

w := newWatcher(t, rs)
// Parent: Start watching and publish states.
parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State)
parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState)
require.NoError(t, parentStatesPub.Publish(parentTxs[1]))
require.NoError(t, parentStatesPub.Publish(parentTxs[2]))

// Child: Start watching and publish states.
childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State)
childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID)
require.NoError(t, childStatesPub.Publish(childTxs[1]))
require.NoError(t, childStatesPub.Publish(childTxs[2]))

// Trigger event with older state.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

// Child: Publish newer state. Parent, Child: trigger adjduciator event for the registered state.
require.NoError(t, childStatesPub.Publish(childTxs[3]))
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

// Parent, Child: Trigger events for registered state(second time).
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

rs.AssertExpectations(t)
})
})
}

Expand Down Expand Up @@ -264,6 +362,58 @@ func setupAdjudicatorSub(adjEvents ...channel.AdjudicatorEvent) (*mocks.Adjudica
return adjSub, triggers
}

type channelTree struct {
rootTx channel.Transaction
subTxs []channel.Transaction
}

func setupRegistered(t *testing.T, rs *mocks.RegisterSubscriber, channelTrees ...*channelTree) {
limit := len(channelTrees)
mtx := sync.Mutex{}
iChannelTree := 0

rs.On("Register", mock.Anything,
mock.MatchedBy(func(req channel.AdjudicatorReq) bool {
mtx.Lock()
if iChannelTree >= limit {
return false
}
return assertEqualAdjudicatorReq(t, req, channelTrees[iChannelTree].rootTx.State)
}),
mock.MatchedBy(func(subStates []channel.SignedState) bool {
defer func() {
iChannelTree += 1
mtx.Unlock()
}()
if iChannelTree >= limit {
return false
}
return assertEqualSignedStates(t, subStates, channelTrees[iChannelTree].subTxs)
})).Return(nil).Times(limit)
}

func assertEqualAdjudicatorReq(t *testing.T, got channel.AdjudicatorReq, want *channel.State) bool {
if nil != got.Tx.State.Equal(want) {
t.Logf("Got %+v, expected %+v", got.Tx.State, want)
return false
}
return true
}

func assertEqualSignedStates(t *testing.T, got []channel.SignedState, want []channel.Transaction) bool {
if len(got) != len(want) {
t.Logf("Got %d sub states, expected %d sub states", len(got), len(want))
return false
}
for iSubState := range got {
t.Logf("Got %+v, expected %+v", got[iSubState].State, want[iSubState].State)
if nil != got[iSubState].State.Equal(want[iSubState].State) {
return false
}
}
return true
}

func makeRegisteredEvent(txs ...channel.Transaction) []channel.AdjudicatorEvent {
events := make([]channel.AdjudicatorEvent, len(txs))
for i, tx := range txs {
Expand Down