Skip to content

Commit

Permalink
Fix test fail.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Nov 16, 2023
1 parent 975ef2f commit a2d393a
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 102 deletions.
92 changes: 5 additions & 87 deletions trunk/3rdparty/srs-bench/srs/rtmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ import (
"testing"
"time"

"github.com/pkg/errors"

"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/go-oryx-lib/rtmp"
"github.com/pion/interceptor"
"github.com/pkg/errors"
)

func TestRtmpPublishPlay(t *testing.T) {
Expand Down Expand Up @@ -560,90 +559,6 @@ func TestRtmpPublish_HttpFlvPlayNoVideo(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)

var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()

// Set publisher to drop video.
publisher.hasVideo = false

player := NewFLVPlayer()
defer player.Close()

// Connect to RTMP URL.
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
flvUrl := fmt.Sprintf("http://%v/live/%v.flv", *srsHttpServer, streamSuffix)

if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}

if err := player.Play(ctx, flvUrl); err != nil {
return err
}

// Check packets.
var wg sync.WaitGroup
defer wg.Wait()

publisherReady, publisherReadyCancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
logger.Wf(ctx, "wait for publisher to push sequence header")
time.Sleep(1000 * time.Millisecond) // Wait for publisher to push sequence header.
publisherReadyCancel()
}()

wg.Add(1)
go func() {
defer wg.Done()

var nnPackets int
player.onRecvHeader = func(hasAudio, hasVideo bool) error {
return nil
}
player.onRecvTag = func(tp flv.TagType, size, ts uint32, tag []byte) error {
if tp == flv.TagTypeVideo {
return errors.New("should no video")
}
logger.Tf(ctx, "got %v tag, %v %vms %vB", nnPackets, tp, ts, len(tag))
if nnPackets += 1; nnPackets > 50 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()

wg.Add(1)
go func() {
<-publisherReady.Done()
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
}()

return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}

func TestRtmpPublish_HttpFlvPlayNoVideo_PublishFirst(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)

var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
Expand Down Expand Up @@ -708,7 +623,10 @@ func TestRtmpPublish_HttpFlvPlayNoVideo_PublishFirst(t *testing.T) {
go func() {
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
// Note that must greater than the cost of ffmpeg-opus, which is about 4ms, otherwise,
// the publisher will always get audio frames to transcode and won't accept new players
// connection and finally failed the case.
time.Sleep(5 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
Expand Down
4 changes: 2 additions & 2 deletions trunk/ide/srs_clion/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ ProcessorCount(JOBS)
# We should always configure SRS for switching between branches.
IF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
EXECUTE_PROCESS(
COMMAND ./configure --osx --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=on --jobs=${JOBS}
COMMAND ./configure --osx --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
WORKING_DIRECTORY ${SRS_DIR} RESULT_VARIABLE ret)
ELSE ()
EXECUTE_PROCESS(
COMMAND ./configure --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=on --jobs=${JOBS}
COMMAND ./configure --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
WORKING_DIRECTORY ${SRS_DIR} RESULT_VARIABLE ret)
ENDIF ()
if(NOT ret EQUAL 0)
Expand Down
1 change: 0 additions & 1 deletion trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
return srs_error_wrap(err, "consumer dump packets");
}

srs_trace("@trace # FLV %s, dump count=%d", entry->pattern.c_str(), count);
// TODO: FIXME: Support merged-write wait.
if (count <= 0) {
// Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
Expand Down
11 changes: 0 additions & 11 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,11 @@ srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc,

if (!atc) {
if ((err = jitter->correct(msg, ag)) != srs_success) {
srs_trace("@trace # source %s enque correct failed, timestamp=%ld", source->req->get_stream_url().c_str(), shared_msg->timestamp);
return srs_error_wrap(err, "consume message");
}
}

srs_trace("@trace # source %s enque, timestamp=%ld", source->req->get_stream_url().c_str(), shared_msg->timestamp);
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
srs_trace("@trace # source %s enque failed, timestamp=%ld", source->req->get_stream_url().c_str(), shared_msg->timestamp);
return srs_error_wrap(err, "enqueue message");
}

Expand Down Expand Up @@ -2233,8 +2230,6 @@ srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio)
{
srs_error_t err = srs_success;

srs_trace("@trace # source %s recv audio, timestamp=%ld", req->get_stream_url().c_str(), shared_audio->header.timestamp);

// Detect where stream is monotonically increasing.
if (!mix_correct && is_monotonically_increase) {
if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) {
Expand Down Expand Up @@ -2300,7 +2295,6 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
// Ignore if no format->acodec, it means the codec is not parsed, or unsupport/unknown codec
// such as G.711 codec
if (!format_->acodec) {
srs_trace("@trace # %s:%d", __func__, __LINE__);
return err;
}

Expand All @@ -2318,21 +2312,18 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)

// Copy to hub to all utilities.
if ((err = hub->on_audio(msg)) != srs_success) {
srs_trace("@trace # %s:%d", __func__, __LINE__);
return srs_error_wrap(err, "consume audio");
}

// For bridge to consume the message.
if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) {
srs_trace("@trace # %s:%d", __func__, __LINE__);
return srs_error_wrap(err, "bridge consume audio");
}

// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsLiveConsumer* consumer = consumers.at(i);
srs_trace("@trace # source %s recv audio, enque to consumer, timestamp=%ld", req->get_stream_url().c_str(), msg->timestamp);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
Expand Down Expand Up @@ -2373,8 +2364,6 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video)
{
srs_error_t err = srs_success;

srs_trace("@trace # source %s recv video, timestamp=%ld", req->get_stream_url().c_str(), shared_video->header.timestamp);

// Detect where stream is monotonically increasing.
if (!mix_correct && is_monotonically_increase) {
if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ extern SrsLiveSourceManager* _srs_sources;
class SrsLiveSource : public ISrsReloadHandler
{
friend class SrsOriginHub;
public:
private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
Expand Down

0 comments on commit a2d393a

Please sign in to comment.