From 559c6a13a18560b6de7c4b19de95d32248f7b888 Mon Sep 17 00:00:00 2001 From: Lukas Herman Date: Wed, 28 Oct 2020 23:12:59 -0700 Subject: [PATCH] Update readers to be memory pool friendly --- mediadevices_test.go | 16 ++++---- meta.go | 4 +- meta_test.go | 16 ++++---- pkg/avfoundation/avfoundation_darwin.go | 6 +-- pkg/codec/codec.go | 2 +- pkg/codec/mmal/mmal.go | 12 +++--- pkg/codec/openh264/openh264.go | 12 +++--- pkg/codec/opus/opus.go | 12 +++--- pkg/codec/vaapi/vp8.go | 43 ++++++++++----------- pkg/codec/vaapi/vp9.go | 37 +++++++++--------- pkg/codec/vpx/vpx.go | 14 +++---- pkg/codec/x264/x264.go | 12 +++--- pkg/driver/audiotest/dummy.go | 6 +-- pkg/driver/camera/camera_darwin.go | 6 +-- pkg/driver/camera/camera_linux.go | 12 +++--- pkg/driver/camera/camera_windows.go | 6 +-- pkg/driver/microphone/microphone_linux.go | 6 +-- pkg/driver/microphone/microphone_windows.go | 8 ++-- pkg/driver/screen/x11_linux.go | 4 +- pkg/driver/videotest/dummy.go | 6 +-- pkg/frame/compressed.go | 5 ++- pkg/frame/frame.go | 6 +-- pkg/frame/yuv.go | 12 +++--- pkg/frame/yuv_cgo.go | 12 +++--- pkg/frame/yuv_nocgo.go | 12 +++--- pkg/frame/yuv_test.go | 6 +-- pkg/io/audio/audio.go | 9 +++-- pkg/io/audio/broadcast.go | 16 ++++---- pkg/io/audio/broadcast_test.go | 10 ++--- pkg/io/audio/buffer.go | 14 +++---- pkg/io/audio/buffer_test.go | 8 ++-- pkg/io/audio/detect.go | 8 ++-- pkg/io/audio/detect_test.go | 8 ++-- pkg/io/audio/mixer.go | 12 +++--- pkg/io/audio/mixer_test.go | 8 ++-- pkg/io/broadcast.go | 4 +- pkg/io/broadcast_test.go | 6 +-- pkg/io/reader.go | 9 +++-- pkg/io/video/broadcast.go | 16 ++++---- pkg/io/video/broadcast_test.go | 10 ++--- pkg/io/video/convert.go | 18 ++++----- pkg/io/video/convert_test.go | 24 ++++++------ pkg/io/video/detect.go | 8 ++-- pkg/io/video/detect_test.go | 14 +++---- pkg/io/video/scale.go | 12 +++--- pkg/io/video/scale_test.go | 12 +++--- pkg/io/video/throttle.go | 8 ++-- pkg/io/video/throttle_test.go | 6 +-- pkg/io/video/video.go | 9 +++-- track.go | 2 +- 50 files changed, 273 insertions(+), 271 deletions(-) diff --git a/mediadevices_test.go b/mediadevices_test.go index 23bee29a..834f49b9 100644 --- a/mediadevices_test.go +++ b/mediadevices_test.go @@ -201,11 +201,11 @@ type mockVideoCodec struct { closed chan struct{} } -func (m *mockVideoCodec) Read() ([]byte, error) { - if _, err := m.r.Read(); err != nil { - return nil, err +func (m *mockVideoCodec) Read() ([]byte, func(), error) { + if _, _, err := m.r.Read(); err != nil { + return nil, func() {}, err } - return make([]byte, 20), nil + return make([]byte, 20), func() {}, nil } func (m *mockVideoCodec) Close() error { return nil } @@ -216,11 +216,11 @@ type mockAudioCodec struct { closed chan struct{} } -func (m *mockAudioCodec) Read() ([]byte, error) { - if _, err := m.r.Read(); err != nil { - return nil, err +func (m *mockAudioCodec) Read() ([]byte, func(), error) { + if _, _, err := m.r.Read(); err != nil { + return nil, func() {}, err } - return make([]byte, 20), nil + return make([]byte, 20), func() {}, nil } func (m *mockAudioCodec) Close() error { return nil } diff --git a/meta.go b/meta.go index 3c5c4029..69098e38 100644 --- a/meta.go +++ b/meta.go @@ -15,7 +15,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error) // in any case. metaReader := broadcaster.NewReader(false) metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader) - _, err := metaReader.Read() + _, _, err := metaReader.Read() return currentProp, err } @@ -29,7 +29,7 @@ func detectCurrentAudioProp(broadcaster *audio.Broadcaster) (prop.Media, error) // in any case. metaReader := broadcaster.NewReader(false) metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader) - _, err := metaReader.Read() + _, _, err := metaReader.Read() return currentProp, err } diff --git a/meta_test.go b/meta_test.go index 1cb0fa4a..7faee1a1 100644 --- a/meta_test.go +++ b/meta_test.go @@ -17,12 +17,12 @@ func TestDetectCurrentVideoProp(t *testing.T) { second.Pix[0] = 2 isFirst := true - source := video.ReaderFunc(func() (image.Image, error) { + source := video.ReaderFunc(func() (image.Image, func(), error) { if isFirst { isFirst = true - return first, nil + return first, func() {}, nil } else { - return second, nil + return second, func() {}, nil } }) @@ -42,7 +42,7 @@ func TestDetectCurrentVideoProp(t *testing.T) { } reader := broadcaster.NewReader(false) - img, err := reader.Read() + img, _, err := reader.Read() if err != nil { t.Fatal(err) } @@ -65,12 +65,12 @@ func TestDetectCurrentAudioProp(t *testing.T) { second.Data[0] = 2 isFirst := true - source := audio.ReaderFunc(func() (wave.Audio, error) { + source := audio.ReaderFunc(func() (wave.Audio, func(), error) { if isFirst { isFirst = true - return first, nil + return first, func() {}, nil } else { - return second, nil + return second, func() {}, nil } }) @@ -86,7 +86,7 @@ func TestDetectCurrentAudioProp(t *testing.T) { } reader := broadcaster.NewReader(false) - chunk, err := reader.Read() + chunk, _, err := reader.Read() if err != nil { t.Fatal(err) } diff --git a/pkg/avfoundation/avfoundation_darwin.go b/pkg/avfoundation/avfoundation_darwin.go index f33f43a2..d432e896 100644 --- a/pkg/avfoundation/avfoundation_darwin.go +++ b/pkg/avfoundation/avfoundation_darwin.go @@ -110,12 +110,12 @@ func (rc *ReadCloser) dataCb(data []byte) { // Read reads raw data, the format is determined by the media type and property: // - For video, each call will return a frame. // - For audio, each call will return a chunk which its size configured by Latency -func (rc *ReadCloser) Read() ([]byte, error) { +func (rc *ReadCloser) Read() ([]byte, func(), error) { data, ok := <-rc.dataChan if !ok { - return nil, io.EOF + return nil, func() {}, io.EOF } - return data, nil + return data, func() {}, nil } // Close closes the capturing session, and no data will flow anymore diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 846166c2..322d9bca 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -58,7 +58,7 @@ type VideoEncoderBuilder interface { // ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame type ReadCloser interface { - Read() ([]byte, error) + Read() (b []byte, release func(), err error) Close() error // SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted // but this also means that the quality will also be lower. diff --git a/pkg/codec/mmal/mmal.go b/pkg/codec/mmal/mmal.go index 40cfa920..576adf30 100644 --- a/pkg/codec/mmal/mmal.go +++ b/pkg/codec/mmal/mmal.go @@ -55,17 +55,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser, return &e, nil } -func (e *encoder) Read() ([]byte, error) { +func (e *encoder) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } imgReal := img.(*image.YCbCr) var y, cb, cr C.Slice @@ -79,7 +79,7 @@ func (e *encoder) Read() ([]byte, error) { var encodedBuffer *C.MMAL_BUFFER_HEADER_T status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer) if status.code != 0 { - return nil, statusToErr(&status) + return nil, func() {}, statusToErr(&status) } // GoBytes copies the C array to Go slice. After this, it's safe to release the C array @@ -87,7 +87,7 @@ func (e *encoder) Read() ([]byte, error) { // Release the buffer so that mmal can reuse this memory C.mmal_buffer_header_release(encodedBuffer) - return encoded, err + return encoded, func() {}, err } func (e *encoder) SetBitRate(b int) error { diff --git a/pkg/codec/openh264/openh264.go b/pkg/codec/openh264/openh264.go index c0d4cd27..f13e60cb 100644 --- a/pkg/codec/openh264/openh264.go +++ b/pkg/codec/openh264/openh264.go @@ -50,17 +50,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser, }, nil } -func (e *encoder) Read() ([]byte, error) { +func (e *encoder) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } yuvImg := img.(*image.YCbCr) @@ -74,11 +74,11 @@ func (e *encoder) Read() ([]byte, error) { width: C.int(bounds.Max.X - bounds.Min.X), }, &rv) if err := errResult(rv); err != nil { - return nil, fmt.Errorf("failed in encoding: %v", err) + return nil, func() {}, fmt.Errorf("failed in encoding: %v", err) } encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len) - return encoded, nil + return encoded, func() {}, nil } func (e *encoder) SetBitRate(b int) error { diff --git a/pkg/codec/opus/opus.go b/pkg/codec/opus/opus.go index f8f7d553..a76b14f3 100644 --- a/pkg/codec/opus/opus.go +++ b/pkg/codec/opus/opus.go @@ -72,22 +72,22 @@ func newEncoder(r audio.Reader, p prop.Media, params Params) (codec.ReadCloser, return &e, nil } -func (e *encoder) Read() ([]byte, error) { - buff, err := e.reader.Read() +func (e *encoder) Read() ([]byte, func(), error) { + buff, _, err := e.reader.Read() if err != nil { - return nil, err + return nil, func() {}, err } encoded := make([]byte, 1024) switch b := buff.(type) { case *wave.Int16Interleaved: n, err := e.engine.Encode(b.Data, encoded) - return encoded[:n:n], err + return encoded[:n:n], func() {}, err case *wave.Float32Interleaved: n, err := e.engine.EncodeFloat32(b.Data, encoded) - return encoded[:n:n], err + return encoded[:n:n], func() {}, err default: - return nil, errors.New("unknown type of audio buffer") + return nil, func() {}, errors.New("unknown type of audio buffer") } } diff --git a/pkg/codec/vaapi/vp8.go b/pkg/codec/vaapi/vp8.go index 5a0ef681..5c878602 100644 --- a/pkg/codec/vaapi/vp8.go +++ b/pkg/codec/vaapi/vp8.go @@ -64,7 +64,6 @@ import ( "unsafe" "github.com/pion/mediadevices/pkg/codec" - mio "github.com/pion/mediadevices/pkg/io" "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" ) @@ -296,17 +295,17 @@ func newVP8Encoder(r video.Reader, p prop.Media, params ParamsVP8) (codec.ReadCl return e, nil } -func (e *encoderVP8) Read() ([]byte, error) { +func (e *encoderVP8) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } yuvImg := img.(*image.YCbCr) @@ -348,7 +347,7 @@ func (e *encoderVP8) Read() ([]byte, error) { } } if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE { - return nil, errors.New("no available surface") + return nil, func() {}, errors.New("no available surface") } C.setForceKFFlagVP8(&e.picParam, 0) @@ -416,7 +415,7 @@ func (e *encoderVP8) Read() ([]byte, error) { C.size_t(uintptr(p.src)), &id, ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) } buffs = append(buffs, id) } @@ -426,17 +425,17 @@ func (e *encoderVP8) Read() ([]byte, error) { e.display, e.ctxID, e.surfs[surfaceVP8Input], ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) } // Upload image var vaImg C.VAImage var rawBuf unsafe.Pointer if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) } // TODO: use vaImg.pitches to support padding C.memcpy( @@ -452,10 +451,10 @@ func (e *encoderVP8) Read() ([]byte, error) { unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)), ) if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaRenderPicture( @@ -463,38 +462,38 @@ func (e *encoderVP8) Read() ([]byte, error) { &buffs[1], // 0 is for ouput C.int(len(buffs)-1), ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaEndPicture( e.display, e.ctxID, ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) } // Load encoded data for retry := 3; retry >= 0; retry-- { if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) } var surfStat C.VASurfaceStatus if s := C.vaQuerySurfaceStatus( e.display, e.picParam.reconstructed_frame, &surfStat, ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) } if surfStat == C.VASurfaceReady { break } if retry == 0 { - return nil, fmt.Errorf("failed to sync surface: %d", surfStat) + return nil, func() {}, fmt.Errorf("failed to sync surface: %d", surfStat) } } var seg *C.VACodedBufferSegment if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) } if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 { - return nil, errors.New("buffer size too small") + return nil, func() {}, errors.New("buffer size too small") } if cap(e.frame) < int(seg.size) { @@ -507,13 +506,13 @@ func (e *encoderVP8) Read() ([]byte, error) { ) if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) } // Destroy buffers for _, b := range buffs { if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) } } @@ -538,7 +537,7 @@ func (e *encoderVP8) Read() ([]byte, error) { encoded := make([]byte, len(e.frame)) copy(encoded, e.frame) - return encoded, err + return encoded, func() {}, err } func (e *encoderVP8) SetBitRate(b int) error { diff --git a/pkg/codec/vaapi/vp9.go b/pkg/codec/vaapi/vp9.go index 6f0a835a..24b863d3 100644 --- a/pkg/codec/vaapi/vp9.go +++ b/pkg/codec/vaapi/vp9.go @@ -47,7 +47,6 @@ import ( "unsafe" "github.com/pion/mediadevices/pkg/codec" - mio "github.com/pion/mediadevices/pkg/io" "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" ) @@ -285,17 +284,17 @@ func newVP9Encoder(r video.Reader, p prop.Media, params ParamsVP9) (codec.ReadCl return e, nil } -func (e *encoderVP9) Read() ([]byte, error) { +func (e *encoderVP9) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } yuvImg := img.(*image.YCbCr) @@ -379,7 +378,7 @@ func (e *encoderVP9) Read() ([]byte, error) { C.size_t(uintptr(p.src)), &id, ); s != C.VA_STATUS_SUCCESS { - return 0, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) } buffs = append(buffs, id) } @@ -389,17 +388,17 @@ func (e *encoderVP9) Read() ([]byte, error) { e.display, e.ctxID, e.surfs[surfaceVP9Input], ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) } // Upload image var vaImg C.VAImage var rawBuf unsafe.Pointer if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP9Input], &vaImg); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) } // TODO: use vaImg.pitches to support padding C.copyI420toNV12( @@ -410,10 +409,10 @@ func (e *encoderVP9) Read() ([]byte, error) { C.uint(len(yuvImg.Y)), ) if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaRenderPicture( @@ -421,27 +420,27 @@ func (e *encoderVP9) Read() ([]byte, error) { &buffs[1], // 0 is for ouput C.int(len(buffs)-1), ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) } if s := C.vaEndPicture( e.display, e.ctxID, ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) } // Load encoded data if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) } var surfStat C.VASurfaceStatus if s := C.vaQuerySurfaceStatus( e.display, e.picParam.reconstructed_frame, &surfStat, ); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) } var seg *C.VACodedBufferSegment if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS { - return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) } if cap(e.frame) < int(seg.size) { e.frame = make([]byte, int(seg.size)) @@ -453,13 +452,13 @@ func (e *encoderVP9) Read() ([]byte, error) { ) if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS { - return 0, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) } // Destroy buffers for _, b := range buffs { if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS { - return 0, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) + return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) } } @@ -473,7 +472,7 @@ func (e *encoderVP9) Read() ([]byte, error) { encoded := make([]byte, len(e.frame)) copy(encoded, e.frame) - return encoded, err + return encoded, func() {}, err } func (e *encoderVP9) SetBitRate(b int) error { diff --git a/pkg/codec/vpx/vpx.go b/pkg/codec/vpx/vpx.go index 5857a635..d47a3579 100644 --- a/pkg/codec/vpx/vpx.go +++ b/pkg/codec/vpx/vpx.go @@ -204,17 +204,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params, codecIface *C.vpx_c }, nil } -func (e *encoder) Read() ([]byte, error) { +func (e *encoder) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } yuvImg := img.(*image.YCbCr) bounds := yuvImg.Bounds() @@ -230,7 +230,7 @@ func (e *encoder) Read() ([]byte, error) { if e.cfg.g_w != C.uint(width) || e.cfg.g_h != C.uint(height) { e.cfg.g_w, e.cfg.g_h = C.uint(width), C.uint(height) if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != C.VPX_CODEC_OK { - return nil, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec) + return nil, func() {}, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec) } e.raw.w, e.raw.h = C.uint(width), C.uint(height) e.raw.r_w, e.raw.r_h = C.uint(width), C.uint(height) @@ -243,7 +243,7 @@ func (e *encoder) Read() ([]byte, error) { C.long(t-e.tStart), C.ulong(t-e.tLastFrame), C.long(flags), C.ulong(e.deadline), (*C.uchar)(&yuvImg.Y[0]), (*C.uchar)(&yuvImg.Cb[0]), (*C.uchar)(&yuvImg.Cr[0]), ); ec != C.VPX_CODEC_OK { - return nil, fmt.Errorf("vpx_codec_encode failed (%d)", ec) + return nil, func() {}, fmt.Errorf("vpx_codec_encode failed (%d)", ec) } e.frameIndex++ @@ -264,7 +264,7 @@ func (e *encoder) Read() ([]byte, error) { encoded := make([]byte, len(e.frame)) copy(encoded, e.frame) - return encoded, err + return encoded, func() {}, err } func (e *encoder) SetBitRate(b int) error { diff --git a/pkg/codec/x264/x264.go b/pkg/codec/x264/x264.go index 97a2eb4a..948fe8c4 100644 --- a/pkg/codec/x264/x264.go +++ b/pkg/codec/x264/x264.go @@ -94,17 +94,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser, return &e, nil } -func (e *encoder) Read() ([]byte, error) { +func (e *encoder) Read() ([]byte, func(), error) { e.mu.Lock() defer e.mu.Unlock() if e.closed { - return nil, io.EOF + return nil, func() {}, io.EOF } - img, err := e.r.Read() + img, _, err := e.r.Read() if err != nil { - return nil, err + return nil, func() {}, err } yuvImg := img.(*image.YCbCr) @@ -117,11 +117,11 @@ func (e *encoder) Read() ([]byte, error) { &rc, ) if err := errFromC(rc); err != nil { - return nil, err + return nil, func() {}, err } encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len) - return encoded, err + return encoded, func() {}, err } func (e *encoder) SetBitRate(b int) error { diff --git a/pkg/driver/audiotest/dummy.go b/pkg/driver/audiotest/dummy.go index fc9ddaa9..6e839083 100644 --- a/pkg/driver/audiotest/dummy.go +++ b/pkg/driver/audiotest/dummy.go @@ -52,10 +52,10 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) { closed := d.closed - reader := audio.ReaderFunc(func() (wave.Audio, error) { + reader := audio.ReaderFunc(func() (wave.Audio, func(), error) { select { case <-closed: - return nil, io.EOF + return nil, func() {}, io.EOF default: } @@ -78,7 +78,7 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) { a.SetFloat32(i, ch, wave.Float32Sample(sin[phase])) } } - return a, nil + return a, func() {}, nil }) return reader, nil } diff --git a/pkg/driver/camera/camera_darwin.go b/pkg/driver/camera/camera_darwin.go index 4cd26a74..07d09628 100644 --- a/pkg/driver/camera/camera_darwin.go +++ b/pkg/driver/camera/camera_darwin.go @@ -56,10 +56,10 @@ func (cam *camera) VideoRecord(property prop.Media) (video.Reader, error) { if err != nil { return nil, err } - r := video.ReaderFunc(func() (image.Image, error) { - frame, err := rc.Read() + r := video.ReaderFunc(func() (image.Image, func(), error) { + frame, _, err := rc.Read() if err != nil { - return nil, err + return nil, func() {}, err } return decoder.Decode(frame, property.Width, property.Height) }) diff --git a/pkg/driver/camera/camera_linux.go b/pkg/driver/camera/camera_linux.go index a642d366..eaf10106 100644 --- a/pkg/driver/camera/camera_linux.go +++ b/pkg/driver/camera/camera_linux.go @@ -182,7 +182,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel var buf []byte - r := video.ReaderFunc(func() (img image.Image, err error) { + r := video.ReaderFunc(func() (img image.Image, release func(), err error) { // Lock to avoid accessing the buffer after StopStreaming() c.mutex.Lock() defer c.mutex.Unlock() @@ -191,23 +191,23 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) { for i := 0; i < maxEmptyFrameCount; i++ { if ctx.Err() != nil { // Return EOF if the camera is already closed. - return nil, io.EOF + return nil, func() {}, io.EOF } err := cam.WaitForFrame(5) // 5 seconds switch err.(type) { case nil: case *webcam.Timeout: - return nil, errReadTimeout + return nil, func() {}, errReadTimeout default: // Camera has been stopped. - return nil, err + return nil, func() {}, err } b, err := cam.ReadFrame() if err != nil { // Camera has been stopped. - return nil, err + return nil, func() {}, err } // Frame is empty. @@ -227,7 +227,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) { n := copy(buf, b) return decoder.Decode(buf[:n], p.Width, p.Height) } - return nil, errEmptyFrame + return nil, func() {}, errEmptyFrame }) return r, nil diff --git a/pkg/driver/camera/camera_windows.go b/pkg/driver/camera/camera_windows.go index 41f753db..3596957b 100644 --- a/pkg/driver/camera/camera_windows.go +++ b/pkg/driver/camera/camera_windows.go @@ -116,10 +116,10 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) { img := &image.YCbCr{} - r := video.ReaderFunc(func() (image.Image, error) { + r := video.ReaderFunc(func() (image.Image, func(), error) { b, ok := <-c.ch if !ok { - return nil, io.EOF + return nil, func() {}, io.EOF } img.Y = b[:nPix] img.Cb = b[nPix : nPix+nPix/2] @@ -128,7 +128,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) { img.CStride = p.Width / 2 img.SubsampleRatio = image.YCbCrSubsampleRatio422 img.Rect = image.Rect(0, 0, p.Width, p.Height) - return img, nil + return img, func() {}, nil }) return r, nil } diff --git a/pkg/driver/microphone/microphone_linux.go b/pkg/driver/microphone/microphone_linux.go index 0af6cc2d..0292e6ec 100644 --- a/pkg/driver/microphone/microphone_linux.go +++ b/pkg/driver/microphone/microphone_linux.go @@ -97,11 +97,11 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) { return nil, err } - reader := audio.ReaderFunc(func() (wave.Audio, error) { + reader := audio.ReaderFunc(func() (wave.Audio, func(), error) { buff, ok := <-samplesChan if !ok { stream.Close() - return nil, io.EOF + return nil, func() {}, io.EOF } a := wave.NewInt16Interleaved( @@ -112,7 +112,7 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) { ) copy(a.Data, buff) - return a, nil + return a, func() {}, nil }) stream.Start() diff --git a/pkg/driver/microphone/microphone_windows.go b/pkg/driver/microphone/microphone_windows.go index b9f11274..bf006e01 100644 --- a/pkg/driver/microphone/microphone_windows.go +++ b/pkg/driver/microphone/microphone_windows.go @@ -194,10 +194,10 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) { // TODO: detect microphone device disconnection and return EOF - reader := audio.ReaderFunc(func() (wave.Audio, error) { + reader := audio.ReaderFunc(func() (wave.Audio, func(), error) { b, ok := <-m.chBuf if !ok { - return nil, io.EOF + return nil, func() {}, io.EOF } select { @@ -210,7 +210,7 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) { uintptr(unsafe.Sizeof(b.waveHdr)), ) if err := errWinmm[ret]; err != nil { - return nil, err + return nil, func() {}, err } } @@ -229,7 +229,7 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) { } } - return a, nil + return a, func() {}, nil }) return reader, nil } diff --git a/pkg/driver/screen/x11_linux.go b/pkg/driver/screen/x11_linux.go index c07db90d..726aa319 100644 --- a/pkg/driver/screen/x11_linux.go +++ b/pkg/driver/screen/x11_linux.go @@ -68,9 +68,9 @@ func (s *screen) VideoRecord(p prop.Media) (video.Reader, error) { var dst image.RGBA reader := s.reader - r := video.ReaderFunc(func() (image.Image, error) { + r := video.ReaderFunc(func() (image.Image, func(), error) { <-s.tick.C - return reader.Read().ToRGBA(&dst), nil + return reader.Read().ToRGBA(&dst), func() {}, nil }) return r, nil } diff --git a/pkg/driver/videotest/dummy.go b/pkg/driver/videotest/dummy.go index c7341367..8619cee0 100644 --- a/pkg/driver/videotest/dummy.go +++ b/pkg/driver/videotest/dummy.go @@ -103,10 +103,10 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) { d.tick = tick closed := d.closed - r := video.ReaderFunc(func() (image.Image, error) { + r := video.ReaderFunc(func() (image.Image, func(), error) { select { case <-closed: - return nil, io.EOF + return nil, func() {}, io.EOF default: } @@ -130,7 +130,7 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) { CStride: p.Width / 2, SubsampleRatio: image.YCbCrSubsampleRatio422, Rect: image.Rect(0, 0, p.Width, p.Height), - }, nil + }, func() {}, nil }) return r, nil diff --git a/pkg/frame/compressed.go b/pkg/frame/compressed.go index 386fd255..11adc197 100644 --- a/pkg/frame/compressed.go +++ b/pkg/frame/compressed.go @@ -6,6 +6,7 @@ import ( "image/jpeg" ) -func decodeMJPEG(frame []byte, width, height int) (image.Image, error) { - return jpeg.Decode(bytes.NewReader(frame)) +func decodeMJPEG(frame []byte, width, height int) (image.Image, func(), error) { + img, err := jpeg.Decode(bytes.NewReader(frame)) + return img, func() {}, err } diff --git a/pkg/frame/frame.go b/pkg/frame/frame.go index fdfd405c..315b98c7 100644 --- a/pkg/frame/frame.go +++ b/pkg/frame/frame.go @@ -3,12 +3,12 @@ package frame import "image" type Decoder interface { - Decode(frame []byte, width, height int) (image.Image, error) + Decode(frame []byte, width, height int) (image.Image, func(), error) } // DecoderFunc is a proxy type for Decoder -type decoderFunc func(frame []byte, width, height int) (image.Image, error) +type decoderFunc func(frame []byte, width, height int) (image.Image, func(), error) -func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, error) { +func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, func(), error) { return f(frame, width, height) } diff --git a/pkg/frame/yuv.go b/pkg/frame/yuv.go index 965d8f8f..e9fac013 100644 --- a/pkg/frame/yuv.go +++ b/pkg/frame/yuv.go @@ -5,13 +5,13 @@ import ( "image" ) -func decodeI420(frame []byte, width, height int) (image.Image, error) { +func decodeI420(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height cbi := yi + width*height/4 cri := cbi + width*height/4 if cri > len(frame) { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri) } return &image.YCbCr{ @@ -22,15 +22,15 @@ func decodeI420(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio420, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } -func decodeNV21(frame []byte, width, height int) (image.Image, error) { +func decodeNV21(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height ci := yi + width*height/2 if ci > len(frame) { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci) } var cb, cr []byte @@ -47,5 +47,5 @@ func decodeNV21(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio420, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } diff --git a/pkg/frame/yuv_cgo.go b/pkg/frame/yuv_cgo.go index 368dd222..fe14c896 100644 --- a/pkg/frame/yuv_cgo.go +++ b/pkg/frame/yuv_cgo.go @@ -12,13 +12,13 @@ import ( // void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height); import "C" -func decodeYUY2(frame []byte, width, height int) (image.Image, error) { +func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height ci := yi / 2 fi := yi + 2*ci if len(frame) != fi { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) } y := make([]byte, yi) @@ -41,16 +41,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio422, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } -func decodeUYVY(frame []byte, width, height int) (image.Image, error) { +func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height ci := yi / 2 fi := yi + 2*ci if len(frame) != fi { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) } y := make([]byte, yi) @@ -73,5 +73,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio422, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } diff --git a/pkg/frame/yuv_nocgo.go b/pkg/frame/yuv_nocgo.go index c13f751d..e1993571 100644 --- a/pkg/frame/yuv_nocgo.go +++ b/pkg/frame/yuv_nocgo.go @@ -7,13 +7,13 @@ import ( "image" ) -func decodeYUY2(frame []byte, width, height int) (image.Image, error) { +func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height ci := yi / 2 fi := yi + 2*ci if len(frame) != fi { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) } y := make([]byte, yi) @@ -39,16 +39,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio422, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } -func decodeUYVY(frame []byte, width, height int) (image.Image, error) { +func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) { yi := width * height ci := yi / 2 fi := yi + 2*ci if len(frame) != fi { - return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) + return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) } y := make([]byte, yi) @@ -74,5 +74,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) { CStride: width / 2, SubsampleRatio: image.YCbCrSubsampleRatio422, Rect: image.Rect(0, 0, width, height), - }, nil + }, func() {}, nil } diff --git a/pkg/frame/yuv_test.go b/pkg/frame/yuv_test.go index 0074700e..b60ffc24 100644 --- a/pkg/frame/yuv_test.go +++ b/pkg/frame/yuv_test.go @@ -27,7 +27,7 @@ func TestDecodeYUY2(t *testing.T) { Rect: image.Rect(0, 0, width, height), } - img, err := decodeYUY2(input, width, height) + img, _, err := decodeYUY2(input, width, height) if err != nil { t.Fatal(err) } @@ -56,7 +56,7 @@ func TestDecodeUYVY(t *testing.T) { Rect: image.Rect(0, 0, width, height), } - img, err := decodeUYVY(input, width, height) + img, _, err := decodeUYVY(input, width, height) if err != nil { t.Fatal(err) } @@ -77,7 +77,7 @@ func BenchmarkDecodeYUY2(b *testing.B) { b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) { input := make([]byte, sz.width*sz.height*2) for i := 0; i < b.N; i++ { - _, err := decodeYUY2(input, sz.width, sz.height) + _, _, err := decodeYUY2(input, sz.width, sz.height) if err != nil { b.Fatal(err) } diff --git a/pkg/io/audio/audio.go b/pkg/io/audio/audio.go index d1eee76a..6caed79d 100644 --- a/pkg/io/audio/audio.go +++ b/pkg/io/audio/audio.go @@ -5,13 +5,14 @@ import ( ) type Reader interface { - Read() (wave.Audio, error) + Read() (chunk wave.Audio, release func(), err error) } -type ReaderFunc func() (wave.Audio, error) +type ReaderFunc func() (chunk wave.Audio, release func(), err error) -func (rf ReaderFunc) Read() (wave.Audio, error) { - return rf() +func (rf ReaderFunc) Read() (chunk wave.Audio, release func(), err error) { + chunk, release, err = rf() + return } // TransformFunc produces a new Reader that will produces a transformed audio diff --git a/pkg/io/audio/broadcast.go b/pkg/io/audio/broadcast.go index 6ab97cbe..6f6fe84b 100644 --- a/pkg/io/audio/broadcast.go +++ b/pkg/io/audio/broadcast.go @@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster { coreConfig = config.Core } - broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) { + broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) { return source.Read() }), coreConfig) @@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader { } reader := broadcaster.ioBroadcaster.NewReader(copyFn) - return ReaderFunc(func() (wave.Audio, error) { - data, err := reader.Read() + return ReaderFunc(func() (wave.Audio, func(), error) { + data, _, err := reader.Read() chunk, _ := data.(wave.Audio) - return chunk, err + return chunk, func() {}, err }) } // ReplaceSource replaces the underlying source. This operation is thread safe. func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { - return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) { + return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) { return source.Read() })) } @@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { // Source retrieves the underlying source. This operation is thread safe. func (broadcaster *Broadcaster) Source() Reader { source := broadcaster.ioBroadcaster.Source() - return ReaderFunc(func() (wave.Audio, error) { - data, err := source.Read() + return ReaderFunc(func() (wave.Audio, func(), error) { + data, _, err := source.Read() img, _ := data.(wave.Audio) - return img, err + return img, func() {}, err }) } diff --git a/pkg/io/audio/broadcast_test.go b/pkg/io/audio/broadcast_test.go index a87c8013..e6ef8666 100644 --- a/pkg/io/audio/broadcast_test.go +++ b/pkg/io/audio/broadcast_test.go @@ -14,18 +14,18 @@ func TestBroadcast(t *testing.T) { SamplingRate: 48000, }) - source := ReaderFunc(func() (wave.Audio, error) { - return chunk, nil + source := ReaderFunc(func() (wave.Audio, func(), error) { + return chunk, func() {}, nil }) broadcaster := NewBroadcaster(source, nil) readerWithoutCopy1 := broadcaster.NewReader(false) readerWithoutCopy2 := broadcaster.NewReader(false) - actualWithoutCopy1, err := readerWithoutCopy1.Read() + actualWithoutCopy1, _, err := readerWithoutCopy1.Read() if err != nil { t.Fatal(err) } - actualWithoutCopy2, err := readerWithoutCopy2.Read() + actualWithoutCopy2, _, err := readerWithoutCopy2.Read() if err != nil { t.Fatal(err) } @@ -39,7 +39,7 @@ func TestBroadcast(t *testing.T) { } readerWithCopy := broadcaster.NewReader(true) - actualWithCopy, err := readerWithCopy.Read() + actualWithCopy, _, err := readerWithCopy.Read() if err != nil { t.Fatal(err) } diff --git a/pkg/io/audio/buffer.go b/pkg/io/audio/buffer.go index 0df6d8f9..720fd8b2 100644 --- a/pkg/io/audio/buffer.go +++ b/pkg/io/audio/buffer.go @@ -13,15 +13,15 @@ func NewBuffer(nSamples int) TransformFunc { var inBuff wave.Audio return func(r Reader) Reader { - return ReaderFunc(func() (wave.Audio, error) { + return ReaderFunc(func() (wave.Audio, func(), error) { for { if inBuff != nil && inBuff.ChunkInfo().Len >= nSamples { break } - buff, err := r.Read() + buff, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } switch b := buff.(type) { case *wave.Float32Interleaved: @@ -59,7 +59,7 @@ func NewBuffer(nSamples int) TransformFunc { ib.Size.Len += b.Size.Len default: - return nil, errUnsupported + return nil, func() {}, errUnsupported } } switch ib := inBuff.(type) { @@ -71,7 +71,7 @@ func NewBuffer(nSamples int) TransformFunc { copy(ibCopy.Data, ib.Data) ib.Data = ib.Data[n:] ib.Size.Len -= nSamples - return &ibCopy, nil + return &ibCopy, func() {}, nil case *wave.Float32Interleaved: ibCopy := *ib @@ -81,9 +81,9 @@ func NewBuffer(nSamples int) TransformFunc { copy(ibCopy.Data, ib.Data) ib.Data = ib.Data[n:] ib.Size.Len -= nSamples - return &ibCopy, nil + return &ibCopy, func() {}, nil } - return nil, errUnsupported + return nil, func() {}, errUnsupported }) } } diff --git a/pkg/io/audio/buffer_test.go b/pkg/io/audio/buffer_test.go index 9f4b5114..90edc409 100644 --- a/pkg/io/audio/buffer_test.go +++ b/pkg/io/audio/buffer_test.go @@ -49,16 +49,16 @@ func TestBuffer(t *testing.T) { trans := NewBuffer(3) var iSent int - r := trans(ReaderFunc(func() (wave.Audio, error) { + r := trans(ReaderFunc(func() (wave.Audio, func(), error) { if iSent < len(input) { iSent++ - return input[iSent-1], nil + return input[iSent-1], func() {}, nil } - return nil, io.EOF + return nil, func() {}, io.EOF })) for i := 0; ; i++ { - a, err := r.Read() + a, _, err := r.Read() if err != nil { if err == io.EOF && i >= len(expected) { break diff --git a/pkg/io/audio/detect.go b/pkg/io/audio/detect.go index 454e5d6c..6a6e2e83 100644 --- a/pkg/io/audio/detect.go +++ b/pkg/io/audio/detect.go @@ -13,12 +13,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF return func(r Reader) Reader { var currentProp prop.Media var chunkCount uint - return ReaderFunc(func() (wave.Audio, error) { + return ReaderFunc(func() (wave.Audio, func(), error) { var dirty bool - chunk, err := r.Read() + chunk, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } info := chunk.ChunkInfo() @@ -40,7 +40,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF } chunkCount++ - return chunk, nil + return chunk, func() {}, nil }) } } diff --git a/pkg/io/audio/detect_test.go b/pkg/io/audio/detect_test.go index 450e4020..178eb4a5 100644 --- a/pkg/io/audio/detect_test.go +++ b/pkg/io/audio/detect_test.go @@ -11,12 +11,12 @@ import ( func TestDetectChanges(t *testing.T) { buildSource := func(p prop.Media) (Reader, func(prop.Media)) { - return ReaderFunc(func() (wave.Audio, error) { + return ReaderFunc(func() (wave.Audio, func(), error) { return wave.NewFloat32Interleaved(wave.ChunkInfo{ Len: 0, Channels: p.ChannelCount, SamplingRate: p.SampleRate, - }), nil + }), func() {}, nil }), func(newProp prop.Media) { p = newProp } @@ -34,7 +34,7 @@ func TestDetectChanges(t *testing.T) { detectBeforeFirstChunk = true })(src) - _, err := src.Read() + _, _, err := src.Read() if err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func TestDetectChanges(t *testing.T) { expected.ChannelCount = channelCount expected.SampleRate = sampleRate update(expected) - _, err := src.Read() + _, _, err := src.Read() if err != nil { t.Fatal(err) } diff --git a/pkg/io/audio/mixer.go b/pkg/io/audio/mixer.go index 32592944..9ec85ffd 100644 --- a/pkg/io/audio/mixer.go +++ b/pkg/io/audio/mixer.go @@ -8,14 +8,14 @@ import ( // NewChannelMixer creates audio transform to mix audio channels. func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc { return func(r Reader) Reader { - return ReaderFunc(func() (wave.Audio, error) { - buff, err := r.Read() + return ReaderFunc(func() (wave.Audio, func(), error) { + buff, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } ci := buff.ChunkInfo() if ci.Channels == channels { - return buff, nil + return buff, func() {}, nil } ci.Channels = channels @@ -32,9 +32,9 @@ func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc { mixed = wave.NewFloat32NonInterleaved(ci) } if err := mixer.Mix(mixed, buff); err != nil { - return nil, err + return nil, func() {}, err } - return mixed, nil + return mixed, func() {}, nil }) } } diff --git a/pkg/io/audio/mixer_test.go b/pkg/io/audio/mixer_test.go index a31ace92..3669daeb 100644 --- a/pkg/io/audio/mixer_test.go +++ b/pkg/io/audio/mixer_test.go @@ -34,16 +34,16 @@ func TestMixer(t *testing.T) { trans := NewChannelMixer(1, &mixer.MonoMixer{}) var iSent int - r := trans(ReaderFunc(func() (wave.Audio, error) { + r := trans(ReaderFunc(func() (wave.Audio, func(), error) { if iSent < len(input) { iSent++ - return input[iSent-1], nil + return input[iSent-1], func() {}, nil } - return nil, io.EOF + return nil, func() {}, io.EOF })) for i := 0; ; i++ { - a, err := r.Read() + a, _, err := r.Read() if err != nil { if err == io.EOF && i >= len(expected) { break diff --git a/pkg/io/broadcast.go b/pkg/io/broadcast.go index 86cfcadc..ce296e08 100644 --- a/pkg/io/broadcast.go +++ b/pkg/io/broadcast.go @@ -127,10 +127,10 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster { func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader { currentCount := broadcaster.buffer.lastCount() - return ReaderFunc(func() (data interface{}, err error) { + return ReaderFunc(func() (data interface{}, release func(), err error) { currentCount++ if push := broadcaster.buffer.acquire(currentCount); push != nil { - data, err = broadcaster.source.Load().(Reader).Read() + data, _, err = broadcaster.source.Load().(Reader).Read() push(&broadcasterData{ data: data, err: err, diff --git a/pkg/io/broadcast_test.go b/pkg/io/broadcast_test.go index 1bf177b4..5d14df55 100644 --- a/pkg/io/broadcast_test.go +++ b/pkg/io/broadcast_test.go @@ -57,7 +57,7 @@ func TestBroadcast(t *testing.T) { frameCount := 0 frameSent := 0 lastSend := time.Now() - src = ReaderFunc(func() (interface{}, error) { + src = ReaderFunc(func() (interface{}, func(), error) { if pauseCond.src && frameSent == 30 { time.Sleep(time.Second) } @@ -74,7 +74,7 @@ func TestBroadcast(t *testing.T) { frame := frames[frameCount] frameCount++ frameSent++ - return frame, nil + return frame, func() {}, nil }) broadcaster := NewBroadcaster(src, nil) var done uint32 @@ -95,7 +95,7 @@ func TestBroadcast(t *testing.T) { if pauseCond.dst && count == 30 { time.Sleep(time.Second) } - frame, err := reader.Read() + frame, _, err := reader.Read() if err != nil { t.Error(err) } diff --git a/pkg/io/reader.go b/pkg/io/reader.go index 3bd676a9..8d65fa2d 100644 --- a/pkg/io/reader.go +++ b/pkg/io/reader.go @@ -3,12 +3,13 @@ package io // Reader is a generic data reader. In the future, interface{} should be replaced by a generic type // to provide strong type. type Reader interface { - Read() (interface{}, error) + Read() (data interface{}, release func(), err error) } // ReaderFunc is a proxy type for Reader -type ReaderFunc func() (interface{}, error) +type ReaderFunc func() (data interface{}, release func(), err error) -func (f ReaderFunc) Read() (interface{}, error) { - return f() +func (f ReaderFunc) Read() (data interface{}, release func(), err error) { + data, release, err = f() + return } diff --git a/pkg/io/video/broadcast.go b/pkg/io/video/broadcast.go index efe2a0aa..b0424853 100644 --- a/pkg/io/video/broadcast.go +++ b/pkg/io/video/broadcast.go @@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster { coreConfig = config.Core } - broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) { + broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) { return source.Read() }), coreConfig) @@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader { } reader := broadcaster.ioBroadcaster.NewReader(copyFn) - return ReaderFunc(func() (image.Image, error) { - data, err := reader.Read() + return ReaderFunc(func() (image.Image, func(), error) { + data, _, err := reader.Read() img, _ := data.(image.Image) - return img, err + return img, func() {}, err }) } // ReplaceSource replaces the underlying source. This operation is thread safe. func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { - return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) { + return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) { return source.Read() })) } @@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { // Source retrieves the underlying source. This operation is thread safe. func (broadcaster *Broadcaster) Source() Reader { source := broadcaster.ioBroadcaster.Source() - return ReaderFunc(func() (image.Image, error) { - data, err := source.Read() + return ReaderFunc(func() (image.Image, func(), error) { + data, _, err := source.Read() img, _ := data.(image.Image) - return img, err + return img, func() {}, err }) } diff --git a/pkg/io/video/broadcast_test.go b/pkg/io/video/broadcast_test.go index fbf237f3..b84d5beb 100644 --- a/pkg/io/video/broadcast_test.go +++ b/pkg/io/video/broadcast_test.go @@ -9,18 +9,18 @@ import ( func TestBroadcast(t *testing.T) { resolution := image.Rect(0, 0, 1920, 1080) img := image.NewGray(resolution) - source := ReaderFunc(func() (image.Image, error) { - return img, nil + source := ReaderFunc(func() (image.Image, func(), error) { + return img, func() {}, nil }) broadcaster := NewBroadcaster(source, nil) readerWithoutCopy1 := broadcaster.NewReader(false) readerWithoutCopy2 := broadcaster.NewReader(false) - actualWithoutCopy1, err := readerWithoutCopy1.Read() + actualWithoutCopy1, _, err := readerWithoutCopy1.Read() if err != nil { t.Fatal(err) } - actualWithoutCopy2, err := readerWithoutCopy2.Read() + actualWithoutCopy2, _, err := readerWithoutCopy2.Read() if err != nil { t.Fatal(err) } @@ -34,7 +34,7 @@ func TestBroadcast(t *testing.T) { } readerWithCopy := broadcaster.NewReader(true) - actualWithCopy, err := readerWithCopy.Read() + actualWithCopy, _, err := readerWithCopy.Read() if err != nil { t.Fatal(err) } diff --git a/pkg/io/video/convert.go b/pkg/io/video/convert.go index c79e0853..c2eb38ef 100644 --- a/pkg/io/video/convert.go +++ b/pkg/io/video/convert.go @@ -63,10 +63,10 @@ func imageToYCbCr(dst *image.YCbCr, src image.Image) { // ToI420 converts r to a new reader that will output images in I420 format func ToI420(r Reader) Reader { var yuvImg image.YCbCr - return ReaderFunc(func() (image.Image, error) { - img, err := r.Read() + return ReaderFunc(func() (image.Image, func(), error) { + img, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } imageToYCbCr(&yuvImg, img) @@ -79,11 +79,11 @@ func ToI420(r Reader) Reader { i422ToI420(&yuvImg) case image.YCbCrSubsampleRatio420: default: - return nil, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio) + return nil, func() {}, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio) } yuvImg.SubsampleRatio = image.YCbCrSubsampleRatio420 - return &yuvImg, nil + return &yuvImg, func() {}, nil }) } @@ -130,13 +130,13 @@ func imageToRGBA(dst *image.RGBA, src image.Image) { // ToRGBA converts r to a new reader that will output images in RGBA format func ToRGBA(r Reader) Reader { var dst image.RGBA - return ReaderFunc(func() (image.Image, error) { - img, err := r.Read() + return ReaderFunc(func() (image.Image, func(), error) { + img, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } imageToRGBA(&dst, img) - return &dst, nil + return &dst, func() {}, nil }) } diff --git a/pkg/io/video/convert_test.go b/pkg/io/video/convert_test.go index 2950fef0..e1eafc55 100644 --- a/pkg/io/video/convert_test.go +++ b/pkg/io/video/convert_test.go @@ -144,10 +144,10 @@ func TestToI420(t *testing.T) { for name, c := range cases { c := c t.Run(name, func(t *testing.T) { - r := ToI420(ReaderFunc(func() (image.Image, error) { - return c.src, nil + r := ToI420(ReaderFunc(func() (image.Image, func(), error) { + return c.src, func() {}, nil })) - out, err := r.Read() + out, _, err := r.Read() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -199,10 +199,10 @@ func TestToRGBA(t *testing.T) { for name, c := range cases { c := c t.Run(name, func(t *testing.T) { - r := ToRGBA(ReaderFunc(func() (image.Image, error) { - return c.src, nil + r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) { + return c.src, func() {}, nil })) - out, err := r.Read() + out, _, err := r.Read() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -225,12 +225,12 @@ func BenchmarkToI420(b *testing.B) { for name, img := range cases { img := img b.Run(name, func(b *testing.B) { - r := ToI420(ReaderFunc(func() (image.Image, error) { - return img, nil + r := ToI420(ReaderFunc(func() (image.Image, func(), error) { + return img, func() {}, nil })) for i := 0; i < b.N; i++ { - _, err := r.Read() + _, _, err := r.Read() if err != nil { b.Fatalf("Unexpected error: %v", err) } @@ -253,12 +253,12 @@ func BenchmarkToRGBA(b *testing.B) { for name, img := range cases { img := img b.Run(name, func(b *testing.B) { - r := ToRGBA(ReaderFunc(func() (image.Image, error) { - return img, nil + r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) { + return img, func() {}, nil })) for i := 0; i < b.N; i++ { - _, err := r.Read() + _, _, err := r.Read() if err != nil { b.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/io/video/detect.go b/pkg/io/video/detect.go index 078f7da6..384b2872 100644 --- a/pkg/io/video/detect.go +++ b/pkg/io/video/detect.go @@ -14,12 +14,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF var currentProp prop.Media var lastTaken time.Time var frames uint - return ReaderFunc(func() (image.Image, error) { + return ReaderFunc(func() (image.Image, func(), error) { var dirty bool - img, err := r.Read() + img, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } bounds := img.Bounds() @@ -52,7 +52,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF } frames++ - return img, nil + return img, func() {}, nil }) } } diff --git a/pkg/io/video/detect_test.go b/pkg/io/video/detect_test.go index f20e6492..ea2ef1cc 100644 --- a/pkg/io/video/detect_test.go +++ b/pkg/io/video/detect_test.go @@ -12,8 +12,8 @@ import ( func BenchmarkDetectChanges(b *testing.B) { var src Reader - src = ReaderFunc(func() (image.Image, error) { - return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), nil + src = ReaderFunc(func() (image.Image, func(), error) { + return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), func() {}, nil }) b.Run("WithoutDetectChanges", func(b *testing.B) { @@ -40,8 +40,8 @@ func BenchmarkDetectChanges(b *testing.B) { func TestDetectChanges(t *testing.T) { buildSource := func(p prop.Media) (Reader, func(prop.Media)) { - return ReaderFunc(func() (image.Image, error) { - return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), nil + return ReaderFunc(func() (image.Image, func(), error) { + return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), func() {}, nil }), func(newProp prop.Media) { p = newProp } @@ -86,7 +86,7 @@ func TestDetectChanges(t *testing.T) { detectBeforeFirstFrame = true })(src) - frame, err := src.Read() + frame, _, err := src.Read() if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestDetectChanges(t *testing.T) { expected.Width = width expected.Height = height update(expected) - frame, err := src.Read() + frame, _, err := src.Read() if err != nil { t.Fatal(err) } @@ -143,7 +143,7 @@ func TestDetectChanges(t *testing.T) { })(src) for count < 3 { - frame, err := src.Read() + frame, _, err := src.Read() if err != nil { t.Fatal(err) } diff --git a/pkg/io/video/scale.go b/pkg/io/video/scale.go index 89f61176..5a91cf19 100644 --- a/pkg/io/video/scale.go +++ b/pkg/io/video/scale.go @@ -156,10 +156,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc { } } - return ReaderFunc(func() (image.Image, error) { - img, err := r.Read() + return ReaderFunc(func() (image.Image, func(), error) { + img, _, err := r.Read() if err != nil { - return nil, err + return nil, func() {}, err } switch v := img.(type) { @@ -169,7 +169,7 @@ func Scale(width, height int, scaler Scaler) TransformFunc { scalerCached.Scale(dst, rect, v, v.Rect, draw.Src, nil) cloned := *dst // clone metadata - return &cloned, nil + return &cloned, func() {}, nil case *image.YCbCr: ycbcrRealloc(v) @@ -184,10 +184,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc { scalerCached.Scale(dst, dst.Bounds(), src, src.Bounds(), draw.Src, nil) cloned := *(imgScaled.(*image.YCbCr)) // clone metadata - return &cloned, nil + return &cloned, func() {}, nil default: - return nil, errUnsupportedImageType + return nil, func() {}, errUnsupportedImageType } }) } diff --git a/pkg/io/video/scale_test.go b/pkg/io/video/scale_test.go index 893153a9..b5ddc96f 100644 --- a/pkg/io/video/scale_test.go +++ b/pkg/io/video/scale_test.go @@ -215,11 +215,11 @@ func TestScale(t *testing.T) { c := c t.Run(name, func(t *testing.T) { trans := Scale(c.width, c.height, algo) - r := trans(ReaderFunc(func() (image.Image, error) { - return c.src, nil + r := trans(ReaderFunc(func() (image.Image, func(), error) { + return c.src, func() {}, nil })) for i := 0; i < 4; i++ { - out, err := r.Read() + out, _, err := r.Read() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -261,12 +261,12 @@ func BenchmarkScale(b *testing.B) { img := img b.Run(name, func(b *testing.B) { trans := Scale(640, 360, algo) - r := trans(ReaderFunc(func() (image.Image, error) { - return img, nil + r := trans(ReaderFunc(func() (image.Image, func(), error) { + return img, func() {}, nil })) for i := 0; i < b.N; i++ { - _, err := r.Read() + _, _, err := r.Read() if err != nil { b.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/io/video/throttle.go b/pkg/io/video/throttle.go index 45707ce0..131573e2 100644 --- a/pkg/io/video/throttle.go +++ b/pkg/io/video/throttle.go @@ -10,16 +10,16 @@ import ( func Throttle(rate float32) TransformFunc { return func(r Reader) Reader { ticker := time.NewTicker(time.Duration(int64(float64(time.Second) / float64(rate)))) - return ReaderFunc(func() (image.Image, error) { + return ReaderFunc(func() (image.Image, func(), error) { for { - img, err := r.Read() + img, _, err := r.Read() if err != nil { ticker.Stop() - return nil, err + return nil, func() {}, err } select { case <-ticker.C: - return img, nil + return img, func() {}, nil default: } } diff --git a/pkg/io/video/throttle_test.go b/pkg/io/video/throttle_test.go index c9031fc2..cac55541 100644 --- a/pkg/io/video/throttle_test.go +++ b/pkg/io/video/throttle_test.go @@ -19,14 +19,14 @@ func TestThrottle(t *testing.T) { var cntPush int trans := Throttle(50) - r := trans(ReaderFunc(func() (image.Image, error) { + r := trans(ReaderFunc(func() (image.Image, func(), error) { <-ticker.C cntPush++ - return img, nil + return img, func() {}, nil })) for i := 0; i < 20; i++ { - _, err := r.Read() + _, _, err := r.Read() if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/io/video/video.go b/pkg/io/video/video.go index 5148087f..b7a2a9db 100644 --- a/pkg/io/video/video.go +++ b/pkg/io/video/video.go @@ -5,13 +5,14 @@ import ( ) type Reader interface { - Read() (img image.Image, err error) + Read() (img image.Image, release func(), err error) } -type ReaderFunc func() (img image.Image, err error) +type ReaderFunc func() (img image.Image, release func(), err error) -func (rf ReaderFunc) Read() (img image.Image, err error) { - return rf() +func (rf ReaderFunc) Read() (img image.Image, release func(), err error) { + img, release, err = rf() + return } // TransformFunc produces a new Reader that will produces a transformed video diff --git a/track.go b/track.go index e16d35f7..c7b03da4 100644 --- a/track.go +++ b/track.go @@ -160,7 +160,7 @@ func (t *track) onError(err error) { // start starts the data flow from the driver all the way to the localTrack func (t *track) start() { for { - buff, err := t.encoder.Read() + buff, _, err := t.encoder.Read() if err != nil { t.onError(err) return