Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement local watcher #182

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
go.uber.org/goleak v1.1.11 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,13 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -457,6 +460,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -508,8 +512,10 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 h1:uCLL3g5wH2xjxVREVuAbP9JM5PPKjRbXKRa6IBjkzmU=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down Expand Up @@ -553,6 +559,7 @@ golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 4 additions & 0 deletions watcher/internal/mocks/AdjudicatorSubscription.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

160 changes: 136 additions & 24 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package local

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -33,6 +34,16 @@ const (
)

type (
// closer is the interface that wraps the close method.
closer interface {
close() error
}

// Closer is the interface that wraps the Close method.
Closer interface {
Close() error
}

// Watcher implements a local watcher.
Watcher struct {
rs channel.RegisterSubscriber
Expand All @@ -47,12 +58,19 @@ type (
ch struct {
id channel.ID
params *channel.Params
parent channel.ID

parent channel.ID
subChs map[channel.ID]struct{}
archivedSubChStates map[channel.ID]channel.SignedState

registeredVersion uint64
requestLatestTx chan struct{}
latestTx chan channel.Transaction

eventsFromChainSub Closer
eventsToClientSub closer
statesPubSub closer

subChsAccess sync.Mutex
}
)
Expand Down Expand Up @@ -81,9 +99,15 @@ func (w *Watcher) StartWatchingSubChannel(ctx context.Context, parent channel.ID
if !ok {
return nil, nil, errors.New("parent channel not registered with the watcher")
}

parentCh.subChsAccess.Lock()
defer parentCh.subChsAccess.Unlock()
return w.startWatching(ctx, parent, signedState)
statesPub, eventsSub, err := w.startWatching(ctx, parent, signedState)
if err != nil {
return nil, nil, err
}
parentCh.subChs[signedState.State.ID] = struct{}{}
return statesPub, eventsSub, nil
}

func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedState channel.SignedState) (
Expand All @@ -102,7 +126,7 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt
}
statesPubSub := newStatesPubSub()
eventsToClientPubSub := newAdjudicatorEventsPubSub()
ch := newCh(id, parent, signedState.Params)
ch := newCh(id, parent, signedState.Params, eventsFromChainSub, eventsToClientPubSub, statesPubSub)

w.registry.addUnsafe(ch)

Expand All @@ -116,14 +140,23 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt
return statesPubSub, eventsToClientPubSub, nil
}

func newCh(id, parent channel.ID, params *channel.Params) *ch {
func newCh(id, parent channel.ID, params *channel.Params,
eventsFromChainSub Closer, eventsToClientSub, statesPubSub closer) *ch {
return &ch{
id: id,
params: params,
parent: parent,
id: id,
params: params,

parent: parent,
subChs: make(map[channel.ID]struct{}),
archivedSubChStates: make(map[channel.ID]channel.SignedState),

registeredVersion: 0,
requestLatestTx: make(chan struct{}),
latestTx: make(chan channel.Transaction),

eventsFromChainSub: eventsFromChainSub,
eventsToClientSub: eventsToClientSub,
statesPubSub: statesPubSub,
}
}

Expand All @@ -132,20 +165,21 @@ func (ch *ch) retreiveLatestTx() channel.Transaction {
return <-ch.latestTx
}

func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTxn chan struct{},
func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTx chan struct{},
latestTx chan channel.Transaction) {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.statesStream():
if !ok {
// TODO: Read error.
log.WithField("ID", currentTx.State.ID).Info("States sub closed by client. Shutting down handler")
return
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-requestLatestTxn:
log.WithField("ID", currentTx.ID).Debugf("Received state from client %v", currentTx.State)
case <-requestLatestTx:
currentTx = receiveTxUntil(statesSub, time.NewTimer(statesFromClientWaitTime).C, currentTx)
latestTx <- currentTx
}
Expand All @@ -165,7 +199,7 @@ func receiveTxUntil(statesSub statesSub, timeout <-chan time.Time, currentTx cha
return currentTx // states sub was closed, send the latest event.
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
log.WithField("ID", currentTx.ID).Debugf("Received state from client %v", currentTx.State)
case <-timeout:
return currentTx // timer expired, send the latest the event.
}
Expand All @@ -187,7 +221,7 @@ func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSu
defer parent.subChsAccess.Unlock()

log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")
log.Debugf("Received registered event from chain: %v", e)

eventsToClientPubSub.publish(e)

Expand Down Expand Up @@ -215,15 +249,18 @@ func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSu
default:
}
}

err := eventsFromChainSub.Err()
log := log.WithField("ID", thisCh.id)
log.Errorf("Events from chain sub was closed with error: %v", err)
}

// registerDispute collects the latest transaction for the parent channel and
// each of its children. It then registers dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx := parentCh.retreiveLatestTx()
subStates := retreiveLatestSubStates(r, parentTx)
parentTx, subStates := retreiveLatestSubStates(r, parentCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -234,25 +271,28 @@ func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) e

parentCh.registeredVersion = parentTx.Version
for i := range subStates {
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subCh.registeredVersion = subStates[i].State.Version
subCh, ok := r.retrieve(parentTx.Allocation.Locked[i].ID)
if ok {
subCh.registeredVersion = subStates[i].State.Version
}
}
return nil
}

func retreiveLatestSubStates(r *registry, parentTx channel.Transaction) []channel.SignedState {
func retreiveLatestSubStates(r *registry, parent *ch) (channel.Transaction, []channel.SignedState) {
parentTx := parent.retreiveLatestTx()
subStates := make([]channel.SignedState, len(parentTx.Allocation.Locked))
for i := range parentTx.Allocation.Locked {
// Can be done concurrently.
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subChTx := subCh.retreiveLatestTx()
subStates[i] = channel.SignedState{
Params: subCh.params,
State: subChTx.State,
Sigs: subChTx.Sigs,
subCh, ok := r.retrieve(parentTx.Allocation.Locked[i].ID)
if ok {
subChTx := subCh.retreiveLatestTx()
subStates[i] = makeSignedState(subCh.params, subChTx)
} else {
subStates[i] = parent.archivedSubChStates[parentTx.Allocation.Locked[i].ID]
}
}
return subStates
return parentTx, subStates
}

func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.AdjudicatorReq {
Expand All @@ -262,3 +302,75 @@ func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.
Secondary: false,
}
}

// StopWatching stops watching for adjudicator events, closes the pub-sub
// instances and removes the channel from the registry.
//
// Client should invoke stop watching for all the sub-channels before invoking
// for the parent ledger channel.
//
// In case of stop watching for sub-channels, watcher ensures that, when it
// receives a registered event for its parent channel or any other sub-channels
// of the parent channel, it is able to successfully refute with the latest
// states for the ledger channel and all its sub-channels (even if the watcher
// has stopped watching for some of the sub-channel).
func (w *Watcher) StopWatching(id channel.ID) error {
ch, ok := w.retrieve(id)
if !ok {
return errors.New("channel not registered with the watcher")
}

parent := ch.parent
if parent != channel.Zero { // Sub channel.
parentCh, ok := w.retrieve(parent)
if !ok {
// Code MUST NOT reach this point
return errors.New("Fatal error: parent channel not registered with watcher")
}
parentCh.subChsAccess.Lock()
defer parentCh.subChsAccess.Unlock()
if _, ok := parentCh.retreiveLatestTx().SubAlloc(id); ok {
parentCh.archivedSubChStates[id] = makeSignedState(ch.params, ch.retreiveLatestTx())
}
delete(parentCh.subChs, id)
} else { // Ledger channel.
ch.subChsAccess.Lock()
defer ch.subChsAccess.Unlock()

if len(ch.subChs) > 0 {
return fmt.Errorf("cannot de-register when sub-channels are present: %d %v", len(ch.subChs), ch.id)
}
}

errMsg := closePubSubs(ch)
w.remove(ch.id)

if errMsg != "" {
err := errors.New("Stop Watching errors: " + errMsg)
log.WithField("id", id).Error(err.Error())
return err
}
return nil
}

func closePubSubs(ch *ch) string {
errMsg := ""
if err := ch.eventsFromChainSub.Close(); err != nil {
errMsg += fmt.Sprintf("closing events from chain sub: %v:", err)
}
if err := ch.eventsToClientSub.close(); err != nil {
errMsg += fmt.Sprintf("closing events to client pub-sub: %v:", err)
}
if err := ch.statesPubSub.close(); err != nil {
errMsg += fmt.Sprintf("closing states from client pub-sub: %v:", err)
}
return errMsg
}

func makeSignedState(params *channel.Params, tx channel.Transaction) channel.SignedState {
return channel.SignedState{
Params: params,
State: tx.State,
Sigs: tx.Sigs,
}
}
Loading