Skip to content

Receive backchannel audio in Server #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion examples/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
existingStream = "rtsp://x.x.x.x:8554/mystream"
existingStream = "rtsp://u37001:[email protected]/onvif-media/media.amp?profile=profile_1_h264&sessiontimeout=60&streamtype=unicast"
reconnectPause = 2 * time.Second
)

Expand Down Expand Up @@ -62,6 +62,22 @@ func (c *client) read() error {
return err
}

// HACK!!!
// HACK. Add backchannel into desc before it is passed to the Server with setStreamReady()
// HACK!!!
var extraMedia = new(description.Media)
extraMedia.IsBackChannel = true
extraMedia.Type = description.MediaTypeAudio
extraMedia.Control = "track=extrabackchannel"
extraMedia.Formats = []format.Format{&format.G711{
PayloadTyp: 0, // 0 = MULAW 8 = ALAW
MULaw: true,
SampleRate: 8000,
ChannelCount: 1,
}}

desc.Medias = append(desc.Medias, extraMedia)

stream := c.s.setStreamReady(desc)
defer c.s.setStreamUnready()

Expand Down
52 changes: 52 additions & 0 deletions examples/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package main

import (
"log"
"net"
"sync"

"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
)

type server struct {
Expand Down Expand Up @@ -52,6 +55,21 @@ func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("describe request")

// Check for backchannel
requestBackchannel := false
for _, value := range ctx.Request.Header["Require"] {
if value == "www.onvif.org/ver20/backchannel" {
requestBackchannel = true
}
}

if requestBackchannel {
log.Printf("backchanel requested")
}

// HACK!!!
// Can we add in the extra Media description only when backchannel is requested?

s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down Expand Up @@ -90,6 +108,40 @@ func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response
func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("play request")

// assign the OnPacketRTPAny callback function so we can receive the backchannel audio
ctx.Session.OnPacketRTPAny(func(medi *description.Media, format format.Format, pkt *rtp.Packet) {
log.Printf("Got %s RTP data from VMS at %d size %d", format.Codec(), pkt.Timestamp, len(pkt.Payload))

// Marshal the RTP Packet back into bytes and send via UDP
bytes, err := pkt.Marshal()
if err != nil {
return
}

// Send the RTP Payload from Port 9002 to destination port 127.0.0.1:9004 (note port numbers must be Even for RTP)
// Play the audio with
// gst-launch-1.0 udpsrc port=9004 caps=application/x-rtp ! queue ! rtppcmudepay ! autoaudiosink sync=false
// or with
// ffplay -reorder_queue_size 0 rtp://0.0.0.0:9004 (use reorder_queue_size because if RTP packets stop then resume, FFMPEG complains RTP packets are too late)
//
conn, err := net.ListenPacket("udp", ":9002") // TODO - keep this open
if err != nil {
return
}

defer conn.Close()

dst, err := net.ResolveUDPAddr("udp", "127.0.0.1:9004")
if err != nil {
return
}

_, err = conn.WriteTo(bytes, dst)
if err != nil {
return
}
})

return &base.Response{
StatusCode: base.StatusOK,
}, nil
Expand Down
4 changes: 3 additions & 1 deletion server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,9 @@ func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
func (ss *ServerSession) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) {
sm := ss.setuppedMedias[medi]
st := sm.formats[forma.PayloadType()]
st.onPacketRTP = cb
if st != nil {
st.onPacketRTP = cb // RJH HACK. Don't understand. I think this function is called when RTP comes in from the upstream rtsp camera AND also when backchannel data comes in
}
}

// OnPacketRTCP sets the callback that is called when a RTCP packet is read.
Expand Down
2 changes: 1 addition & 1 deletion server_session_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type serverSessionFormat struct {
}

func (sf *serverSessionFormat) start() {
if sf.sm.ss.state != ServerSessionStatePlay {
if sf.sm.ss.state != ServerSessionStatePlay /* Do this mean ServerSessionStateRecord */ || (sf.sm.ss.state == ServerSessionStatePlay && sf.sm.media.IsBackChannel) {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = rtpreorderer.New()
} else {
Expand Down
46 changes: 43 additions & 3 deletions server_session_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ type serverSessionMedia struct {
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
formats map[uint8]*serverSessionFormat // record only
formats map[uint8]*serverSessionFormat // record or backchannel
writePacketRTPInQueue func([]byte)
writePacketRTCPInQueue func([]byte)
}

func (sm *serverSessionMedia) initialize() {
if sm.ss.state == ServerSessionStatePreRecord {
if sm.ss.state == ServerSessionStatePreRecord || (sm.ss.state == ServerSessionStatePrePlay && sm.media.IsBackChannel) { // RJH HACK
sm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range sm.media.Formats {
sm.formats[forma.PayloadType()] = &serverSessionFormat{
Expand Down Expand Up @@ -60,7 +60,9 @@ func (sm *serverSessionMedia) start() {
if sm.ss.state == ServerSessionStatePlay {
// firewall opening is performed with RTCP sender reports generated by ServerStream

// readers can send RTCP packets only
if sm.media.IsBackChannel {
sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readRTPUDPPlay) // back channel
}
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPPlay)
} else {
// open the firewall by sending empty packets to the counterpart.
Expand Down Expand Up @@ -161,6 +163,44 @@ func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error {
return nil
}

func (sm *serverSessionMedia) readRTPUDPPlay(payload []byte) {
plen := len(payload)

// HACK!!!
// Hack. I received some small RTP packets. Perhaps these are NAT punchthrough packets.
// They made gortsplib report problems with SSRC
// Ignore these small packets
// HACK!!!
if plen < 100 {
return
}

atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))

if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return
}

pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
return
}

forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return
}

now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())

forma.readRTPUDP(pkt, now)
}

func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) {
plen := len(payload)

Expand Down