Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement local watcher #182

69 changes: 69 additions & 0 deletions watcher/local/adjudicatorpubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"github.com/pkg/errors"

"perun.network/go-perun/channel"
"perun.network/go-perun/pkg/sync/atomic"
"perun.network/go-perun/watcher"
)

var _ watcher.AdjudicatorSub = &adjudicatorPubSub{}

type (
adjudicatorPubSub struct {
closed atomic.Bool
err error
pipe chan channel.AdjudicatorEvent
}

adjudicatorPub interface {
publish(channel.AdjudicatorEvent)
close() error
}
)

func newAdjudicatorEventsPubSub() *adjudicatorPubSub {
return &adjudicatorPubSub{
pipe: make(chan channel.AdjudicatorEvent, 10),
}
}

func (a *adjudicatorPubSub) publish(e channel.AdjudicatorEvent) {
a.pipe <- e
}

// Close closes the publisher instance and all the subscriptions associated
// with it. Any further call to Publish should immediately return error.
func (a *adjudicatorPubSub) close() error {
if a.closed.IsSet() {
return errors.New("publisher is closed")
}
a.err = errors.WithStack(ErrAlreadyClosed)
close(a.pipe)
a.closed.Set()
return nil
}

func (a *adjudicatorPubSub) EventStream() <-chan channel.AdjudicatorEvent {
return a.pipe
}

// Err returns the error after the the subscription is closed.
func (a *adjudicatorPubSub) Err() error {
return a.err
}
5 changes: 3 additions & 2 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ type (
// from the watcher to the client.
//
// This is initialized when client starts watching for a given channel.
// Client can receive the event via the channel returned by Next method.
// Client can receive the event via the channel returned by EventStream
// method.
//
// This channel will be closed when client requests the watcher to stop
// watching or when there is an error. After the channel is closed, error
// message can be read using the Err method.
AdjudicatorSub interface {
Next() <-chan channel.AdjudicatorEvent
EventStream() <-chan channel.AdjudicatorEvent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wha happens if EventStream() is called twice? Should the implementation support multiplexing?

Copy link
Author

@manoranjith manoranjith Sep 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated documentation that Repeated calls to the EventStream method returns the same channel instance. in PR #212.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think multiplexing is not necessary in our use case, because the only consumer of the events is the watch function.

If at a later point in time, we have the need for consuming the events in multiple places, then we could implement multiplexing.


Err() error
}
Expand Down