Skip to content

Commit

Permalink
RSDK-7268 RSDK-7329 Make StreamTicks take in a list of DigitalInterru…
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviamiller authored and vijayvuyyuru committed Apr 25, 2024
1 parent f91107b commit 9167c20
Show file tree
Hide file tree
Showing 23 changed files with 154 additions and 87 deletions.
16 changes: 2 additions & 14 deletions components/board/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package board

import (
"context"
"fmt"
"time"

commonpb "go.viam.com/api/common/v1"
Expand Down Expand Up @@ -82,7 +81,8 @@ type Board interface {
WriteAnalog(ctx context.Context, pin string, value int32, extra map[string]interface{}) error

// StreamTicks starts a stream of digital interrupt ticks.
StreamTicks(ctx context.Context, interrupts []string, ch chan Tick, extra map[string]interface{}) error
StreamTicks(ctx context.Context, interrupts []DigitalInterrupt, ch chan Tick,
extra map[string]interface{}) error
}

// An Analog represents an analog pin that resides on a board.
Expand All @@ -109,15 +109,3 @@ func FromRobot(r robot.Robot, name string) (Board, error) {
func NamesFromRobot(r robot.Robot) []string {
return robot.NamesByAPI(r, API)
}

// RemoveCallbacks removes the callbacks from the given interrupts.
func RemoveCallbacks(b Board, interrupts []string, ch chan Tick) error {
for _, name := range interrupts {
i, ok := b.DigitalInterruptByName(name)
if !ok {
return fmt.Errorf("unknown digitial interrupt: %s", name)
}
i.RemoveCallback(ch)
}
return nil
}
8 changes: 7 additions & 1 deletion components/board/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,20 @@ func (dic *digitalInterruptClient) Tick(ctx context.Context, high bool, nanoseco
panic(errUnimplemented)
}

func (dic *digitalInterruptClient) Name() string {
return dic.digitalInterruptName
}

func (dic *digitalInterruptClient) AddCallback(ch chan Tick) {
panic(errUnimplemented)
}

func (dic *digitalInterruptClient) RemoveCallback(ch chan Tick) {
}

func (c *client) StreamTicks(ctx context.Context, interrupts []string, ch chan Tick, extra map[string]interface{}) error {
func (c *client) StreamTicks(ctx context.Context, interrupts []DigitalInterrupt, ch chan Tick,
extra map[string]interface{},
) error {
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions components/board/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,21 @@ func TestWorkingClient(t *testing.T) {
actualExtra = nil

// StreamTicks
injectBoard.StreamTicksFunc = func(ctx context.Context, interrupts []string, ch chan board.Tick, extra map[string]interface{}) error {
injectBoard.StreamTicksFunc = func(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
actualExtra = extra
return nil
}
err = injectBoard.StreamTicks(context.Background(), []string{"pin1"}, make(chan board.Tick), expectedExtra)
err = injectBoard.StreamTicks(context.Background(), []board.DigitalInterrupt{digital1}, make(chan board.Tick), expectedExtra)
test.That(t, err, test.ShouldBeNil)
test.That(t, actualExtra, test.ShouldResemble, expectedExtra)
actualExtra = nil
injectDigitalInterrupt.NameFunc = func() string {
return "digital1"
}
name := digital1.Name()
test.That(t, name, test.ShouldEqual, "digital1")

// SetPowerMode (currently unimplemented in RDK)
injectBoard.SetPowerModeFunc = func(ctx context.Context, mode boardpb.PowerMode, duration *time.Duration) error {
Expand Down
3 changes: 3 additions & 0 deletions components/board/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ type DigitalInterrupt interface {
// RemoveCallback removes a listener for interrupts.
RemoveCallback(c chan Tick)

// Name returns the name of the interrupt.
Name() string

Close(ctx context.Context) error
}
20 changes: 10 additions & 10 deletions components/board/fake/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,9 @@ func (b *Board) WriteAnalog(ctx context.Context, pin string, value int32, extra
}

// StreamTicks starts a stream of digital interrupt ticks.
func (b *Board) StreamTicks(ctx context.Context, interruptNames []string, ch chan board.Tick, extra map[string]interface{}) error {
var interrupts []board.DigitalInterrupt
for _, name := range interruptNames {
interrupt, ok := b.DigitalInterruptByName(name)
if !ok {
return errors.Errorf("unknown digital interrupt: %s", name)
}
interrupts = append(interrupts, interrupt)
}

func (b *Board) StreamTicks(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
for _, i := range interrupts {
i.AddCallback(ch)
}
Expand Down Expand Up @@ -450,6 +443,13 @@ func (s *DigitalInterruptWrapper) RemoveCallback(c chan board.Tick) {
s.di.RemoveCallback(c)
}

// Name returns the name of the digital interrupt.
func (s *DigitalInterruptWrapper) Name() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.conf.Name
}

// Close does nothing.
func (s *DigitalInterruptWrapper) Close(ctx context.Context) error {
return nil
Expand Down
13 changes: 3 additions & 10 deletions components/board/genericlinux/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,16 +501,9 @@ func (b *Board) WriteAnalog(ctx context.Context, pin string, value int32, extra
}

// StreamTicks starts a stream of digital interrupt ticks.
func (b *Board) StreamTicks(ctx context.Context, interruptNames []string, ch chan board.Tick, extra map[string]interface{}) error {
var interrupts []board.DigitalInterrupt
for _, name := range interruptNames {
interrupt, ok := b.DigitalInterruptByName(name)
if !ok {
return errors.Errorf("unknown digital interrupt: %s", name)
}
interrupts = append(interrupts, interrupt)
}

func (b *Board) StreamTicks(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
for _, i := range interrupts {
i.AddCallback(ch)
}
Expand Down
4 changes: 3 additions & 1 deletion components/board/hat/pca9685/pca9685.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ func (pca *PCA9685) frequency(ctx context.Context) (float64, error) {

// StreamTicks streams digital interrupt ticks.
// The pca9685 board does not have the systems hardware to implement a Tick counter.
func (pca *PCA9685) StreamTicks(ctx context.Context, interrupts []string, ch chan board.Tick, extra map[string]interface{}) error {
func (pca *PCA9685) StreamTicks(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
return grpc.UnimplementedError
}

Expand Down
8 changes: 6 additions & 2 deletions components/board/interrupt_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type interruptStream struct {
extra *structpb.Struct
}

func (s *interruptStream) startStream(ctx context.Context, interrupts []string, ch chan Tick) error {
func (s *interruptStream) startStream(ctx context.Context, interrupts []DigitalInterrupt, ch chan Tick) error {
s.streamMu.Lock()
defer s.streamMu.Unlock()

Expand All @@ -39,10 +39,14 @@ func (s *interruptStream) startStream(ctx context.Context, interrupts []string,
return ctx.Err()
default:
}
names := []string{}
for _, i := range interrupts {
names = append(names, i.Name())
}

req := &pb.StreamTicksRequest{
Name: s.client.info.name,
PinNames: interrupts,
PinNames: names,
Extra: s.extra,
}

Expand Down
4 changes: 3 additions & 1 deletion components/board/numato/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ func (b *numatoBoard) readThread() {

// StreamTicks streams digital interrupt ticks.
// The numato board does not have the systems hardware to implement a Tick counter.
func (b *numatoBoard) StreamTicks(ctx context.Context, interrupts []string, ch chan board.Tick, extra map[string]interface{}) error {
func (b *numatoBoard) StreamTicks(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
return grpc.UnimplementedError
}

Expand Down
13 changes: 3 additions & 10 deletions components/board/pi/impl/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,9 @@ func (pi *piPigpio) Reconfigure(
}

// StreamTicks starts a stream of digital interrupt ticks.
func (pi *piPigpio) StreamTicks(ctx context.Context, interruptNames []string, ch chan board.Tick, extra map[string]interface{}) error {
var interrupts []board.DigitalInterrupt
for _, name := range interruptNames {
interrupt, ok := pi.DigitalInterruptByName(name)
if !ok {
return errors.Errorf("unknown digital interrupt: %s", name)
}
interrupts = append(interrupts, interrupt)
}

func (pi *piPigpio) StreamTicks(ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
for _, i := range interrupts {
i.AddCallback(ch)
}
Expand Down
14 changes: 14 additions & 0 deletions components/board/pi/impl/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ func (i *BasicDigitalInterrupt) RemoveCallback(c chan board.Tick) {
}
}

// Name returns the name of the interrupt.
func (i *BasicDigitalInterrupt) Name() string {
i.mu.Lock()
defer i.mu.Unlock()
return i.cfg.Name
}

// Close does nothing.
func (i *BasicDigitalInterrupt) Close(ctx context.Context) error {
return nil
Expand Down Expand Up @@ -204,6 +211,13 @@ func (i *ServoDigitalInterrupt) RemoveCallback(c chan board.Tick) {
panic("servos can't have callback")
}

// Name returns the name of the interrupt.
func (i *ServoDigitalInterrupt) Name() string {
i.mu.Lock()
defer i.mu.Unlock()
return i.cfg.Name
}

// Reconfigure reconfigures this digital interrupt.
func (i *ServoDigitalInterrupt) Reconfigure(conf DigitalInterruptConfig) error {
i.mu.Lock()
Expand Down
7 changes: 7 additions & 0 deletions components/board/pinwrappers/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ func (i *BasicDigitalInterrupt) RemoveCallback(c chan board.Tick) {
}
}

// Name returns the name of the digital interrupt.
func (i *BasicDigitalInterrupt) Name() string {
i.mu.Lock()
defer i.mu.Unlock()
return i.cfg.Name
}

// Close does nothing.
func (i *BasicDigitalInterrupt) Close(ctx context.Context) error {
return nil
Expand Down
19 changes: 16 additions & 3 deletions components/board/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/board/v1"
"go.viam.com/utils"

"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -218,18 +217,32 @@ func (s *serviceServer) StreamTicks(
}

ticksChan := make(chan Tick)
err = b.StreamTicks(server.Context(), req.PinNames, ticksChan, req.Extra.AsMap())
interrupts := []DigitalInterrupt{}

for _, name := range req.PinNames {
di, ok := b.DigitalInterruptByName(name)
if !ok {
return errors.Errorf("unknown digital interrupt: %s", name)
}
interrupts = append(interrupts, di)
}
err = b.StreamTicks(server.Context(), interrupts, ticksChan, req.Extra.AsMap())
if err != nil {
return err
}

defer func() {
for _, i := range interrupts {
i.RemoveCallback(ticksChan)
}
}()

// Send an empty response first so the client doesn't block while checking for errors.
err = server.Send(&pb.StreamTicksResponse{})
if err != nil {
return err
}

defer utils.UncheckedErrorFunc(func() error { return RemoveCallbacks(b, req.PinNames, ticksChan) })
for {
select {
case <-server.Context().Done():
Expand Down
15 changes: 10 additions & 5 deletions components/board/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,10 +861,10 @@ func TestStreamTicks(t *testing.T) {
name: "unknown digital interrupt should return error",
injectDigitalInterrupts: nil,
injectDigitalInterruptOk: false,
streamTicksErr: errors.New("unknown digital interrupt: digital1"),
req: &request{Name: testBoardName, PinNames: []string{"digital1"}},
streamTicksErr: errors.New("unknown digital interrupt: digital3"),
req: &request{Name: testBoardName, PinNames: []string{"digital3"}},
expResp: nil,
expRespErr: "unknown digital interrupt: digital1",
expRespErr: "unknown digital interrupt: digital3",
sendFail: false,
},
{
Expand All @@ -886,7 +886,10 @@ func TestStreamTicks(t *testing.T) {
var actualExtra map[string]interface{}
callbacks := []chan board.Tick{}

injectBoard.StreamTicksFunc = func(ctx context.Context, interrupts []string, ch chan board.Tick, extra map[string]interface{}) error {
injectBoard.StreamTicksFunc = func(
ctx context.Context, interrupts []board.DigitalInterrupt, ch chan board.Tick,
extra map[string]interface{},
) error {
actualExtra = extra
callbacks = append(callbacks, ch)
return tc.streamTicksErr
Expand All @@ -895,8 +898,10 @@ func TestStreamTicks(t *testing.T) {
injectBoard.DigitalInterruptByNameFunc = func(name string) (board.DigitalInterrupt, bool) {
if name == "digital1" {
return tc.injectDigitalInterrupts[0], tc.injectDigitalInterruptOk
} else if name == "digital2" {
return tc.injectDigitalInterrupts[1], tc.injectDigitalInterruptOk
}
return tc.injectDigitalInterrupts[1], tc.injectDigitalInterruptOk
return nil, false
}
if tc.injectDigitalInterrupts != nil {
for _, i := range tc.injectDigitalInterrupts {
Expand Down
12 changes: 6 additions & 6 deletions components/encoder/incremental/incremental_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,19 @@ func (e *Encoder) Reconfigure(
e.boardName = newConf.BoardName
e.encAName = newConf.Pins.A
e.encBName = newConf.Pins.B
interrupts := []string{e.encAName, e.encBName}
// state is not really valid anymore
atomic.StoreInt64(&e.position, 0)
atomic.StoreInt64(&e.pRaw, 0)
atomic.StoreInt64(&e.pState, 0)
e.mu.Unlock()

e.Start(ctx, board, interrupts)
e.Start(ctx, board)

return nil
}

// Start starts the Encoder background thread.
func (e *Encoder) Start(ctx context.Context, b board.Board, interrupts []string) {
func (e *Encoder) Start(ctx context.Context, b board.Board) {
/**
a rotary encoder looks like
Expand Down Expand Up @@ -209,7 +208,7 @@ func (e *Encoder) Start(ctx context.Context, b board.Board, interrupts []string)
// x -> impossible state

ch := make(chan board.Tick)
err := b.StreamTicks(e.cancelCtx, interrupts, ch, nil)
err := b.StreamTicks(e.cancelCtx, []board.DigitalInterrupt{e.A, e.B}, ch, nil)
if err != nil {
utils.Logger.Errorw("error getting digital interrupt ticks", "error", err)
return
Expand All @@ -229,7 +228,8 @@ func (e *Encoder) Start(ctx context.Context, b board.Board, interrupts []string)

utils.ManagedGo(func() {
// Remove the callbacks added by the interrupt stream.
defer utils.UncheckedErrorFunc(func() error { return board.RemoveCallbacks(b, interrupts, ch) })
defer e.A.RemoveCallback(ch)
defer e.B.RemoveCallback(ch)
for {
// This looks redundant with the other select statement below, but it's not: if we're
// supposed to return, we need to do that even if chanA and chanB are full of data, and
Expand All @@ -248,7 +248,7 @@ func (e *Encoder) Start(ctx context.Context, b board.Board, interrupts []string)
case <-e.cancelCtx.Done():
return
case tick = <-ch:
if tick.Name == e.encAName {
if tick.Name == e.A.Name() {
aLevel = 0
if tick.High {
aLevel = 1
Expand Down
Loading

0 comments on commit 9167c20

Please sign in to comment.