From 81ad021599f586248e9f2c74150abfbbeec33a88 Mon Sep 17 00:00:00 2001 From: zhanghe2 <zhanghe2@hongxin.com> Date: Fri, 22 Dec 2023 15:14:32 +0800 Subject: [PATCH] add video support --- pkg/media/h264/h264.go | 48 ++++++++++++++++++++++++++++ pkg/sip/inbound.go | 69 ++++++++++++++++++++++++++++------------- pkg/sip/outbound.go | 57 +++++++++++++++++++++++----------- pkg/sip/room.go | 58 +++++++++++++++++++++++++--------- pkg/sip/service_test.go | 2 +- pkg/sip/signaling.go | 38 +++++++++++++++++++---- 6 files changed, 212 insertions(+), 60 deletions(-) create mode 100644 pkg/media/h264/h264.go diff --git a/pkg/media/h264/h264.go b/pkg/media/h264/h264.go new file mode 100644 index 00000000..b0d6c738 --- /dev/null +++ b/pkg/media/h264/h264.go @@ -0,0 +1,48 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package h264 + +import ( + m "github.com/livekit/sip/pkg/media" + "github.com/pion/webrtc/v3/pkg/media" + "time" +) + +type Sample []byte + +type Encoder struct { + w m.Writer[Sample] + buf Sample +} + +func (e *Encoder) WriteSample(in Sample) error { + return e.w.WriteSample(in) +} + +func Encode(w m.Writer[Sample]) m.Writer[Sample] { + return &Encoder{w: w} +} + +type SampleWriter interface { + WriteSample(sample media.Sample) error +} + +func BuildSampleWriter[T ~[]byte](w SampleWriter, sampleDur time.Duration) m.Writer[T] { + return m.WriterFunc[T](func(in T) error { + data := make([]byte, len(in)) + copy(data, in) + return w.WriteSample(media.Sample{Data: data}) + }) +} diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 759a212b..3d58a3bf 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -17,6 +17,7 @@ package sip import ( "context" "fmt" + "github.com/livekit/sip/pkg/media/h264" "sync/atomic" "time" @@ -54,7 +55,6 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr h := req.GetHeader("Proxy-Authorization") if h == nil { - logger.Infow(fmt.Sprintf("Requesting inbound auth for %s", from), "from", from) inviteState.challenge = digest.Challenge{ Realm: UserAgent, Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()), @@ -117,14 +117,12 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { src := req.Source() s.mon.InviteReq(false, from.Address.String(), to.Address.String()) - logger.Infow(fmt.Sprintf("INVITE from %q to %q", from.Address.String(), to.Address.String()), - "tag", tag, "from", from, "to", to) + logger.Infow("INVITE", "tag", tag, "from", from, "to", to) username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src) if err != nil { s.mon.InviteError(false, from.Address.String(), to.Address.String(), "no-rule") - logger.Warnw(fmt.Sprintf("Rejecting inbound call to %q, doesn't match any Trunks", to.Address.String()), err, - "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host) + logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", 
to.Address.Host) sipErrorResponse(tx, req) return } @@ -166,8 +164,10 @@ type inboundCall struct { from *sip.FromHeader to *sip.ToHeader src string - rtpConn *MediaConn + audioRtpConn *MediaConn + videoRtpConn *MediaConn audioHandler atomic.Pointer[rtp.Handler] + videoHandler atomic.Pointer[rtp.Handler] dtmf chan byte // buffered; DTMF digits as characters lkRoom *Room // LiveKit room; only active after correct pin is entered done atomic.Bool @@ -256,12 +256,20 @@ func (c *inboundCall) sendBye() { } func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) { - conn := NewMediaConn() - conn.OnRTP(c) - if err := conn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil { + audioConn := NewMediaConn() + audioConn.OnRTP(c) + if err := audioConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil { return nil, err } - c.rtpConn = conn + c.audioRtpConn = audioConn + + videoConn := NewMediaConn() + videoConn.OnRTP(c) + if err := videoConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil { + return nil, err + } + c.videoRtpConn = videoConn + offer := sdp.SessionDescription{} if err := offer.Unmarshal(offerData); err != nil { return nil, err @@ -269,10 +277,13 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe // Encoding pipeline (LK -> SIP) // Need to be created earlier to send the pin prompts. - s := rtp.NewMediaStreamOut[ulaw.Sample](conn, rtpPacketDur) - c.lkRoom.SetOutput(ulaw.Encode(s)) + aus := rtp.NewMediaStreamOut[ulaw.Sample](audioConn, rtpPacketDur) + c.lkRoom.SetAudioOutput(ulaw.Encode(aus)) + + vis := rtp.NewMediaStreamOut[h264.Sample](videoConn, rtpPacketDur) + c.lkRoom.SetVideoOutput(h264.Encode(vis)) - return sdpGenerateAnswer(offer, c.s.signalingIp, conn.LocalAddr().Port) + return sdpGenerateAnswer(offer, c.s.signalingIp, audioConn.LocalAddr().Port, videoConn.LocalAddr().Port) } func (c *inboundCall) pinPrompt(ctx context.Context) { @@ -343,9 +354,13 @@ func (c *inboundCall) closeMedia() { p.Close() c.lkRoom = nil } - if c.rtpConn != nil { - c.rtpConn.Close() - c.rtpConn = nil + if c.audioRtpConn != nil { + c.audioRtpConn.Close() + c.audioRtpConn = nil + } + if c.videoRtpConn != nil { + c.videoRtpConn.Close() + c.videoRtpConn = nil } close(c.dtmf) } @@ -356,9 +371,17 @@ func (c *inboundCall) HandleRTP(p *rtp.Packet) error { return nil } // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? 
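+	// Route by RTP payload type: static payload types (below 96) go to the audio
+	// handler; dynamic payload types are assumed to carry the negotiated H.264 video.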
-	if h := c.audioHandler.Load(); h != nil {
-		return (*h).HandleRTP(p)
+	if p.PayloadType < 96 {
+		if h := c.audioHandler.Load(); h != nil {
+			return (*h).HandleRTP(p)
+		}
+	} else {
+		if h := c.videoHandler.Load(); h != nil {
+			return (*h).HandleRTP(p)
+		}
 	}
+
 	return nil
 }
 
@@ -372,17 +395,21 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, roomName, pa
 	if err != nil {
 		return err
 	}
-	local, err := c.lkRoom.NewParticipant()
+	audioTrack, videoTrack, err := c.lkRoom.NewParticipant()
 	if err != nil {
 		_ = c.lkRoom.Close()
 		return err
 	}
-	// Decoding pipeline (SIP -> LK)
-	law := ulaw.Decode(local)
+	// Audio decoding pipeline (SIP -> LK)
+	law := ulaw.Decode(audioTrack)
+	// NewMediaStreamIn feeds RTP payload bytes from the read loop through the ulaw
+	// decoder, which writes the decoded PCM16 samples to audioTrack (the pw writer
+	// returned by NewParticipant).
 	var h rtp.Handler = rtp.NewMediaStreamIn(law)
 	c.audioHandler.Store(&h)
 
+	// Video decoding pipeline (SIP -> LK).
+	var vh rtp.Handler = rtp.NewMediaStreamIn(videoTrack)
+	c.videoHandler.Store(&vh)
+
 	return nil
 }
 
diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go
index 1b6c7294..8f73f19e 100644
--- a/pkg/sip/outbound.go
+++ b/pkg/sip/outbound.go
@@ -17,6 +17,7 @@ package sip
 import (
 	"context"
 	"fmt"
+	"github.com/livekit/sip/pkg/media/h264"
 	"sync"
 
 	"github.com/emiago/sipgo/sip"
@@ -40,13 +41,15 @@ type sipOutboundConfig struct {
 type outboundCall struct {
 	c             *Client
 	participantID string
-	rtpConn       *MediaConn
+	audioRtpConn  *MediaConn
+	videoRtpConn  *MediaConn
 
 	mu           sync.RWMutex
 	mediaRunning bool
 	lkCur        lkRoomConfig
 	lkRoom       *Room
-	lkRoomIn     media.Writer[media.PCM16Sample]
+	lkRoomAudioIn media.Writer[media.PCM16Sample]
+	lkRoomVideoIn media.Writer[h264.Sample]
 	sipCur        sipOutboundConfig
 	sipInviteReq  *sip.Request
 	sipInviteResp *sip.Response
@@ -79,7 +82,8 @@ func (c *Client) newCall(participantId string) *outboundCall {
 	call := &outboundCall{
 		c:             c,
 		participantID: participantId,
-		rtpConn:       NewMediaConn(),
+		audioRtpConn:  NewMediaConn(),
+		videoRtpConn:  NewMediaConn(),
 	}
 	return call
 }
@@ -92,11 +96,14 @@ func (c *outboundCall) Close() error {
 }
 
 func (c *outboundCall) close() {
-	c.rtpConn.OnRTP(nil)
-	c.lkRoom.SetOutput(nil)
+	c.audioRtpConn.OnRTP(nil)
+	c.videoRtpConn.OnRTP(nil)
+	c.lkRoom.SetAudioOutput(nil)
+	c.lkRoom.SetVideoOutput(nil)
 	if c.mediaRunning {
-		_ = c.rtpConn.Close()
+		_ = c.audioRtpConn.Close()
+		_ = c.videoRtpConn.Close()
 	}
 	c.mediaRunning = false
 
@@ -104,7 +111,8 @@ func (c *outboundCall) close() {
 		_ = c.lkRoom.Close()
 	}
 	c.lkRoom = nil
-	c.lkRoomIn = nil
+	c.lkRoomAudioIn = nil
+	c.lkRoomVideoIn = nil
 	c.lkCur = lkRoomConfig{}
 
 	c.stopSIP()
@@ -155,7 +163,10 @@ func (c *outboundCall) startMedia(conf *config.Config) error {
 	if c.mediaRunning {
 		return nil
 	}
-	if err := c.rtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
+	if err := c.audioRtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
+		return err
+	}
+	if err := c.videoRtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
 		return err
 	}
 	c.mediaRunning = true
@@ -169,19 +180,21 @@ func (c *outboundCall) updateRoom(lkNew lkRoomConfig) error {
 	if c.lkRoom != nil {
 		_ = c.lkRoom.Close()
 		c.lkRoom = nil
-		c.lkRoomIn = nil
+		c.lkRoomAudioIn = nil
+		c.lkRoomVideoIn = nil
 	}
 	r, err := ConnectToRoom(c.c.conf, lkNew.roomName, lkNew.identity)
 	if err != nil {
 		return err
 	}
-	local, err := r.NewParticipant()
+	audioTrack, videoTrack, err := r.NewParticipant()
 	if err != nil {
 		_ = r.Close()
 		return err
 	}
 	c.lkRoom = r
-	c.lkRoomIn = local
+	c.lkRoomAudioIn = audioTrack
+	c.lkRoomVideoIn = videoTrack
c.lkCur = lkNew return nil } @@ -201,17 +214,25 @@ func (c *outboundCall) updateSIP(sipNew sipOutboundConfig) error { func (c *outboundCall) relinkMedia() { if c.lkRoom == nil || !c.mediaRunning { - c.lkRoom.SetOutput(nil) - c.rtpConn.OnRTP(nil) + c.lkRoom.SetAudioOutput(nil) + c.lkRoom.SetVideoOutput(nil) + c.audioRtpConn.OnRTP(nil) + c.videoRtpConn.OnRTP(nil) return } // Encoding pipeline (LK -> SIP) - s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) - c.lkRoom.SetOutput(ulaw.Encode(s)) + aus := rtp.NewMediaStreamOut[ulaw.Sample](c.audioRtpConn, rtpPacketDur) + c.lkRoom.SetAudioOutput(ulaw.Encode(aus)) + + vis := rtp.NewMediaStreamOut[h264.Sample](c.videoRtpConn, rtpPacketDur) + c.lkRoom.SetVideoOutput(h264.Encode(vis)) // Decoding pipeline (SIP -> LK) - law := ulaw.Decode(c.lkRoomIn) - c.rtpConn.OnRTP(rtp.NewMediaStreamIn(law)) + law := ulaw.Decode(c.lkRoomAudioIn) + c.audioRtpConn.OnRTP(rtp.NewMediaStreamIn(law)) + + var vh rtp.Handler = rtp.NewMediaStreamIn(c.lkRoomVideoIn) + c.videoRtpConn.OnRTP(vh) } func (c *outboundCall) SendDTMF(ctx context.Context, digits string) error { @@ -250,7 +271,7 @@ func (c *outboundCall) stopSIP() { } func (c *outboundCall) sipSignal(conf sipOutboundConfig) error { - offer, err := sdpGenerateOffer(c.c.signalingIp, c.rtpConn.LocalAddr().Port) + offer, err := sdpGenerateOffer(c.c.signalingIp, c.audioRtpConn.LocalAddr().Port, c.videoRtpConn.LocalAddr().Port) if err != nil { return err } diff --git a/pkg/sip/room.go b/pkg/sip/room.go index dadfbcf9..3f41fd41 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -16,6 +16,7 @@ package sip import ( "context" + "github.com/livekit/sip/pkg/media/h264" "github.com/livekit/protocol/logger" lksdk "github.com/livekit/server-sdk-go" @@ -31,7 +32,8 @@ import ( type Room struct { room *lksdk.Room mix *mixer.Mixer - out media.SwitchWriter[media.PCM16Sample] + audioOut media.SwitchWriter[media.PCM16Sample] + videoOut media.SwitchWriter[h264.Sample] identity string } @@ -42,7 +44,7 @@ type lkRoomConfig struct { func NewRoom() *Room { r := &Room{} - r.mix = mixer.NewMixer(&r.out, sampleRate) + r.mix = mixer.NewMixer(&r.audioOut, sampleRate) return r } @@ -102,15 +104,24 @@ func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room return r, nil } -func (r *Room) Output() media.Writer[media.PCM16Sample] { - return r.out.Get() +func (r *Room) AudioOutput() media.Writer[media.PCM16Sample] { + return r.audioOut.Get() } -func (r *Room) SetOutput(out media.Writer[media.PCM16Sample]) { +func (r *Room) VideoOutput() media.Writer[h264.Sample] { return r.videoOut.Get() } + +func (r *Room) SetAudioOutput(out media.Writer[media.PCM16Sample]) { if r == nil { return } - r.out.Set(out) + r.audioOut.Set(out) +} + +func (r *Room) SetVideoOutput(out media.Writer[h264.Sample]) { + if r == nil { + return + } + r.videoOut.Set(out) } func (r *Room) Close() error { @@ -125,22 +136,41 @@ func (r *Room) Close() error { return nil } -func (r *Room) NewParticipant() (media.Writer[media.PCM16Sample], error) { - track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") +func (r *Room) NewParticipant() (media.Writer[media.PCM16Sample], media.Writer[h264.Sample], error) { + audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, r.identity+"-audio", r.identity+"-audio-pion") if err != nil { - return nil, err + return nil, nil, err + } + + videoTrack, err := 
webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, r.identity+"-video", r.identity+"-video-pion")
+	if err != nil {
+		return nil, nil, err
 	}
-	if _, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
+
+	if _, err = r.room.LocalParticipant.PublishTrack(audioTrack, &lksdk.TrackPublicationOptions{
 		Name: r.identity,
 	}); err != nil {
-		return nil, err
+		return nil, nil, err
 	}
-	ow := media.FromSampleWriter[opus.Sample](track, sampleDur)
+
+	if _, err = r.room.LocalParticipant.PublishTrack(videoTrack, &lksdk.TrackPublicationOptions{
+		Name:        r.identity + "-track",
+		VideoWidth:  1280,
+		VideoHeight: 720,
+	}); err != nil {
+		return nil, nil, err
+	}
+
+	// ow wraps audioTrack so that Opus-encoded sample bytes are written to LiveKit.
+	ow := media.FromSampleWriter[opus.Sample](audioTrack, sampleDur)
+	// pw accepts PCM16 input, Opus-encodes it, and writes the resulting []byte data to ow.
 	pw, err := opus.Encode(ow, sampleRate, channels)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
-	return pw, nil
+
+	vw := h264.BuildSampleWriter[h264.Sample](videoTrack, sampleDur)
+	return pw, vw, nil
 }
 
 func (r *Room) NewTrack() *Track {
diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go
index 6a778a94..e1729fb0 100644
--- a/pkg/sip/service_test.go
+++ b/pkg/sip/service_test.go
@@ -64,7 +64,7 @@ func TestService_AuthFailure(t *testing.T) {
 	sipClient, err := sipgo.NewClient(sipUserAgent)
 	require.NoError(t, err)
 
-	offer, err := sdpGenerateOffer(localIP, 0xB0B)
+	offer, err := sdpGenerateOffer(localIP, 0xB0B, 0xB0B)
 	require.NoError(t, err)
 
 	inviteRecipent := &sip.Uri{User: expectedToUser, Host: sipServerAddress}
diff --git a/pkg/sip/signaling.go b/pkg/sip/signaling.go
index 30fd81d0..9f1d79dc 100644
--- a/pkg/sip/signaling.go
+++ b/pkg/sip/signaling.go
@@ -20,7 +20,7 @@ import (
 	"github.com/pion/sdp/v2"
 )
 
-func sdpMediaDesc(rtpListenerPort int) []*sdp.MediaDescription {
+func sdpMediaDesc(audioListenerPort int, videoListenerPort int) []*sdp.MediaDescription {
 	// Static compiler check for sample rate hardcoded below.
 	var _ = [1]struct{}{}[8000-sampleRate]
 
@@ -28,7 +28,7 @@ func sdpMediaDesc(rtpListenerPort int) []*sdp.MediaDescription {
 		{
 			MediaName: sdp.MediaName{
 				Media:   "audio",
-				Port:    sdp.RangedPort{Value: rtpListenerPort},
+				Port:    sdp.RangedPort{Value: audioListenerPort},
 				Protos:  []string{"RTP", "AVP"},
 				Formats: []string{"0", "101"},
 			},
@@ -40,11 +40,36 @@ func sdpMediaDesc(rtpListenerPort int) []*sdp.MediaDescription {
 				{Key: "maxptime", Value: "150"},
 				{Key: "sendrecv"},
 			},
+		}, {
+			MediaName: sdp.MediaName{
+				Media:   "video",
+				Port:    sdp.RangedPort{Value: videoListenerPort},
+				Protos:  []string{"RTP", "AVP"},
+				Formats: []string{"102", "97", "125"},
+			},
+			Attributes: []sdp.Attribute{
+				{Key: "rtpmap", Value: "102 H264/90000"},
+				{Key: "fmtp", Value: "102 profile-level-id=42001f"},
+				{Key: "rtpmap", Value: "97 H264/90000"},
+				{Key: "fmtp", Value: "97 profile-level-id=42801F"},
+				/*{Key: "rtpmap", Value: "104 H264/90000"},
+				{Key: "fmtp", Value: "104 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f"},
+				{Key: "rtpmap", Value: "106 H264/90000"},
+				{Key: "fmtp", Value: "106 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"},
+				{Key: "rtpmap", Value: "108 H264/90000"},
+				{Key: "fmtp", Value: "108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f"},
+				{Key: "rtpmap", Value: "112 H264/90000"},
+				{Key: "fmtp", Value: "112 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=64001f"},*/
+				{Key: "rtpmap", Value: "125 H264/90000"},
+				{Key: "fmtp", Value: "125 profile-level-id=42801E;packetization-mode=0"},
+				/*{Key: "rtpmap", Value: "127 H264/90000"},
+				{Key: "fmtp", Value: "127 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=4d001f"},*/
+			},
 		},
 	}
 }
 
-func sdpGenerateOffer(publicIp string, rtpListenerPort int) ([]byte, error) {
+func sdpGenerateOffer(publicIp string, audioRtpListenerPort int, videoRtpListenerPort int) ([]byte, error) {
 	sessId := rand.Uint64() // TODO: do we need to track these?
 
 	answer := sdp.SessionDescription{
@@ -71,13 +96,13 @@ func sdpGenerateOffer(publicIp string, rtpListenerPort int) ([]byte, error) {
 				},
 			},
 		},
-		MediaDescriptions: sdpMediaDesc(rtpListenerPort),
+		MediaDescriptions: sdpMediaDesc(audioRtpListenerPort, videoRtpListenerPort),
 	}
 
 	return answer.Marshal()
 }
 
-func sdpGenerateAnswer(offer sdp.SessionDescription, publicIp string, rtpListenerPort int) ([]byte, error) {
+func sdpGenerateAnswer(offer sdp.SessionDescription, publicIp string, audioListenerPort int, videoListenerPort int) ([]byte, error) {
 
 	answer := sdp.SessionDescription{
 		Version: 0,
@@ -103,8 +128,9 @@ func sdpGenerateAnswer(offer sdp.SessionDescription, publicIp string, rtpListene
 				},
 			},
 		},
-		MediaDescriptions: sdpMediaDesc(rtpListenerPort),
+		MediaDescriptions: sdpMediaDesc(audioListenerPort, videoListenerPort),
 	}
 
 	return answer.Marshal()
 }
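
Illustration only, not part of the patch: a minimal sketch of how the new h264 helpers chain together, mirroring Room.NewParticipant and the LK -> SIP wiring above. The track id/stream id, the 20 ms sample duration, and the dummy payload bytes are placeholder assumptions; Encode is wrapped around the track writer here purely to show that both directions share the same Writer interface.

package main

import (
	"time"

	"github.com/livekit/sip/pkg/media/h264"
	"github.com/pion/webrtc/v3"
)

func main() {
	// Local H.264 track, as created in Room.NewParticipant.
	videoTrack, err := webrtc.NewTrackLocalStaticSample(
		webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
		"example-video", "example-video-pion", // placeholder id / stream ID
	)
	if err != nil {
		panic(err)
	}

	// SIP -> LK direction: BuildSampleWriter adapts the pion track to the
	// media.Writer interface so raw H.264 payload bytes can be written to it
	// as samples (the patch passes the audio sampleDur here).
	vw := h264.BuildSampleWriter[h264.Sample](videoTrack, 20*time.Millisecond)

	// LK -> SIP direction uses h264.Encode, which currently forwards samples
	// unchanged to whatever writer it wraps (here vw, only for illustration).
	enc := h264.Encode(vw)

	// Push a dummy Annex B start code through the chain.
	if err := enc.WriteSample(h264.Sample{0x00, 0x00, 0x00, 0x01}); err != nil {
		panic(err)
	}
}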