Skip to content

Commit

Permalink
Update readers to be memory pool friendly
Browse files Browse the repository at this point in the history
  • Loading branch information
lherman-cs committed Oct 29, 2020
1 parent f4a4edc commit 559c6a1
Show file tree
Hide file tree
Showing 50 changed files with 273 additions and 271 deletions.
16 changes: 8 additions & 8 deletions mediadevices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ type mockVideoCodec struct {
closed chan struct{}
}

func (m *mockVideoCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
func (m *mockVideoCodec) Read() ([]byte, func(), error) {
if _, _, err := m.r.Read(); err != nil {
return nil, func() {}, err
}
return make([]byte, 20), nil
return make([]byte, 20), func() {}, nil
}

func (m *mockVideoCodec) Close() error { return nil }
Expand All @@ -216,11 +216,11 @@ type mockAudioCodec struct {
closed chan struct{}
}

func (m *mockAudioCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
func (m *mockAudioCodec) Read() ([]byte, func(), error) {
if _, _, err := m.r.Read(); err != nil {
return nil, func() {}, err
}
return make([]byte, 20), nil
return make([]byte, 20), func() {}, nil
}
func (m *mockAudioCodec) Close() error { return nil }

Expand Down
4 changes: 2 additions & 2 deletions meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()

return currentProp, err
}
Expand All @@ -29,7 +29,7 @@ func detectCurrentAudioProp(broadcaster *audio.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()

return currentProp, err
}
16 changes: 8 additions & 8 deletions meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func TestDetectCurrentVideoProp(t *testing.T) {
second.Pix[0] = 2

isFirst := true
source := video.ReaderFunc(func() (image.Image, error) {
source := video.ReaderFunc(func() (image.Image, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})

Expand All @@ -42,7 +42,7 @@ func TestDetectCurrentVideoProp(t *testing.T) {
}

reader := broadcaster.NewReader(false)
img, err := reader.Read()
img, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}
Expand All @@ -65,12 +65,12 @@ func TestDetectCurrentAudioProp(t *testing.T) {
second.Data[0] = 2

isFirst := true
source := audio.ReaderFunc(func() (wave.Audio, error) {
source := audio.ReaderFunc(func() (wave.Audio, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})

Expand All @@ -86,7 +86,7 @@ func TestDetectCurrentAudioProp(t *testing.T) {
}

reader := broadcaster.NewReader(false)
chunk, err := reader.Read()
chunk, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/avfoundation/avfoundation_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func (rc *ReadCloser) dataCb(data []byte) {
// Read reads raw data, the format is determined by the media type and property:
// - For video, each call will return a frame.
// - For audio, each call will return a chunk which its size configured by Latency
func (rc *ReadCloser) Read() ([]byte, error) {
func (rc *ReadCloser) Read() ([]byte, func(), error) {
data, ok := <-rc.dataChan
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
return data, nil
return data, func() {}, nil
}

// Close closes the capturing session, and no data will flow anymore
Expand Down
2 changes: 1 addition & 1 deletion pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type VideoEncoderBuilder interface {

// ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame
type ReadCloser interface {
Read() ([]byte, error)
Read() (b []byte, release func(), err error)
Close() error
// SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted
// but this also means that the quality will also be lower.
Expand Down
12 changes: 6 additions & 6 deletions pkg/codec/mmal/mmal.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}

func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()

if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}

img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imgReal := img.(*image.YCbCr)
var y, cb, cr C.Slice
Expand All @@ -79,15 +79,15 @@ func (e *encoder) Read() ([]byte, error) {
var encodedBuffer *C.MMAL_BUFFER_HEADER_T
status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer)
if status.code != 0 {
return nil, statusToErr(&status)
return nil, func() {}, statusToErr(&status)
}

// GoBytes copies the C array to Go slice. After this, it's safe to release the C array
encoded := C.GoBytes(unsafe.Pointer(encodedBuffer.data), C.int(encodedBuffer.length))
// Release the buffer so that mmal can reuse this memory
C.mmal_buffer_header_release(encodedBuffer)

return encoded, err
return encoded, func() {}, err
}

func (e *encoder) SetBitRate(b int) error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/codec/openh264/openh264.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
}, nil
}

func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()

if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}

img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}

yuvImg := img.(*image.YCbCr)
Expand All @@ -74,11 +74,11 @@ func (e *encoder) Read() ([]byte, error) {
width: C.int(bounds.Max.X - bounds.Min.X),
}, &rv)
if err := errResult(rv); err != nil {
return nil, fmt.Errorf("failed in encoding: %v", err)
return nil, func() {}, fmt.Errorf("failed in encoding: %v", err)
}

encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, nil
return encoded, func() {}, nil
}

func (e *encoder) SetBitRate(b int) error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/codec/opus/opus.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,22 @@ func newEncoder(r audio.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}

func (e *encoder) Read() ([]byte, error) {
buff, err := e.reader.Read()
func (e *encoder) Read() ([]byte, func(), error) {
buff, _, err := e.reader.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}

encoded := make([]byte, 1024)
switch b := buff.(type) {
case *wave.Int16Interleaved:
n, err := e.engine.Encode(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
case *wave.Float32Interleaved:
n, err := e.engine.EncodeFloat32(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
default:
return nil, errors.New("unknown type of audio buffer")
return nil, func() {}, errors.New("unknown type of audio buffer")
}
}

Expand Down
43 changes: 21 additions & 22 deletions pkg/codec/vaapi/vp8.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"unsafe"

"github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
Expand Down Expand Up @@ -296,17 +295,17 @@ func newVP8Encoder(r video.Reader, p prop.Media, params ParamsVP8) (codec.ReadCl
return e, nil
}

func (e *encoderVP8) Read() ([]byte, error) {
func (e *encoderVP8) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()

if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}

img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)

Expand Down Expand Up @@ -348,7 +347,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
}
}
if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE {
return nil, errors.New("no available surface")
return nil, func() {}, errors.New("no available surface")
}

C.setForceKFFlagVP8(&e.picParam, 0)
Expand Down Expand Up @@ -416,7 +415,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
C.size_t(uintptr(p.src)),
&id,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
}
buffs = append(buffs, id)
}
Expand All @@ -426,17 +425,17 @@ func (e *encoderVP8) Read() ([]byte, error) {
e.display, e.ctxID,
e.surfs[surfaceVP8Input],
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
}

// Upload image
var vaImg C.VAImage
var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// TODO: use vaImg.pitches to support padding
C.memcpy(
Expand All @@ -452,49 +451,49 @@ func (e *encoderVP8) Read() ([]byte, error) {
unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)),
)
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
}

if s := C.vaRenderPicture(
e.display, e.ctxID,
&buffs[1], // 0 is for ouput
C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaEndPicture(
e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
}

// Load encoded data
for retry := 3; retry >= 0; retry-- {
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
}
var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
}
if surfStat == C.VASurfaceReady {
break
}
if retry == 0 {
return nil, fmt.Errorf("failed to sync surface: %d", surfStat)
return nil, func() {}, fmt.Errorf("failed to sync surface: %d", surfStat)
}
}
var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 {
return nil, errors.New("buffer size too small")
return nil, func() {}, errors.New("buffer size too small")
}

if cap(e.frame) < int(seg.size) {
Expand All @@ -507,13 +506,13 @@ func (e *encoderVP8) Read() ([]byte, error) {
)

if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}

// Destroy buffers
for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
}
}

Expand All @@ -538,7 +537,7 @@ func (e *encoderVP8) Read() ([]byte, error) {

encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}

func (e *encoderVP8) SetBitRate(b int) error {
Expand Down
Loading

0 comments on commit 559c6a1

Please sign in to comment.