Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions components/audioin/audioin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Package audioin defines an audioin component
package audioin

import (
"context"

commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/audioin/v1"

"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
)

func init() {
resource.RegisterAPI(API, resource.APIRegistration[AudioIn]{
RPCServiceServerConstructor: NewRPCServiceServer,
RPCServiceHandler: pb.RegisterAudioInServiceHandlerFromEndpoint,
RPCServiceDesc: &pb.AudioInService_ServiceDesc,
RPCClient: NewClientFromConn,
})
}

// SubtypeName is a constant that identifies the AudioIn resource subtype string.
const SubtypeName = "audio_in"

// API is a variable that identifies the AudioIn resource API.
var API = resource.APINamespaceRDK.WithComponentType(SubtypeName)

// Named is a helper for getting the named AudioIn's typed resource name.
func Named(name string) resource.Name {
return resource.NewName(API, name)
}

// Properties defines properties of an audio in device.
type Properties struct {
SupportedCodecs []string
SampleRateHz int32
NumChannels int32
}

// AudioInfo defines information about audio data.
type AudioInfo struct {
Codec string
SampleRateHz int32
NumChannels int32
}

// AudioChunk defines a chunk of audio data.
type AudioChunk struct {
AudioData []byte
AudioInfo *AudioInfo
Sequence int32
StartTimestampNanoseconds int64
EndTimestampNanoseconds int64
RequestID string
}

// AudioIn defines an audioin component.
type AudioIn interface {
resource.Resource
GetAudio(ctx context.Context, codec string, durationSeconds float32, previousTimestamp int64, extra map[string]interface{}) (
chan *AudioChunk, error)
Properties(ctx context.Context, extra map[string]interface{}) (Properties, error)
}

// FromDependencies is a helper for getting the named AudioIn from a collection of
// dependencies.
func FromDependencies(deps resource.Dependencies, name string) (AudioIn, error) {
return resource.FromDependencies[AudioIn](deps, Named(name))
}

// FromRobot is a helper for getting the named AudioIn from the given Robot.
func FromRobot(r robot.Robot, name string) (AudioIn, error) {
return robot.ResourceFromRobot[AudioIn](r, Named(name))
}

// NamesFromRobot is a helper for getting all AudioIn names from the given Robot.
func NamesFromRobot(r robot.Robot) []string {
return robot.NamesByAPI(r, API)
}

func audioChunkToPb(chunk *AudioChunk) *pb.AudioChunk {
if chunk == nil {
return nil
}

var info *commonpb.AudioInfo
if chunk.AudioInfo != nil {
info = &commonpb.AudioInfo{
Codec: chunk.AudioInfo.Codec,
SampleRateHz: chunk.AudioInfo.SampleRateHz,
NumChannels: chunk.AudioInfo.NumChannels,
}
}

return &pb.AudioChunk{
AudioData: chunk.AudioData,
AudioInfo: info,
StartTimestampNanoseconds: chunk.StartTimestampNanoseconds,
EndTimestampNanoseconds: chunk.EndTimestampNanoseconds,
Sequence: chunk.Sequence,
}
}

func audioInfoPBToStruct(pb *commonpb.AudioInfo) *AudioInfo {
return &AudioInfo{
Codec: pb.Codec,
SampleRateHz: pb.SampleRateHz,
NumChannels: pb.NumChannels,
}
}
149 changes: 149 additions & 0 deletions components/audioin/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package audioin

import (
"context"
"errors"
"io"

"github.com/google/uuid"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/audioin/v1"
utils "go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
)

// client implements AudioInServiceClient.
type client struct {
resource.Named
resource.TriviallyReconfigurable
resource.TriviallyCloseable
name string
client pb.AudioInServiceClient
logger logging.Logger
}

// NewClientFromConn constructs a new Client from connection passed in.
func NewClientFromConn(
ctx context.Context,
conn rpc.ClientConn,
remoteName string,
name resource.Name,
logger logging.Logger,
) (AudioIn, error) {
serviceClient := pb.NewAudioInServiceClient(conn)
return &client{
Named: name.PrependRemote(remoteName).AsNamed(),
name: name.Name,
client: serviceClient,
logger: logger,
}, nil
}

func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
return protoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
}

func (c *client) GetAudio(ctx context.Context, codec string, durationSeconds float32, previousTimestamp int64,
extra map[string]interface{}) (chan *AudioChunk, error,
) {
ext, err := utils.StructToStructPb(extra)
if err != nil {
return nil, err
}

// This only sets up the stream,it doesn't send the request to the server yet
// The actual RPC call happens on first Recv()
stream, err := c.client.GetAudio(ctx, &pb.GetAudioRequest{
Name: c.name,
DurationSeconds: durationSeconds,
Codec: codec,
PreviousTimestampNanoseconds: previousTimestamp,
RequestId: uuid.New().String(),
Extra: ext,
})
if err != nil {
return nil, err
}

// receive one chunk outside of the goroutine to catch any errors
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realized I had a bug with client error handling so this is different than first review

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receive one chunk outside of the goroutine to catch any errors is the goroutine itself not robust to certain errors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time the goroutine starts this function has already returned the channel, so we need to check here to be able to return an error from this function. The goroutine will log an error and return if any error occurs.

Copy link
Member

@hexbabe hexbabe Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so it's more about surfacing the error to the user more directly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

resp, err := stream.Recv()
if err != nil {
return nil, err
}

// small buffered channel prevents blocking when receiver is temporarily slow
ch := make(chan *AudioChunk, 8)

go func() {
defer close(ch)

// Send the first response we already received
var info *AudioInfo
if resp.Audio.AudioInfo != nil {
info = audioInfoPBToStruct(resp.Audio.AudioInfo)
}

ch <- &AudioChunk{
AudioData: resp.Audio.AudioData,
AudioInfo: info,
Sequence: resp.Audio.Sequence,
StartTimestampNanoseconds: resp.Audio.StartTimestampNanoseconds,
EndTimestampNanoseconds: resp.Audio.EndTimestampNanoseconds,
RequestID: resp.RequestId,
}

// Continue receiving the rest of the stream
for {
select {
case <-ctx.Done():
c.logger.Debugf("context done, returning from GetAudio: %v", ctx.Err())
return
default:
}
resp, err := stream.Recv()
if err != nil {
// EOF error indicates stream was closed by server.
if !errors.Is(err, io.EOF) {
c.logger.Error(err)
}
return
}

var info *AudioInfo
if resp.Audio.AudioInfo != nil {
info = audioInfoPBToStruct(resp.Audio.AudioInfo)
}

ch <- &AudioChunk{
AudioData: resp.Audio.AudioData,
AudioInfo: info,
Sequence: resp.Audio.Sequence,
StartTimestampNanoseconds: resp.Audio.StartTimestampNanoseconds,
EndTimestampNanoseconds: resp.Audio.EndTimestampNanoseconds,
RequestID: resp.RequestId,
}
}
}()

return ch, nil
}

func (c *client) Properties(ctx context.Context, extra map[string]interface{}) (Properties, error) {
ext, err := utils.StructToStructPb(extra)
if err != nil {
return Properties{}, err
}
resp, err := c.client.GetProperties(ctx, &commonpb.GetPropertiesRequest{
Name: c.name,
Extra: ext,
})
if err != nil {
return Properties{}, err
}

return Properties{SupportedCodecs: resp.SupportedCodecs, SampleRateHz: resp.SampleRateHz, NumChannels: resp.NumChannels}, nil
}
Loading
Loading