From 15a94279a9e3bae491e56a07573c2fe71029f363 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 19 Jun 2024 17:40:57 +0800 Subject: [PATCH] SmartPtr: Support RTC reconnect load test. --- trunk/3rdparty/srs-bench/README.md | 9 +++ trunk/3rdparty/srs-bench/srs/publisher.go | 21 +++++- trunk/3rdparty/srs-bench/srs/srs.go | 13 +++- trunk/src/app/srs_app_rtc_source.cpp | 91 +++++++++++++++++++---- trunk/src/app/srs_app_rtc_source.hpp | 18 ++++- trunk/src/app/srs_app_server.cpp | 11 ++- 6 files changed, 134 insertions(+), 29 deletions(-) diff --git a/trunk/3rdparty/srs-bench/README.md b/trunk/3rdparty/srs-bench/README.md index 359d8c7059..8bda975415 100644 --- a/trunk/3rdparty/srs-bench/README.md +++ b/trunk/3rdparty/srs-bench/README.md @@ -144,6 +144,15 @@ for ((i=0;;i++)); do done ``` +WebRTC重连测试: + +```bash +for ((i=0;;i++)); do + ./objs/srs_bench -sfu=rtc -pr=webrtc://localhost/live${i}/livestream -sn=1000 -cap=true; + sleep 10; +done +``` + ## Regression Test 回归测试需要先启动[SRS](https://github.com/ossrs/srs/issues/307),支持WebRTC推拉流: diff --git a/trunk/3rdparty/srs-bench/srs/publisher.go b/trunk/3rdparty/srs-bench/srs/publisher.go index 8cf7f450ae..6ef381c7ee 100644 --- a/trunk/3rdparty/srs-bench/srs/publisher.go +++ b/trunk/3rdparty/srs-bench/srs/publisher.go @@ -34,7 +34,7 @@ import ( ) // @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go -func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error { +func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC, closeAfterPublished bool) error { ctx = logger.WithContext(ctx) logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v", @@ -77,10 +77,13 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i return nil, err } - if sourceAudio != "" { + // For CAP, we always add audio track, because both audio and video are disabled for CAP, which will + // cause failed when exchange SDP. + if sourceAudio != "" || closeAfterPublished { aIngester = newAudioIngester(sourceAudio) registry.Add(&rtpInteceptorFactory{aIngester.audioLevelInterceptor}) } + if sourceVideo != "" { vIngester = newVideoIngester(sourceVideo) registry.Add(&rtpInteceptorFactory{vIngester.markerInterceptor}) @@ -178,6 +181,7 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i // Wait for event from context or tracks. var wg sync.WaitGroup + defer wg.Wait() wg.Add(1) go func() { @@ -186,6 +190,18 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i doClose() // Interrupt the RTCP read. }() + // If CAP, directly close the connection after published. + if closeAfterPublished { + select { + case <-ctx.Done(): + case <-pcDoneCtx.Done(): + } + + logger.Tf(ctx, "Close connection after published") + cancel() + return nil + } + wg.Add(1) go func() { defer wg.Done() @@ -295,6 +311,5 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i } }() - wg.Wait() return nil } diff --git a/trunk/3rdparty/srs-bench/srs/srs.go b/trunk/3rdparty/srs-bench/srs/srs.go index 6bacaa5ce5..62f6f998b8 100644 --- a/trunk/3rdparty/srs-bench/srs/srs.go +++ b/trunk/3rdparty/srs-bench/srs/srs.go @@ -46,6 +46,8 @@ var clients, streams, delay int var statListen string +var closeAfterPublished bool + func Parse(ctx context.Context) { fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) @@ -71,6 +73,8 @@ func Parse(ctx context.Context) { fl.StringVar(&statListen, "stat", "", "") + fl.BoolVar(&closeAfterPublished, "cap", false, "") + fl.Usage = func() { fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0])) fmt.Println(fmt.Sprintf("Options:")) @@ -95,6 +99,7 @@ func Parse(ctx context.Context) { fmt.Println(fmt.Sprintf(" -fps [Optional] The fps of .h264 source file.")) fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty.")) fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty.")) + fmt.Println(fmt.Sprintf(" -cap Whether to close connection after publish. Default: false")) fmt.Println(fmt.Sprintf("\n例如,1个播放,1个推流:")) fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream", os.Args[0])) fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0])) @@ -118,7 +123,7 @@ func Parse(ctx context.Context) { if sr == "" && pr == "" { showHelp = true } - if pr != "" && (sourceAudio == "" && sourceVideo == "") { + if pr != "" && !closeAfterPublished && (sourceAudio == "" && sourceVideo == "") { showHelp = true } if showHelp { @@ -135,8 +140,8 @@ func Parse(ctx context.Context) { summaryDesc = fmt.Sprintf("%v, play(url=%v, da=%v, dv=%v, pli=%v)", summaryDesc, sr, dumpAudio, dumpVideo, pli) } if pr != "" { - summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v)", - summaryDesc, pr, sourceAudio, sourceVideo, fps) + summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v, cap=%v)", + summaryDesc, pr, sourceAudio, sourceVideo, fps, closeAfterPublished) } logger.Tf(ctx, "Run benchmark with %v", summaryDesc) @@ -271,7 +276,7 @@ func Run(ctx context.Context) error { gStatRTC.Publishers.Alive-- }() - if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC); err != nil { + if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC, closeAfterPublished); err != nil { if errors.Cause(err) != context.Canceled { logger.Wf(ctx, "Run err %+v", err) } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index b63322f3cf..21e53c1751 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -71,6 +71,9 @@ using namespace std; const int kRtpMaxPayloadSize = kRtpPacketSize - 300; #endif +// the time to cleanup source. +#define SRS_RTC_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS) + // TODO: Add this function into SrsRtpMux class. srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf) { @@ -244,11 +247,56 @@ void SrsRtcConsumer::on_stream_change(SrsRtcSourceDescription* desc) SrsRtcSourceManager::SrsRtcSourceManager() { lock = srs_mutex_new(); + timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS); } SrsRtcSourceManager::~SrsRtcSourceManager() { srs_mutex_destroy(lock); + srs_freep(timer_); +} + +srs_error_t SrsRtcSourceManager::initialize() +{ + return setup_ticks(); +} + +srs_error_t SrsRtcSourceManager::setup_ticks() +{ + srs_error_t err = srs_success; + + if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + + return err; +} + +srs_error_t SrsRtcSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + std::map< std::string, SrsSharedPtr >::iterator it; + for (it = pool.begin(); it != pool.end();) { + SrsSharedPtr& source = it->second; + + // When source expired, remove it. + // @see https://github.com/ossrs/srs/issues/713 + if (source->stream_is_dead()) { + SrsContextId cid = source->source_id(); + if (cid.empty()) cid = source->pre_source_id(); + srs_trace("RTC: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size()); + pool.erase(it++); + } else { + ++it; + } + } + + return err; } srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr& pps) @@ -305,19 +353,6 @@ SrsSharedPtr SrsRtcSourceManager::fetch(SrsRequest* r) return source; } -void SrsRtcSourceManager::eliminate(SrsRequest* r) -{ - // Use lock to protect coroutine switch. - // @bug https://github.com/ossrs/srs/issues/1230 - SrsLocker(lock); - - string stream_url = r->get_stream_url(); - std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); - if (it != pool.end()) { - pool.erase(it); - } -} - SrsRtcSourceManager* _srs_rtc_sources = NULL; ISrsRtcPublishStream::ISrsRtcPublishStream() @@ -351,6 +386,7 @@ SrsRtcSource::SrsRtcSource() #endif pli_for_rtmp_ = pli_elapsed_ = 0; + stream_die_at_ = 0; } SrsRtcSource::~SrsRtcSource() @@ -384,6 +420,27 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r) return err; } +bool SrsRtcSource::stream_is_dead() +{ + // still publishing? + if (is_created_) { + return false; + } + + // has any consumers? + if (!consumers.empty()) { + return false; + } + + // Delay cleanup source. + srs_utime_t now = srs_get_system_time(); + if (now < stream_die_at_ + SRS_RTC_SOURCE_CLEANUP) { + return false; + } + + return true; +} + void SrsRtcSource::init_for_play_before_publishing() { // If the stream description has already been setup by RTC publisher, @@ -497,6 +554,8 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) consumer = new SrsRtcConsumer(this); consumers.push_back(consumer); + stream_die_at_ = 0; + // TODO: FIXME: Implements edge cluster. return err; @@ -530,7 +589,7 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) // Destroy and cleanup source when no publishers and consumers. if (!is_created_ && consumers.empty()) { - _srs_rtc_sources->eliminate(req); + stream_die_at_ = srs_get_system_time(); } } @@ -633,8 +692,8 @@ void SrsRtcSource::on_unpublish() stat->on_stream_close(req); // Destroy and cleanup source when no publishers and consumers. - if (!is_created_ && consumers.empty()) { - _srs_rtc_sources->eliminate(req); + if (consumers.empty()) { + stream_die_at_ = srs_get_system_time(); } } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 00328527b0..e917316d59 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -111,14 +111,21 @@ class SrsRtcConsumer void on_stream_change(SrsRtcSourceDescription* desc); }; -class SrsRtcSourceManager +class SrsRtcSourceManager : public ISrsHourGlass { private: srs_mutex_t lock; std::map< std::string, SrsSharedPtr > pool; + SrsHourGlass* timer_; public: SrsRtcSourceManager(); virtual ~SrsRtcSourceManager(); +public: + virtual srs_error_t initialize(); +// interface ISrsHourGlass +private: + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); public: // create source when fetch from cache failed. // @param r the client request. @@ -127,9 +134,6 @@ class SrsRtcSourceManager public: // Get the exists source, NULL when not exists. virtual SrsSharedPtr fetch(SrsRequest* r); -public: - // Dispose and destroy the source. - virtual void eliminate(SrsRequest* r); }; // Global singleton instance. @@ -195,11 +199,17 @@ class SrsRtcSource : public ISrsFastTimer // The PLI for RTC2RTMP. srs_utime_t pli_for_rtmp_; srs_utime_t pli_elapsed_; +private: + // The last die time, while die means neither publishers nor players. + srs_utime_t stream_die_at_; public: SrsRtcSource(); virtual ~SrsRtcSource(); public: virtual srs_error_t initialize(SrsRequest* r); +public: + // Whether stream is dead, which is no publisher or player. + virtual bool stream_is_dead(); private: void init_for_play_before_publishing(); public: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 828b3758c0..472b7114b3 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -40,6 +40,7 @@ using namespace std; #ifdef SRS_RTC #include #include +#include #endif #ifdef SRS_GB28181 #include @@ -809,12 +810,18 @@ srs_error_t SrsServer::start(SrsWaitGroup* wg) srs_error_t err = srs_success; if ((err = _srs_sources->initialize()) != srs_success) { - return srs_error_wrap(err, "sources"); + return srs_error_wrap(err, "live sources"); } #ifdef SRS_SRT if ((err = _srs_srt_sources->initialize()) != srs_success) { - return srs_error_wrap(err, "sources"); + return srs_error_wrap(err, "srt sources"); + } +#endif + +#ifdef SRS_RTC + if ((err = _srs_rtc_sources->initialize()) != srs_success) { + return srs_error_wrap(err, "rtc sources"); } #endif