Skip to content

Commit

Permalink
SmartPtr: Support shared ptr for RTC source.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 13, 2024
1 parent 242152b commit ca9b5de
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 86 deletions.
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
// Whether RTC stream is active.
bool is_rtc_stream_active = false;
if (true) {
SrsRtcSource* source = _srs_rtc_sources->fetch(ruc->req_);
is_rtc_stream_active = (source && !source->can_publish());
SrsSharedPtr<SrsRtcSource> source = _srs_rtc_sources->fetch(ruc->req_);
is_rtc_stream_active = (source.get() && !source->can_publish());
}

// For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728
Expand Down
43 changes: 18 additions & 25 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,12 @@ std::string SrsRtcAsyncCallOnStop::to_string()
return std::string("");
}

SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) : source_(new SrsRtcSource())
{
cid_ = cid;
trd_ = NULL;

req_ = NULL;
source_ = NULL;

is_started = false;
session_ = s;
Expand Down Expand Up @@ -485,7 +484,7 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
return srs_error_wrap(err, "rtc: stat client");
}

if ((err = _srs_rtc_sources->fetch_or_create(req_, &source_)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) {
return srs_error_wrap(err, "rtc fetch source failed");
}

Expand Down Expand Up @@ -642,11 +641,12 @@ srs_error_t SrsRtcPlayStream::cycle()
{
srs_error_t err = srs_success;

SrsRtcSource* source = source_;
SrsSharedPtr<SrsRtcSource>& source = source_;
srs_assert(source.get());

SrsRtcConsumer* consumer = NULL;
SrsAutoFree(SrsRtcConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
if ((err = source->create_consumer(source_, consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
}

Expand Down Expand Up @@ -933,9 +933,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
{
srs_error_t err = srs_success;

// The source MUST exists, when PLI thread is running.
srs_assert(source_);

ISrsRtcPublishStream* publisher = source_->publish_stream();
if (!publisher) {
return err;
Expand Down Expand Up @@ -1076,7 +1073,7 @@ std::string SrsRtcAsyncCallOnUnpublish::to_string()
return std::string("");
}

SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) : source_(new SrsRtcSource())
{
cid_ = cid;
is_started = false;
Expand All @@ -1086,7 +1083,6 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
twcc_epp_ = new SrsErrorPithyPrint(3.0);

req_ = NULL;
source = NULL;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
nack_no_copy_ = false;
Expand All @@ -1113,11 +1109,8 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(timer_rtcp_);
srs_freep(timer_twcc_);

// TODO: FIXME: Should remove and delete source.
if (source) {
source->set_publish_stream(NULL);
source->on_unpublish();
}
source_->set_publish_stream(NULL);
source_->on_unpublish();

for (int i = 0; i < (int)video_tracks_.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks_.at(i);
Expand Down Expand Up @@ -1203,10 +1196,10 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
}

// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req_, &source)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->set_publish_stream(this);
source_->set_publish_stream(this);

// TODO: FIMXE: Check it in SrsRtcConnection::add_publisher?
SrsLiveSource *rtmp = _srs_sources->fetch(r);
Expand Down Expand Up @@ -1250,7 +1243,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
return srs_error_wrap(err, "create bridge");
}

source->set_bridge(bridge);
source_->set_bridge(bridge);
}
#endif

Expand All @@ -1265,7 +1258,7 @@ srs_error_t SrsRtcPublishStream::start()
return err;
}

if ((err = source->on_publish()) != srs_success) {
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
}

Expand Down Expand Up @@ -1447,12 +1440,12 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff
SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);
if (audio_track) {
pkt->frame_type = SrsFrameTypeAudio;
if ((err = audio_track->on_rtp(source, pkt)) != srs_success) {
if ((err = audio_track->on_rtp(source_, pkt)) != srs_success) {
return srs_error_wrap(err, "on audio");
}
} else if (video_track) {
pkt->frame_type = SrsFrameTypeVideo;
if ((err = video_track->on_rtp(source, pkt)) != srs_success) {
if ((err = video_track->on_rtp(source_, pkt)) != srs_success) {
return srs_error_wrap(err, "on video");
}
} else {
Expand Down Expand Up @@ -1956,8 +1949,8 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local
return srs_error_wrap(err, "generate local sdp");
}

SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

Expand Down Expand Up @@ -3056,8 +3049,8 @@ srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRtcUserConfig* ruc, s
// TODO: FIME: Should check packetization-mode=1 also.
bool has_42e01f = srs_sdp_has_h264_profile(remote_sdp, "42e01f");

SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "fetch rtc source");
}

Expand Down
5 changes: 3 additions & 2 deletions trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <srs_protocol_conn.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_async_call.hpp>
#include <srs_core_autofree.hpp>

#include <string>
#include <map>
Expand Down Expand Up @@ -217,7 +218,7 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
SrsRtcPLIWorker* pli_worker_;
private:
SrsRequest* req_;
SrsRtcSource* source_;
SrsSharedPtr<SrsRtcSource> source_;
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtcAudioSendTrack*> audio_tracks_;
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
Expand Down Expand Up @@ -343,7 +344,7 @@ class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler
SrsErrorPithyPrint* pli_epp;
private:
SrsRequest* req_;
SrsRtcSource* source;
SrsSharedPtr<SrsRtcSource> source_;
// Simulators.
int nn_simulate_nack_drop;
private:
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ srs_error_t SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sd

SrsRequest* req = ruc->req_;

SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

Expand Down
79 changes: 52 additions & 27 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ ISrsRtcSourceChangeCallback::~ISrsRtcSourceChangeCallback()
{
}

SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)
SrsRtcConsumer::SrsRtcConsumer(SrsSharedPtr<SrsRtcSource> s)
{
source = s;
source_ = s;
should_update_source_id = false;
handler_ = NULL;

Expand All @@ -167,7 +167,7 @@ SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)

SrsRtcConsumer::~SrsRtcConsumer()
{
source->on_consumer_destroy(this);
source_->on_consumer_destroy(this);

vector<SrsRtpPacket*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
Expand Down Expand Up @@ -205,7 +205,7 @@ srs_error_t SrsRtcConsumer::dump_packet(SrsRtpPacket** ppkt)
srs_error_t err = srs_success;

if (should_update_source_id) {
srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str());
srs_trace("update source_id=%s/%s", source_->source_id().c_str(), source_->pre_source_id().c_str());
should_update_source_id = false;
}

Expand Down Expand Up @@ -251,58 +251,73 @@ SrsRtcSourceManager::~SrsRtcSourceManager()
srs_mutex_destroy(lock);
}

srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps)
srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsRtcSource>& pps)
{
srs_error_t err = srs_success;

// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

SrsRtcSource* source = NULL;
if ((source = fetch(r)) != NULL) {
string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);

if (it != pool.end()) {
SrsSharedPtr<SrsRtcSource> source = it->second;

// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->update_auth(r);
*pps = source;
pps = source;

return err;
}

string stream_url = r->get_stream_url();
string vhost = r->vhost;

// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());

SrsSharedPtr<SrsRtcSource> source = SrsSharedPtr<SrsRtcSource>(new SrsRtcSource());
srs_trace("new rtc source, stream_url=%s", stream_url.c_str());

source = new SrsRtcSource();
if ((err = source->initialize(r)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}

pool[stream_url] = source;

*pps = source;
pps = source;

return err;
}

SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r)
SrsSharedPtr<SrsRtcSource> SrsRtcSourceManager::fetch(SrsRequest* r)
{
SrsRtcSource* source = NULL;
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);

source = pool[stream_url];
SrsSharedPtr<SrsRtcSource> source;
if (it == pool.end()) {
return source;
}

source = it->second;
return source;
}

void SrsRtcSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}

SrsRtcSourceManager* _srs_rtc_sources = NULL;

ISrsRtcPublishStream::ISrsRtcPublishStream()
Expand Down Expand Up @@ -471,11 +486,11 @@ void SrsRtcSource::set_bridge(ISrsStreamBridge* bridge)
#endif
}

srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
srs_error_t SrsRtcSource::create_consumer(SrsSharedPtr<SrsRtcSource> source, SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;

consumer = new SrsRtcConsumer(this);
consumer = new SrsRtcConsumer(source);
consumers.push_back(consumer);

// TODO: FIXME: Implements edge cluster.
Expand Down Expand Up @@ -508,6 +523,11 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
h->on_consumers_finished();
}
}

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
}
}

bool SrsRtcSource::can_publish()
Expand Down Expand Up @@ -607,6 +627,11 @@ void SrsRtcSource::on_unpublish()

SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
}
}

void SrsRtcSource::subscribe(ISrsRtcSourceEventHandler* h)
Expand Down Expand Up @@ -2552,7 +2577,7 @@ void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer
*ppt = SrsRtspPacketPayloadTypeRaw;
}

srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt)
srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsSharedPtr<SrsRtcSource>& source, SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -2611,7 +2636,7 @@ void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer
}
}

srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt)
srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsSharedPtr<SrsRtcSource>& source, SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;

Expand Down
Loading

0 comments on commit ca9b5de

Please sign in to comment.