diff --git a/examples/proxy/client.go b/examples/proxy/client.go index 7a9956c5..9cecacd9 100644 --- a/examples/proxy/client.go +++ b/examples/proxy/client.go @@ -12,7 +12,7 @@ import ( ) const ( - existingStream = "rtsp://x.x.x.x:8554/mystream" + existingStream = "rtsp://u37001:p37001@192.168.26.202/onvif-media/media.amp?profile=profile_1_h264&sessiontimeout=60&streamtype=unicast" reconnectPause = 2 * time.Second ) @@ -62,6 +62,22 @@ func (c *client) read() error { return err } + // HACK!!! + // HACK. Add backchannel into desc before it is passed to the Server with setStreamReady() + // HACK!!! + var extraMedia = new(description.Media) + extraMedia.IsBackChannel = true + extraMedia.Type = description.MediaTypeAudio + extraMedia.Control = "track=extrabackchannel" + extraMedia.Formats = []format.Format{&format.G711{ + PayloadTyp: 0, // 0 = MULAW 8 = ALAW + MULaw: true, + SampleRate: 8000, + ChannelCount: 1, + }} + + desc.Medias = append(desc.Medias, extraMedia) + stream := c.s.setStreamReady(desc) defer c.s.setStreamUnready() diff --git a/examples/proxy/server.go b/examples/proxy/server.go index 680817d4..f8ee423a 100644 --- a/examples/proxy/server.go +++ b/examples/proxy/server.go @@ -2,11 +2,14 @@ package main import ( "log" + "net" "sync" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/pion/rtp" ) type server struct { @@ -52,6 +55,21 @@ func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { log.Printf("describe request") + // Check for backchannel + requestBackchannel := false + for _, value := range ctx.Request.Header["Require"] { + if value == "www.onvif.org/ver20/backchannel" { + requestBackchannel = true + } + } + + if requestBackchannel { + log.Printf("backchanel requested") + } + + // HACK!!! + // Can we add in the extra Media description only when backchannel is requested? + s.mutex.Lock() defer s.mutex.Unlock() @@ -90,6 +108,40 @@ func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { log.Printf("play request") + // assign the OnPacketRTPAny callback function so we can receive the backchannel audio + ctx.Session.OnPacketRTPAny(func(medi *description.Media, format format.Format, pkt *rtp.Packet) { + log.Printf("Got %s RTP data from VMS at %d size %d", format.Codec(), pkt.Timestamp, len(pkt.Payload)) + + // Marshal the RTP Packet back into bytes and send via UDP + bytes, err := pkt.Marshal() + if err != nil { + return + } + + // Send the RTP Payload from Port 9002 to destination port 127.0.0.1:9004 (note port numbers must be Even for RTP) + // Play the audio with + // gst-launch-1.0 udpsrc port=9004 caps=application/x-rtp ! queue ! rtppcmudepay ! autoaudiosink sync=false + // or with + // ffplay -reorder_queue_size 0 rtp://0.0.0.0:9004 (use reorder_queue_size because if RTP packets stop then resume, FFMPEG complains RTP packets are too late) + // + conn, err := net.ListenPacket("udp", ":9002") // TODO - keep this open + if err != nil { + return + } + + defer conn.Close() + + dst, err := net.ResolveUDPAddr("udp", "127.0.0.1:9004") + if err != nil { + return + } + + _, err = conn.WriteTo(bytes, dst) + if err != nil { + return + } + }) + return &base.Response{ StatusCode: base.StatusOK, }, nil diff --git a/server_session.go b/server_session.go index affc7433..c6c7a113 100644 --- a/server_session.go +++ b/server_session.go @@ -1199,7 +1199,9 @@ func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { func (ss *ServerSession) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) { sm := ss.setuppedMedias[medi] st := sm.formats[forma.PayloadType()] - st.onPacketRTP = cb + if st != nil { + st.onPacketRTP = cb // RJH HACK. Don't understand. I think this function is called when RTP comes in from the upstream rtsp camera AND also when backchannel data comes in + } } // OnPacketRTCP sets the callback that is called when a RTCP packet is read. diff --git a/server_session_format.go b/server_session_format.go index 7988dc80..cb0adf91 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -24,7 +24,7 @@ type serverSessionFormat struct { } func (sf *serverSessionFormat) start() { - if sf.sm.ss.state != ServerSessionStatePlay { + if sf.sm.ss.state != ServerSessionStatePlay /* Do this mean ServerSessionStateRecord */ || (sf.sm.ss.state == ServerSessionStatePlay && sf.sm.media.IsBackChannel) { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { sf.udpReorderer = rtpreorderer.New() } else { diff --git a/server_session_media.go b/server_session_media.go index 858d9114..e7030bd9 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -26,13 +26,13 @@ type serverSessionMedia struct { tcpRTPFrame *base.InterleavedFrame tcpRTCPFrame *base.InterleavedFrame tcpBuffer []byte - formats map[uint8]*serverSessionFormat // record only + formats map[uint8]*serverSessionFormat // record or backchannel writePacketRTPInQueue func([]byte) writePacketRTCPInQueue func([]byte) } func (sm *serverSessionMedia) initialize() { - if sm.ss.state == ServerSessionStatePreRecord { + if sm.ss.state == ServerSessionStatePreRecord || (sm.ss.state == ServerSessionStatePrePlay && sm.media.IsBackChannel) { // RJH HACK sm.formats = make(map[uint8]*serverSessionFormat) for _, forma := range sm.media.Formats { sm.formats[forma.PayloadType()] = &serverSessionFormat{ @@ -60,7 +60,9 @@ func (sm *serverSessionMedia) start() { if sm.ss.state == ServerSessionStatePlay { // firewall opening is performed with RTCP sender reports generated by ServerStream - // readers can send RTCP packets only + if sm.media.IsBackChannel { + sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readRTPUDPPlay) // back channel + } sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPPlay) } else { // open the firewall by sending empty packets to the counterpart. @@ -161,6 +163,44 @@ func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error { return nil } +func (sm *serverSessionMedia) readRTPUDPPlay(payload []byte) { + plen := len(payload) + + // HACK!!! + // Hack. I received some small RTP packets. Perhaps these are NAT punchthrough packets. + // They made gortsplib report problems with SSRC + // Ignore these small packets + // HACK!!! + if plen < 100 { + return + } + + atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) + + if plen == (udpMaxPayloadSize + 1) { + sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) + return + } + + pkt := &rtp.Packet{} + err := pkt.Unmarshal(payload) + if err != nil { + sm.ss.onDecodeError(err) + return + } + + forma, ok := sm.formats[pkt.PayloadType] + if !ok { + sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + return + } + + now := sm.ss.s.timeNow() + atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) + + forma.readRTPUDP(pkt, now) +} + func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) { plen := len(payload)