diff --git a/codec.go b/codec.go new file mode 100644 index 00000000..e793a1ce --- /dev/null +++ b/codec.go @@ -0,0 +1,117 @@ +package mediadevices + +import ( + "errors" + "fmt" + "strings" + + "github.com/pion/mediadevices/pkg/codec" + "github.com/pion/mediadevices/pkg/io/audio" + "github.com/pion/mediadevices/pkg/io/video" + "github.com/pion/mediadevices/pkg/prop" + "github.com/pion/webrtc/v2" +) + +// CodecSelector is a container of video and audio encoder builders, which later will be used +// for codec matching. +type CodecSelector struct { + videoEncoders []codec.VideoEncoderBuilder + audioEncoders []codec.AudioEncoderBuilder +} + +// CodecSelectorOption is a type for specifying CodecSelector options +type CodecSelectorOption func(*CodecSelector) + +// WithVideoEncoders replace current video codecs with listed encoders +func WithVideoEncoders(encoders ...codec.VideoEncoderBuilder) CodecSelectorOption { + return func(t *CodecSelector) { + t.videoEncoders = encoders + } +} + +// WithAudioEncoders replace current audio codecs with listed encoders +func WithAudioEncoders(encoders ...codec.AudioEncoderBuilder) CodecSelectorOption { + return func(t *CodecSelector) { + t.audioEncoders = encoders + } +} + +// NewCodecSelector constructs CodecSelector with given variadic options +func NewCodecSelector(opts ...CodecSelectorOption) *CodecSelector { + var track CodecSelector + + for _, opt := range opts { + opt(&track) + } + + return &track +} + +// Populate lets the webrtc engine be aware of supported codecs that are contained in CodecSelector +func (selector *CodecSelector) Populate(setting *webrtc.MediaEngine) { + for _, encoder := range selector.videoEncoders { + setting.RegisterCodec(encoder.RTPCodec().RTPCodec) + } + + for _, encoder := range selector.audioEncoders { + setting.RegisterCodec(encoder.RTPCodec().RTPCodec) + } +} + +func (selector *CodecSelector) selectVideoCodec(wantCodecs []*webrtc.RTPCodec, reader video.Reader, inputProp prop.Media) 
(codec.ReadCloser, *codec.RTPCodec, error) { + var selectedEncoder codec.VideoEncoderBuilder + var encodedReader codec.ReadCloser + var errReasons []string + var err error + +outer: + for _, wantCodec := range wantCodecs { + name := wantCodec.Name + for _, encoder := range selector.videoEncoders { + if encoder.RTPCodec().Name == name { + encodedReader, err = encoder.BuildVideoEncoder(reader, inputProp) + if err == nil { + selectedEncoder = encoder + break outer + } + } + + errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err)) + } + } + + if selectedEncoder == nil { + return nil, nil, errors.New(strings.Join(errReasons, "\n\n")) + } + + return encodedReader, selectedEncoder.RTPCodec(), nil +} + +func (selector *CodecSelector) selectAudioCodec(wantCodecs []*webrtc.RTPCodec, reader audio.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) { + var selectedEncoder codec.AudioEncoderBuilder + var encodedReader codec.ReadCloser + var errReasons []string + var err error + +outer: + for _, wantCodec := range wantCodecs { + name := wantCodec.Name + for _, encoder := range selector.audioEncoders { + if encoder.RTPCodec().Name == name { + encodedReader, err = encoder.BuildAudioEncoder(reader, inputProp) + if err == nil { + selectedEncoder = encoder + break outer + } + } + + errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err)) + } + } + + if selectedEncoder == nil { + return nil, nil, errors.New(strings.Join(errReasons, "\n\n")) + } + + return encodedReader, selectedEncoder.RTPCodec(), nil +} diff --git a/examples/go.mod b/examples/go.mod index 592be604..9ea97408 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -2,8 +2,11 @@ module github.com/pion/mediadevices/examples go 1.14 -// Please don't commit require entries of examples. -// `git checkout master examples/go.mod` to revert this file. 
-require github.com/pion/mediadevices v0.0.0 +require ( + // Please don't commit require entries of examples. + // `git checkout master examples/go.mod` to revert this file. + github.com/pion/mediadevices v0.0.0 + github.com/pion/webrtc/v2 v2.2.26 +) replace github.com/pion/mediadevices v0.0.0 => ../ diff --git a/examples/http/main.go b/examples/http/main.go new file mode 100644 index 00000000..cad18fd0 --- /dev/null +++ b/examples/http/main.go @@ -0,0 +1,77 @@ +// This is an example of using mediadevices to broadcast your camera through http. +// The example doesn't aim to be performant, but rather it strives to be simple. +package main + +import ( + "bytes" + "fmt" + "image/jpeg" + "io" + "log" + "mime/multipart" + "net/http" + "net/textproto" + + "github.com/pion/mediadevices" + "github.com/pion/mediadevices/pkg/prop" + + // Note: If you don't have a camera or microphone or your adapters are not supported, + // you can always swap your adapters with our dummy adapters below. + // _ "github.com/pion/mediadevices/pkg/driver/videotest" + _ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter +) + +func must(err error) { + if err != nil { + panic(err) + } +} + +func main() { + s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ + Video: func(constraint *mediadevices.MediaTrackConstraints) { + constraint.Width = prop.Int(600) + constraint.Height = prop.Int(400) + }, + }) + must(err) + + t := s.GetVideoTracks()[0] + videoTrack := t.(*mediadevices.VideoTrack) + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + var buf bytes.Buffer + videoReader := videoTrack.NewReader(false) + mimeWriter := multipart.NewWriter(w) + + contentType := fmt.Sprintf("multipart/x-mixed-replace;boundary=%s", mimeWriter.Boundary()) + w.Header().Add("Content-Type", contentType) + + partHeader := make(textproto.MIMEHeader) + partHeader.Add("Content-Type", "image/jpeg") + + for { + frame, release, err := 
videoReader.Read() + if err == io.EOF { + return + } + must(err) + + err = jpeg.Encode(&buf, frame, nil) + // Since we're done with frame, we need to release it so that the original owner can reuse + // this memory. + release() + must(err) + + partWriter, err := mimeWriter.CreatePart(partHeader) + must(err) + + _, err = partWriter.Write(buf.Bytes()) + buf.Reset() + must(err) + } + }) + + fmt.Println("listening on http://localhost:1313") + log.Println(http.ListenAndServe("localhost:1313", nil)) +} diff --git a/examples/webrtc/main.go b/examples/webrtc/main.go index e8372f08..b174c633 100644 --- a/examples/webrtc/main.go +++ b/examples/webrtc/main.go @@ -5,26 +5,23 @@ import ( "github.com/pion/mediadevices" "github.com/pion/mediadevices/examples/internal/signal" - "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/prop" "github.com/pion/webrtc/v2" - // This is required to use opus audio encoder - "github.com/pion/mediadevices/pkg/codec/opus" - // If you don't like vpx, you can also use x264 by importing as below // "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder // or you can also use openh264 for alternative h264 implementation // "github.com/pion/mediadevices/pkg/codec/openh264" - "github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder + "github.com/pion/mediadevices/pkg/codec/openh264" // This is required to use H264 video encoder + "github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder // Note: If you don't have a camera or microphone or your adapters are not supported, // you can always swap your adapters with our dummy adapters below. 
// _ "github.com/pion/mediadevices/pkg/driver/videotest" // _ "github.com/pion/mediadevices/pkg/driver/audiotest" - _ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter - _ "github.com/pion/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter + _ "github.com/pion/mediadevices/pkg/driver/audiotest" + _ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter ) const ( @@ -61,44 +58,48 @@ func main() { fmt.Printf("Connection State has changed %s \n", connectionState.String()) }) - md := mediadevices.NewMediaDevices(peerConnection) - - opusParams, err := opus.NewParams() + vp8Params, err := openh264.NewParams() if err != nil { panic(err) } - opusParams.BitRate = 32000 // 32kbps + vp8Params.BitRate = 300_000 // 300kbps - vp8Params, err := vpx.NewVP8Params() + opusParams, err := opus.NewParams() if err != nil { panic(err) } - vp8Params.BitRate = 100000 // 100kbps + codecSelector := mediadevices.NewCodecSelector( + mediadevices.WithVideoEncoders(&vp8Params), + mediadevices.WithAudioEncoders(&opusParams), + ) - s, err := md.GetUserMedia(mediadevices.MediaStreamConstraints{ - Audio: func(c *mediadevices.MediaTrackConstraints) { - c.Enabled = true - c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&opusParams} - }, + s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ Video: func(c *mediadevices.MediaTrackConstraints) { c.FrameFormat = prop.FrameFormat(frame.FormatYUY2) - c.Enabled = true c.Width = prop.Int(640) c.Height = prop.Int(480) - c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&vp8Params} }, + Audio: func(c *mediadevices.MediaTrackConstraints) { + }, + Codec: codecSelector, }) if err != nil { panic(err) } for _, tracker := range s.GetTracks() { - t := tracker.Track() tracker.OnEnded(func(err error) { - fmt.Printf("Track (ID: %s, Label: %s) ended with error: %v\n", - t.ID(), t.Label(), err) + fmt.Printf("Track (ID: %s) 
ended with error: %v\n", + tracker.ID(), err) }) - _, err = peerConnection.AddTransceiverFromTrack(t, + + // In Pion/webrtc v3, bind will be called automatically after SDP negotiation + webrtcTrack, err := tracker.Bind(peerConnection) + if err != nil { + panic(err) + } + + _, err = peerConnection.AddTransceiverFromTrack(webrtcTrack, webrtc.RtpTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionSendonly, }, diff --git a/mediadevices.go b/mediadevices.go index a028631a..8dbb0202 100644 --- a/mediadevices.go +++ b/mediadevices.go @@ -7,95 +7,26 @@ import ( "github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/webrtc/v2" ) var errNotFound = fmt.Errorf("failed to find the best driver that fits the constraints") -// MediaDevices is an interface that's defined on https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices -type MediaDevices interface { - GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) - GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) - EnumerateDevices() []MediaDeviceInfo -} - -// NewMediaDevices creates MediaDevices interface that provides access to connected media input devices -// like cameras and microphones, as well as screen sharing. -// In essence, it lets you obtain access to any hardware source of media data. -func NewMediaDevices(pc *webrtc.PeerConnection, opts ...MediaDevicesOption) MediaDevices { - codecs := make(map[webrtc.RTPCodecType][]*webrtc.RTPCodec) - for _, kind := range []webrtc.RTPCodecType{ - webrtc.RTPCodecTypeAudio, - webrtc.RTPCodecTypeVideo, - } { - codecs[kind] = pc.GetRegisteredRTPCodecs(kind) - } - return NewMediaDevicesFromCodecs(codecs, opts...) -} - -// NewMediaDevicesFromCodecs creates MediaDevices interface from lists of the available codecs -// that provides access to connected media input devices like cameras and microphones, -// as well as screen sharing. 
-// In essence, it lets you obtain access to any hardware source of media data. -func NewMediaDevicesFromCodecs(codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec, opts ...MediaDevicesOption) MediaDevices { - mdo := MediaDevicesOptions{ - codecs: codecs, - trackGenerator: defaultTrackGenerator, - } - for _, o := range opts { - o(&mdo) - } - return &mediaDevices{ - MediaDevicesOptions: mdo, - } -} - -// TrackGenerator is a function to create new track. -type TrackGenerator func(payloadType uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error) - -var defaultTrackGenerator = TrackGenerator(func(pt uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error) { - return webrtc.NewTrack(pt, ssrc, id, label, codec) -}) - -type mediaDevices struct { - MediaDevicesOptions -} - -// MediaDevicesOptions stores parameters used by MediaDevices. -type MediaDevicesOptions struct { - codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec - trackGenerator TrackGenerator -} - -// MediaDevicesOption is a type of MediaDevices functional option. -type MediaDevicesOption func(*MediaDevicesOptions) - -// WithTrackGenerator specifies a TrackGenerator to use customized track. -func WithTrackGenerator(gen TrackGenerator) MediaDevicesOption { - return func(o *MediaDevicesOptions) { - o.trackGenerator = gen - } -} - // GetDisplayMedia prompts the user to select and grant permission to capture the contents // of a display or portion thereof (such as a window) as a MediaStream. 
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia -func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) { - trackers := make([]Tracker, 0) +func GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) { + trackers := make([]Track, 0) cleanTrackers := func() { for _, t := range trackers { - t.Stop() + t.Close() } } var videoConstraints MediaTrackConstraints if constraints.Video != nil { constraints.Video(&videoConstraints) - } - - if videoConstraints.Enabled { - tracker, err := m.selectScreen(videoConstraints) + tracker, err := selectScreen(videoConstraints, constraints.Codec) if err != nil { cleanTrackers() return nil, err @@ -116,27 +47,20 @@ func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (Medi // GetUserMedia prompts the user for permission to use a media input which produces a MediaStream // with tracks containing the requested types of media. // Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia -func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) { +func GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) { // TODO: It should return media stream based on constraints - trackers := make([]Tracker, 0) + trackers := make([]Track, 0) cleanTrackers := func() { for _, t := range trackers { - t.Stop() + t.Close() } } var videoConstraints, audioConstraints MediaTrackConstraints if constraints.Video != nil { constraints.Video(&videoConstraints) - } - - if constraints.Audio != nil { - constraints.Audio(&audioConstraints) - } - - if videoConstraints.Enabled { - tracker, err := m.selectVideo(videoConstraints) + tracker, err := selectVideo(videoConstraints, constraints.Codec) if err != nil { cleanTrackers() return nil, err @@ -145,8 +69,9 @@ func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaSt trackers = append(trackers, 
tracker) } - if audioConstraints.Enabled { - tracker, err := m.selectAudio(audioConstraints) + if constraints.Audio != nil { + constraints.Audio(&audioConstraints) + tracker, err := selectAudio(audioConstraints, constraints.Codec) if err != nil { cleanTrackers() return nil, err @@ -240,7 +165,7 @@ func selectBestDriver(filter driver.FilterFn, constraints MediaTrackConstraints) return bestDriver, constraints, nil } -func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker, error) { +func selectAudio(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { typeFilter := driver.FilterAudioRecorder() d, c, err := selectBestDriver(typeFilter, constraints) @@ -248,9 +173,9 @@ func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker, return nil, err } - return newTrack(&m.MediaDevicesOptions, d, c) + return newTrackFromDriver(d, c, selector) } -func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, error) { +func selectVideo(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { typeFilter := driver.FilterVideoRecorder() notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen)) filter := driver.FilterAnd(typeFilter, notScreenFilter) @@ -260,10 +185,10 @@ func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, return nil, err } - return newTrack(&m.MediaDevicesOptions, d, c) + return newTrackFromDriver(d, c, selector) } -func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, error) { +func selectScreen(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { typeFilter := driver.FilterVideoRecorder() screenFilter := driver.FilterDeviceType(driver.Screen) filter := driver.FilterAnd(typeFilter, screenFilter) @@ -273,10 +198,10 @@ func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, return nil, err } - return newTrack(&m.MediaDevicesOptions, d, c) + 
return newTrackFromDriver(d, c, selector) } -func (m *mediaDevices) EnumerateDevices() []MediaDeviceInfo { +func EnumerateDevices() []MediaDeviceInfo { drivers := driver.GetManager().Query( driver.FilterFn(func(driver.Driver) bool { return true })) info := make([]MediaDeviceInfo, 0, len(drivers)) diff --git a/mediadevices_test.go b/mediadevices_test.go index 834f49b9..28a86821 100644 --- a/mediadevices_test.go +++ b/mediadevices_test.go @@ -1,91 +1,42 @@ package mediadevices import ( - "errors" "io" "testing" "time" - "github.com/pion/webrtc/v2" - "github.com/pion/webrtc/v2/pkg/media" - - "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/driver" _ "github.com/pion/mediadevices/pkg/driver/audiotest" _ "github.com/pion/mediadevices/pkg/driver/videotest" - "github.com/pion/mediadevices/pkg/io/audio" - "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" ) func TestGetUserMedia(t *testing.T) { - videoParams := mockParams{ - BaseParams: codec.BaseParams{ - BitRate: 100000, - }, - name: "MockVideo", - } - audioParams := mockParams{ - BaseParams: codec.BaseParams{ - BitRate: 32000, - }, - name: "MockAudio", - } - md := NewMediaDevicesFromCodecs( - map[webrtc.RTPCodecType][]*webrtc.RTPCodec{ - webrtc.RTPCodecTypeVideo: { - {Type: webrtc.RTPCodecTypeVideo, Name: "MockVideo", PayloadType: 1}, - }, - webrtc.RTPCodecTypeAudio: { - {Type: webrtc.RTPCodecTypeAudio, Name: "MockAudio", PayloadType: 2}, - }, - }, - WithTrackGenerator( - func(_ uint8, _ uint32, id, _ string, codec *webrtc.RTPCodec) ( - LocalTrack, error, - ) { - return newMockTrack(codec, id), nil - }, - ), - ) constraints := MediaStreamConstraints{ Video: func(c *MediaTrackConstraints) { - c.Enabled = true c.Width = prop.Int(640) c.Height = prop.Int(480) - params := videoParams - c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params} }, Audio: func(c *MediaTrackConstraints) { - c.Enabled = true - params := audioParams - c.AudioEncoderBuilders = 
[]codec.AudioEncoderBuilder{&params} }, } constraintsWrong := MediaStreamConstraints{ Video: func(c *MediaTrackConstraints) { - c.Enabled = true - c.Width = prop.Int(640) + c.Width = prop.IntExact(10000) c.Height = prop.Int(480) - params := videoParams - params.BitRate = 0 - c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params} }, Audio: func(c *MediaTrackConstraints) { - c.Enabled = true - params := audioParams - c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params} }, } // GetUserMedia with broken parameters - ms, err := md.GetUserMedia(constraintsWrong) + ms, err := GetUserMedia(constraintsWrong) if err == nil { t.Fatal("Expected error, but got nil") } // GetUserMedia with correct parameters - ms, err = md.GetUserMedia(constraints) + ms, err = GetUserMedia(constraints) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -103,11 +54,11 @@ func TestGetUserMedia(t *testing.T) { time.Sleep(50 * time.Millisecond) for _, track := range tracks { - track.Stop() + track.Close() } // Stop and retry GetUserMedia - ms, err = md.GetUserMedia(constraints) + ms, err = GetUserMedia(constraints) if err != nil { t.Fatalf("Failed to GetUserMedia after the previsous tracks stopped: %v", err) } @@ -124,105 +75,9 @@ func TestGetUserMedia(t *testing.T) { } time.Sleep(50 * time.Millisecond) for _, track := range tracks { - track.Stop() - } -} - -type mockTrack struct { - codec *webrtc.RTPCodec - id string -} - -func newMockTrack(codec *webrtc.RTPCodec, id string) *mockTrack { - return &mockTrack{ - codec: codec, - id: id, - } -} - -func (t *mockTrack) WriteSample(s media.Sample) error { - return nil -} - -func (t *mockTrack) Codec() *webrtc.RTPCodec { - return t.codec -} - -func (t *mockTrack) ID() string { - return t.id -} - -func (t *mockTrack) Kind() webrtc.RTPCodecType { - return t.codec.Type -} - -type mockParams struct { - codec.BaseParams - name string -} - -func (params *mockParams) RTPCodec() *codec.RTPCodec { - rtpCodec := codec.NewRTPH264Codec(90000) - 
rtpCodec.Name = params.name - return rtpCodec -} - -func (params *mockParams) BuildVideoEncoder(r video.Reader, p prop.Media) (codec.ReadCloser, error) { - if params.BitRate == 0 { - // This is a dummy error to test the failure condition. - return nil, errors.New("wrong codec parameter") - } - return &mockVideoCodec{ - r: r, - closed: make(chan struct{}), - }, nil -} - -func (params *mockParams) BuildAudioEncoder(r audio.Reader, p prop.Media) (codec.ReadCloser, error) { - return &mockAudioCodec{ - r: r, - closed: make(chan struct{}), - }, nil -} - -type mockCodec struct{} - -func (e *mockCodec) SetBitRate(b int) error { - return nil -} - -func (e *mockCodec) ForceKeyFrame() error { - return nil -} - -type mockVideoCodec struct { - mockCodec - r video.Reader - closed chan struct{} -} - -func (m *mockVideoCodec) Read() ([]byte, func(), error) { - if _, _, err := m.r.Read(); err != nil { - return nil, func() {}, err - } - return make([]byte, 20), func() {}, nil -} - -func (m *mockVideoCodec) Close() error { return nil } - -type mockAudioCodec struct { - mockCodec - r audio.Reader - closed chan struct{} -} - -func (m *mockAudioCodec) Read() ([]byte, func(), error) { - if _, _, err := m.r.Read(); err != nil { - return nil, func() {}, err + track.Close() } - return make([]byte, 20), func() {}, nil } -func (m *mockAudioCodec) Close() error { return nil } func TestSelectBestDriverConstraintsResultIsSetProperly(t *testing.T) { filterFn := driver.FilterVideoRecorder() diff --git a/mediastream.go b/mediastream.go index 6b99239b..5ddb62d1 100644 --- a/mediastream.go +++ b/mediastream.go @@ -7,80 +7,80 @@ import ( // MediaStream is an interface that represents a collection of existing tracks. 
type MediaStream interface { // GetAudioTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getaudiotracks - GetAudioTracks() []Tracker + GetAudioTracks() []Track // GetVideoTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getvideotracks - GetVideoTracks() []Tracker + GetVideoTracks() []Track // GetTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-gettracks - GetTracks() []Tracker + GetTracks() []Track // AddTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-addtrack - AddTrack(t Tracker) + AddTrack(t Track) // RemoveTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-removetrack - RemoveTrack(t Tracker) + RemoveTrack(t Track) } type mediaStream struct { - trackers map[Tracker]struct{} - l sync.RWMutex + tracks map[Track]struct{} + l sync.RWMutex } const trackTypeDefault MediaDeviceType = 0 // NewMediaStream creates a MediaStream interface that's defined in // https://w3c.github.io/mediacapture-main/#dom-mediastream -func NewMediaStream(trackers ...Tracker) (MediaStream, error) { - m := mediaStream{trackers: make(map[Tracker]struct{})} +func NewMediaStream(tracks ...Track) (MediaStream, error) { + m := mediaStream{tracks: make(map[Track]struct{})} - for _, tracker := range trackers { - if _, ok := m.trackers[tracker]; !ok { - m.trackers[tracker] = struct{}{} + for _, track := range tracks { + if _, ok := m.tracks[track]; !ok { + m.tracks[track] = struct{}{} } } return &m, nil } -func (m *mediaStream) GetAudioTracks() []Tracker { +func (m *mediaStream) GetAudioTracks() []Track { return m.queryTracks(AudioInput) } -func (m *mediaStream) GetVideoTracks() []Tracker { +func (m *mediaStream) GetVideoTracks() []Track { return m.queryTracks(VideoInput) } -func (m *mediaStream) GetTracks() []Tracker { +func (m *mediaStream) GetTracks() []Track { return m.queryTracks(trackTypeDefault) } // queryTracks returns all tracks that are the same kind as t. 
// If t is 0, which is the default, queryTracks will return all the tracks. -func (m *mediaStream) queryTracks(t MediaDeviceType) []Tracker { +func (m *mediaStream) queryTracks(t MediaDeviceType) []Track { m.l.RLock() defer m.l.RUnlock() - result := make([]Tracker, 0) - for tracker := range m.trackers { - if tracker.Kind() == t || t == trackTypeDefault { - result = append(result, tracker) + result := make([]Track, 0) + for track := range m.tracks { + if track.Kind() == t || t == trackTypeDefault { + result = append(result, track) } } return result } -func (m *mediaStream) AddTrack(t Tracker) { +func (m *mediaStream) AddTrack(t Track) { m.l.Lock() defer m.l.Unlock() - if _, ok := m.trackers[t]; ok { + if _, ok := m.tracks[t]; ok { return } - m.trackers[t] = struct{}{} + m.tracks[t] = struct{}{} } -func (m *mediaStream) RemoveTrack(t Tracker) { +func (m *mediaStream) RemoveTrack(t Track) { m.l.Lock() defer m.l.Unlock() - delete(m.trackers, t) + delete(m.tracks, t) } diff --git a/mediastream_test.go b/mediastream_test.go index c86b28be..916f3c5b 100644 --- a/mediastream_test.go +++ b/mediastream_test.go @@ -10,17 +10,14 @@ type mockMediaStreamTrack struct { kind MediaDeviceType } -func (track *mockMediaStreamTrack) Track() *webrtc.Track { - return nil +func (track *mockMediaStreamTrack) ID() string { + return "" } -func (track *mockMediaStreamTrack) LocalTrack() LocalTrack { +func (track *mockMediaStreamTrack) Close() error { return nil } -func (track *mockMediaStreamTrack) Stop() { -} - func (track *mockMediaStreamTrack) Kind() MediaDeviceType { return track.kind } @@ -28,8 +25,16 @@ func (track *mockMediaStreamTrack) Kind() MediaDeviceType { func (track *mockMediaStreamTrack) OnEnded(handler func(error)) { } +func (track *mockMediaStreamTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) { + return nil, nil +} + +func (track *mockMediaStreamTrack) Unbind(pc *webrtc.PeerConnection) error { + return nil +} + func TestMediaStreamFilters(t *testing.T) { - 
audioTracks := []Tracker{ + audioTracks := []Track{ &mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput}, @@ -37,7 +42,7 @@ func TestMediaStreamFilters(t *testing.T) { &mockMediaStreamTrack{AudioInput}, } - videoTracks := []Tracker{ + videoTracks := []Track{ &mockMediaStreamTrack{VideoInput}, &mockMediaStreamTrack{VideoInput}, &mockMediaStreamTrack{VideoInput}, @@ -49,7 +54,7 @@ func TestMediaStreamFilters(t *testing.T) { t.Fatal(err) } - expect := func(t *testing.T, actual, expected []Tracker) { + expect := func(t *testing.T, actual, expected []Track) { if len(actual) != len(expected) { t.Fatalf("%s: Expected to get %d trackers, but got %d trackers", t.Name(), len(expected), len(actual)) } diff --git a/mediastreamconstraints.go b/mediastreamconstraints.go index 8e3e4292..4ff1703b 100644 --- a/mediastreamconstraints.go +++ b/mediastreamconstraints.go @@ -1,40 +1,18 @@ package mediadevices import ( - "github.com/pion/mediadevices/pkg/codec" - "github.com/pion/mediadevices/pkg/io/audio" - "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" ) type MediaStreamConstraints struct { Audio MediaOption Video MediaOption + Codec *CodecSelector } // MediaTrackConstraints represents https://w3c.github.io/mediacapture-main/#dom-mediatrackconstraints type MediaTrackConstraints struct { prop.MediaConstraints - Enabled bool - // VideoEncoderBuilders are codec builders that are used for encoding the video - // and later being used for sending the appropriate RTP payload type. - // - // If one encoder builder fails to build the codec, the next builder will be used, - // repeating until a codec builds. If no builders build successfully, an error is returned. - VideoEncoderBuilders []codec.VideoEncoderBuilder - // AudioEncoderBuilders are codec builders that are used for encoding the audio - // and later being used for sending the appropriate RTP payload type. 
- // - // If one encoder builder fails to build the codec, the next builder will be used, - // repeating until a codec builds. If no builders build successfully, an error is returned. - AudioEncoderBuilders []codec.AudioEncoderBuilder - // VideoTransform will be used to transform the video that's coming from the driver. - // So, basically it'll look like following: driver -> VideoTransform -> codec - VideoTransform video.TransformFunc - // AudioTransform will be used to transform the audio that's coming from the driver. - // So, basically it'll look like following: driver -> AudioTransform -> code - AudioTransform audio.TransformFunc - selectedMedia prop.Media } diff --git a/sampler.go b/sampler.go index ad4b263a..513e5f2e 100644 --- a/sampler.go +++ b/sampler.go @@ -4,6 +4,7 @@ import ( "math" "time" + "github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2/pkg/media" ) @@ -11,7 +12,7 @@ type samplerFunc func(b []byte) error // newVideoSampler creates a video sampler that uses the actual video frame rate and // the codec's clock rate to come up with a duration for each sample. -func newVideoSampler(t LocalTrack) samplerFunc { +func newVideoSampler(t *webrtc.Track) samplerFunc { clockRate := float64(t.Codec().ClockRate) lastTimestamp := time.Now() @@ -27,7 +28,7 @@ func newVideoSampler(t LocalTrack) samplerFunc { // newAudioSampler creates a audio sampler that uses a fixed latency and // the codec's clock rate to come up with a duration for each sample. 
-func newAudioSampler(t LocalTrack, latency time.Duration) samplerFunc { +func newAudioSampler(t *webrtc.Track, latency time.Duration) samplerFunc { samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds())) return samplerFunc(func(b []byte) error { return t.WriteSample(media.Sample{Data: b, Samples: samples}) diff --git a/source.go b/source.go new file mode 100644 index 00000000..c8b8f545 --- /dev/null +++ b/source.go @@ -0,0 +1 @@ +package mediadevices diff --git a/track.go b/track.go index c7b03da4..baa0b3fa 100644 --- a/track.go +++ b/track.go @@ -2,239 +2,326 @@ package mediadevices import ( "errors" + "fmt" + "image" "math/rand" "sync" "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/driver" + "github.com/pion/mediadevices/pkg/io/audio" + "github.com/pion/mediadevices/pkg/io/video" + "github.com/pion/mediadevices/pkg/wave" "github.com/pion/webrtc/v2" - "github.com/pion/webrtc/v2/pkg/media" ) -// Tracker is an interface that represent MediaStreamTrack +var ( + errInvalidDriverType = errors.New("invalid driver type") + errNotFoundPeerConnection = errors.New("failed to find given peer connection") +) + +// Source is a generic representation of a media source +type Source interface { + ID() string + Close() error +} + +// VideoSource is a specific type of media source that emits a series of video frames +type VideoSource interface { + video.Reader + Source +} + +// AudioSource is a specific type of media source that emits a series of audio chunks +type AudioSource interface { + audio.Reader + Source +} + +// Track is an interface that represent MediaStreamTrack // Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack -type Tracker interface { - Track() *webrtc.Track - LocalTrack() LocalTrack - Stop() - Kind() MediaDeviceType +type Track interface { + Source // OnEnded registers a handler to receive an error from the media stream track. 
// If the error is already occured before registering, the handler will be // immediately called. OnEnded(func(error)) + Kind() MediaDeviceType + // Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind + // call will happen automatically after the SDP negotiation. Users won't need to call this manually. + Bind(*webrtc.PeerConnection) (*webrtc.Track, error) + // Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will + // be called automatically in the future. + Unbind(*webrtc.PeerConnection) error } -type LocalTrack interface { - WriteSample(s media.Sample) error - Codec() *webrtc.RTPCodec - ID() string - Kind() webrtc.RTPCodecType +type baseTrack struct { + Source + err error + onErrorHandler func(error) + mu sync.Mutex + endOnce sync.Once + kind MediaDeviceType + selector *CodecSelector + activePeerConnections map[*webrtc.PeerConnection]chan<- chan<- struct{} } -type track struct { - localTrack LocalTrack - d driver.Driver - sample samplerFunc - encoder codec.ReadCloser +func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack { + return &baseTrack{ + Source: source, + kind: kind, + selector: selector, + activePeerConnections: make(map[*webrtc.PeerConnection]chan<- chan<- struct{}), + } +} - onErrorHandler func(error) - err error - mu sync.Mutex - endOnce sync.Once - kind MediaDeviceType +// Kind returns track's kind +func (track *baseTrack) Kind() MediaDeviceType { + return track.kind } -func newTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*track, error) { - var encoderBuilders []encoderBuilder - var rtpCodecs []*webrtc.RTPCodec - var buildSampler func(t LocalTrack) samplerFunc - var kind MediaDeviceType - var err error +// OnEnded sets an error handler. When a track has been created and started, if an +// error occurs, handler will get called with the error given to the parameter. 
+func (track *baseTrack) OnEnded(handler func(error)) { + track.mu.Lock() + track.onErrorHandler = handler + err := track.err + track.mu.Unlock() - err = d.Open() - if err != nil { - return nil, err + if err != nil && handler != nil { + // Already errored. + track.endOnce.Do(func() { + handler(err) + }) } +} - switch r := d.(type) { - case driver.VideoRecorder: - kind = VideoInput - rtpCodecs = opts.codecs[webrtc.RTPCodecTypeVideo] - buildSampler = newVideoSampler - encoderBuilders, err = newVideoEncoderBuilders(r, constraints) - case driver.AudioRecorder: - kind = AudioInput - rtpCodecs = opts.codecs[webrtc.RTPCodecTypeAudio] - buildSampler = func(t LocalTrack) samplerFunc { - return newAudioSampler(t, constraints.selectedMedia.Latency) - } - encoderBuilders, err = newAudioEncoderBuilders(r, constraints) - default: - err = errors.New("newTrack: invalid driver type") +// onError is a callback when an error occurs +func (track *baseTrack) onError(err error) { + track.mu.Lock() + track.err = err + handler := track.onErrorHandler + track.mu.Unlock() + + if handler != nil { + track.endOnce.Do(func() { + handler(err) + }) } +} + +func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sampler func(*webrtc.Track) samplerFunc) (*webrtc.Track, error) { + track.mu.Lock() + defer track.mu.Unlock() + webrtcTrack, err := pc.NewTrack(selectedCodec.PayloadType, rand.Uint32(), track.ID(), selectedCodec.MimeType) if err != nil { - d.Close() return nil, err } - for _, builder := range encoderBuilders { - var matchedRTPCodec *webrtc.RTPCodec - for _, rtpCodec := range rtpCodecs { - if rtpCodec.Name == builder.name { - matchedRTPCodec = rtpCodec - break - } - } + sample := sampler(webrtcTrack) + signalCh := make(chan chan<- struct{}) + track.activePeerConnections[pc] = signalCh - if matchedRTPCodec == nil { - continue - } + fmt.Println("Binding") - localTrack, err := opts.trackGenerator( - matchedRTPCodec.PayloadType, - 
rand.Uint32(), - d.ID(), - matchedRTPCodec.Type.String(), - matchedRTPCodec, - ) - if err != nil { - continue - } + go func() { + var doneCh chan<- struct{} + defer func() { + encodedReader.Close() - encoder, err := builder.build() - if err != nil { - continue - } + // When there's another call to unbind, it won't block since we mark the signalCh to be closed + close(signalCh) + if doneCh != nil { + close(doneCh) + } + }() - t := track{ - localTrack: localTrack, - sample: buildSampler(localTrack), - d: d, - encoder: encoder, - kind: kind, + for { + select { + case doneCh = <-signalCh: + return + default: + } + + buff, _, err := encodedReader.Read() + if err != nil { + track.onError(err) + return + } + + if err := sample(buff); err != nil { + track.onError(err) + return + } } - go t.start() - return &t, nil - } + }() - d.Close() - return nil, errors.New("newTrack: failed to find a matching codec") + return webrtcTrack, nil } -// Kind returns track's kind -func (t *track) Kind() MediaDeviceType { - return t.kind +func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error { + track.mu.Lock() + defer track.mu.Unlock() + + ch, ok := track.activePeerConnections[pc] + if !ok { + return errNotFoundPeerConnection + } + + doneCh := make(chan struct{}) + ch <- doneCh + <-doneCh + delete(track.activePeerConnections, pc) + return nil } -// OnEnded sets an error handler. When a track has been created and started, if an -// error occurs, handler will get called with the error given to the parameter. -func (t *track) OnEnded(handler func(error)) { - t.mu.Lock() - t.onErrorHandler = handler - err := t.err - t.mu.Unlock() +func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { + if err := d.Open(); err != nil { + return nil, err + } - if err != nil && handler != nil { - // Already errored. 
- t.endOnce.Do(func() { - handler(err) - }) + switch recorder := d.(type) { + case driver.VideoRecorder: + return newVideoTrackFromDriver(d, recorder, constraints, selector) + case driver.AudioRecorder: + return newAudioTrackFromDriver(d, recorder, constraints, selector) + default: + panic(errInvalidDriverType) } } -// onError is a callback when an error occurs -func (t *track) onError(err error) { - t.mu.Lock() - t.err = err - handler := t.onErrorHandler - t.mu.Unlock() +// VideoTrack is a specific track type that contains video source which allows multiple readers to access, and manipulate. +type VideoTrack struct { + *baseTrack + *video.Broadcaster +} - if handler != nil { - t.endOnce.Do(func() { - handler(err) - }) - } +// NewVideoTrack constructs a new VideoTrack +func NewVideoTrack(source VideoSource, selector *CodecSelector) Track { + return newVideoTrackFromReader(source, source, selector) } -// start starts the data flow from the driver all the way to the localTrack -func (t *track) start() { - for { - buff, _, err := t.encoder.Read() +func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track { + base := newBaseTrack(source, VideoInput, selector) + wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) { + img, _, err = reader.Read() if err != nil { - t.onError(err) - return + base.onError(err) } + return img, func() {}, err + }) - if err := t.sample(buff); err != nil { - t.onError(err) - return - } - } -} + // TODO: Allow users to configure broadcaster + broadcaster := video.NewBroadcaster(wrappedReader, nil) -// Stop stops the underlying driver and encoder -func (t *track) Stop() { - t.d.Close() - t.encoder.Close() + return &VideoTrack{ + baseTrack: base, + Broadcaster: broadcaster, + } } -func (t *track) Track() *webrtc.Track { - return t.localTrack.(*webrtc.Track) -} +// newVideoTrackFromDriver is an internal video track creation from driver +func newVideoTrackFromDriver(d 
driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { + reader, err := recorder.VideoRecord(constraints.selectedMedia) + if err != nil { + return nil, err + } -func (t *track) LocalTrack() LocalTrack { - return t.localTrack + return newVideoTrackFromReader(d, reader, selector), nil } -// encoderBuilder is a generic encoder builder that acts as a delegator for codec.VideoEncoderBuilder and -// codec.AudioEncoderBuilder. The idea of having a delegator is to reduce redundant codes that are being -// duplicated for managing video and audio. -type encoderBuilder struct { - name string - build func() (codec.ReadCloser, error) +// Transform transforms the underlying source by applying the given fns in serial order +func (track *VideoTrack) Transform(fns ...video.TransformFunc) { + src := track.Broadcaster.Source() + track.Broadcaster.ReplaceSource(video.Merge(fns...)(src)) } -// newVideoEncoderBuilders transforms video given by VideoRecorder with the video transformer that is passed through -// constraints and create a list of generic encoder builders -func newVideoEncoderBuilders(vr driver.VideoRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) { - r, err := vr.VideoRecord(constraints.selectedMedia) +func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) { + reader := track.NewReader(false) + inputProp, err := detectCurrentVideoProp(track.Broadcaster) if err != nil { return nil, err } - if constraints.VideoTransform != nil { - r = constraints.VideoTransform(r) + wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo) + fmt.Println(wantCodecs) + fmt.Println(&inputProp) + encodedReader, selectedCodec, err := track.selector.selectVideoCodec(wantCodecs, reader, inputProp) + if err != nil { + return nil, err } - encoderBuilders := make([]encoderBuilder, len(constraints.VideoEncoderBuilders)) - for i, b := range constraints.VideoEncoderBuilders { - 
encoderBuilders[i].name = b.RTPCodec().Name - encoderBuilders[i].build = func() (codec.ReadCloser, error) { - return b.BuildVideoEncoder(r, constraints.selectedMedia) + return track.bind(pc, encodedReader, selectedCodec, newVideoSampler) +} + +func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error { + return track.unbind(pc) +} + +// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and +// manipulate. +type AudioTrack struct { + *baseTrack + *audio.Broadcaster +} + +// NewAudioTrack constructs a new AudioTrack +func NewAudioTrack(source AudioSource, selector *CodecSelector) Track { + return newAudioTrackFromReader(source, source, selector) +} + +func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track { + base := newBaseTrack(source, AudioInput, selector) + wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) { + chunk, _, err = reader.Read() + if err != nil { + base.onError(err) } + return chunk, func() {}, err + }) + + // TODO: Allow users to configure broadcaster + broadcaster := audio.NewBroadcaster(wrappedReader, nil) + + return &AudioTrack{ + baseTrack: base, + Broadcaster: broadcaster, } - return encoderBuilders, nil } -// newAudioEncoderBuilders transforms audio given by AudioRecorder with the audio transformer that is passed through -// constraints and create a list of generic encoder builders -func newAudioEncoderBuilders(ar driver.AudioRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) { - r, err := ar.AudioRecord(constraints.selectedMedia) +// newAudioTrackFromDriver is an internal audio track creation from driver +func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) { + reader, err := recorder.AudioRecord(constraints.selectedMedia) if err != nil { return nil, err } - if constraints.AudioTransform !=
nil { - r = constraints.AudioTransform(r) + return newAudioTrackFromReader(d, reader, selector), nil +} + +// Transform transforms the underlying source by applying the given fns in serial order +func (track *AudioTrack) Transform(fns ...audio.TransformFunc) { + src := track.Broadcaster.Source() + track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src)) +} + +func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) { + reader := track.NewReader(false) + inputProp, err := detectCurrentAudioProp(track.Broadcaster) + if err != nil { + return nil, err } - encoderBuilders := make([]encoderBuilder, len(constraints.AudioEncoderBuilders)) - for i, b := range constraints.AudioEncoderBuilders { - encoderBuilders[i].name = b.RTPCodec().Name - encoderBuilders[i].build = func() (codec.ReadCloser, error) { - return b.BuildAudioEncoder(r, constraints.selectedMedia) - } + wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio) + encodedReader, selectedCodec, err := track.selector.selectAudioCodec(wantCodecs, reader, inputProp) + if err != nil { + return nil, err } - return encoderBuilders, nil + + return track.bind(pc, encodedReader, selectedCodec, func(t *webrtc.Track) samplerFunc { return newAudioSampler(t, inputProp.Latency) }) +} + +func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error { + return track.unbind(pc) } diff --git a/track_test.go b/track_test.go index baf67c53..18b58697 100644 --- a/track_test.go +++ b/track_test.go @@ -10,7 +10,7 @@ func TestOnEnded(t *testing.T) { errExpected := errors.New("an error") t.Run("ErrorAfterRegister", func(t *testing.T) { - tr := &track{} + tr := &baseTrack{} called := make(chan error, 1) tr.OnEnded(func(error) { @@ -35,7 +35,7 @@ func TestOnEnded(t *testing.T) { }) t.Run("ErrorBeforeRegister", func(t *testing.T) { - tr := &track{} + tr := &baseTrack{} tr.onError(errExpected)