Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions components/audioin/audioin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Package audioin defines an audioin component
package audioin

import (
"context"

commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/audioin/v1"

"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
)

func init() {
resource.RegisterAPI(API, resource.APIRegistration[AudioIn]{
RPCServiceServerConstructor: NewRPCServiceServer,
RPCServiceHandler: pb.RegisterAudioInServiceHandlerFromEndpoint,
RPCServiceDesc: &pb.AudioInService_ServiceDesc,
RPCClient: NewClientFromConn,
})
}

// SubtypeName is a constant that identifies the AudioIn resource subtype string.
const SubtypeName = "audio_in"

// API is a variable that identifies the AudioIn resource API.
var API = resource.APINamespaceRDK.WithComponentType(SubtypeName)

// Named is a helper for getting the named AudioIn's typed resource name.
func Named(name string) resource.Name {
return resource.NewName(API, name)
}

// Properties defines properties of an audio in device.
type Properties struct {
SupportedCodecs []string
SampleRate int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SampleRateHz

NumChannels int32
}

// AudioInfo defines information about audio data.
type AudioInfo struct {
Codec string
SampleRate int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SampleRateHz

NumChannels int32
}

// AudioChunk defines a chunk of audio data.
type AudioChunk struct {
AudioData []byte
Info *AudioInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AudioInfo

Sequence int32
StartTimestampNanoseconds int64
EndTimestampNanoseconds int64
RequestID string
}

// AudioIn defines an audioin component.
type AudioIn interface {
resource.Resource
GetAudio(ctx context.Context, codec string, durationSeconds float32, previousTimestamp int64, extra map[string]interface{}) (
chan *AudioChunk, error)
Properties(ctx context.Context, extra map[string]interface{}) (Properties, error)
}

// FromDependencies is a helper for getting the named AudioIn from a collection of
// dependencies.
func FromDependencies(deps resource.Dependencies, name string) (AudioIn, error) {
return resource.FromDependencies[AudioIn](deps, Named(name))
}

// FromRobot is a helper for getting the named AudioIn from the given Robot.
func FromRobot(r robot.Robot, name string) (AudioIn, error) {
return robot.ResourceFromRobot[AudioIn](r, Named(name))
}

// NamesFromRobot is a helper for getting all AudioIn names from the given Robot.
func NamesFromRobot(r robot.Robot) []string {
return robot.NamesByAPI(r, API)
}

func audioChunkToPb(chunk *AudioChunk) *pb.AudioChunk {
if chunk == nil {
return nil
}

var info *commonpb.AudioInfo
if chunk.Info != nil {
info = &commonpb.AudioInfo{
Codec: chunk.Info.Codec,
SampleRate: chunk.Info.SampleRate,
NumChannels: chunk.Info.NumChannels,
}
}

return &pb.AudioChunk{
AudioData: chunk.AudioData,
Info: info,
StartTimestampNanoseconds: chunk.StartTimestampNanoseconds,
EndTimestampNanoseconds: chunk.EndTimestampNanoseconds,
Sequence: chunk.Sequence,
}
}

func audioInfoPBToStruct(pb *commonpb.AudioInfo) *AudioInfo {
return &AudioInfo{
Codec: pb.Codec,
SampleRate: pb.SampleRate,
NumChannels: pb.NumChannels,
}
}
124 changes: 124 additions & 0 deletions components/audioin/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package audioin

import (
"context"
"errors"
"io"

"github.com/google/uuid"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/audioin/v1"
utils "go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
)

// client implements AudioInServiceClient.
type client struct {
resource.Named
resource.TriviallyReconfigurable
resource.TriviallyCloseable
name string
client pb.AudioInServiceClient
logger logging.Logger
}

// NewClientFromConn constructs a new Client from connection passed in.
func NewClientFromConn(
ctx context.Context,
conn rpc.ClientConn,
remoteName string,
name resource.Name,
logger logging.Logger,
) (AudioIn, error) {
c := pb.NewAudioInServiceClient(conn)
return &client{
Named: name.PrependRemote(remoteName).AsNamed(),
name: name.Name,
client: c,
logger: logger,
}, nil
}

func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
return protoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
}

func (c *client) GetAudio(ctx context.Context, codec string, durationSeconds float32, previousTimestamp int64,
extra map[string]interface{}) (chan *AudioChunk, error,
) {
ext, err := utils.StructToStructPb(extra)
if err != nil {
return nil, err
}

stream, err := c.client.GetAudio(ctx, &pb.GetAudioRequest{
Name: c.name,
DurationSeconds: durationSeconds,
Codec: codec,
PreviousTimestamp: previousTimestamp,
RequestId: uuid.New().String(),
Extra: ext,
})
if err != nil {
return nil, err
}

// small buffered channel prevents blocking when receiver is temporarily slow
ch := make(chan *AudioChunk, 8)

go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
c.logger.Debug(ctx.Err())
return
default:
}
resp, err := stream.Recv()
if err != nil {
// EOF error indicates stream was closed by server.
if !errors.Is(err, io.EOF) {
c.logger.Error(err)
}
return
}

var info *AudioInfo
if resp.Audio.Info != nil {
info = audioInfoPBToStruct(resp.Audio.Info)
}

ch <- &AudioChunk{
AudioData: resp.Audio.AudioData,
Info: info,
Sequence: resp.Audio.Sequence,
StartTimestampNanoseconds: resp.Audio.StartTimestampNanoseconds,
EndTimestampNanoseconds: resp.Audio.EndTimestampNanoseconds,
RequestID: resp.RequestId,
}
}
}()

return ch, nil
}

func (c *client) Properties(ctx context.Context, extra map[string]interface{}) (Properties, error) {
ext, err := utils.StructToStructPb(extra)
if err != nil {
return Properties{}, err
}
resp, err := c.client.GetProperties(ctx, &commonpb.GetPropertiesRequest{
Name: c.name,
Extra: ext,
})
if err != nil {
return Properties{}, err
}

return Properties{SupportedCodecs: resp.SupportedCodecs, SampleRate: resp.SampleRate, NumChannels: resp.NumChannels}, nil
}
Loading
Loading