Skip to content

Commit

Permalink
SmartPtr: Use shared ptr for live source.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 14, 2024
1 parent d38af02 commit 88f9224
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 95 deletions.
16 changes: 10 additions & 6 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,11 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(trd);
}

srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r)
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r)
{
source_ = s;
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();

edge = e;
req = r;

Expand Down Expand Up @@ -747,9 +749,11 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
return queue->set_queue_size(queue_size);
}

srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r)
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r)
{
source_ = s;
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();

edge = e;
req = r;

Expand Down Expand Up @@ -956,7 +960,7 @@ SrsPlayEdge::~SrsPlayEdge()
srs_freep(ingester);
}

srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req)
srs_error_t SrsPlayEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1048,7 +1052,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
return forwarder->set_queue_size(queue_size);
}

srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req)
srs_error_t SrsPublishEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
{
srs_error_t err = srs_success;

Expand Down
9 changes: 5 additions & 4 deletions trunk/src/app/srs_app_edge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <srs_core.hpp>

#include <srs_app_st.hpp>
#include <srs_core_autofree.hpp>

#include <string>

Expand Down Expand Up @@ -152,7 +153,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
public:
virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
virtual std::string get_curr_origin();
Expand Down Expand Up @@ -195,7 +196,7 @@ class SrsEdgeForwarder : public ISrsCoroutineHandler
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
// Interface ISrsReusableThread2Handler
Expand All @@ -220,7 +221,7 @@ class SrsPlayEdge
// Always use the req of source,
// For we assume all client to edge is invalid,
// if auth open, edge must valid it from origin, then service it.
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
// When client play stream on edge.
virtual srs_error_t on_client_play();
// When all client stopped play, disconnect to origin.
Expand All @@ -243,7 +244,7 @@ class SrsPublishEdge
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
virtual bool can_publish();
// When client publish stream on edge.
virtual srs_error_t on_client_publish();
Expand Down
14 changes: 7 additions & 7 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ srs_error_t SrsBufferCache::cycle()
return err;
}

SrsLiveSource* live_source = _srs_sources->fetch(req);
if (!live_source) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

Expand Down Expand Up @@ -661,8 +661,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);

SrsLiveSource* live_source = _srs_sources->fetch(req);
if (!live_source) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

Expand Down Expand Up @@ -1136,11 +1136,11 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}

SrsLiveSource* live_source = NULL;
if ((err = _srs_sources->fetch_or_create(r, server, &live_source)) != srs_success) {
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(r, server, live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(live_source != NULL);
srs_assert(live_source.get() != NULL);

bool enabled_cache = _srs_config->get_gop_cache(r->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void SrsQueueRecvThread::on_stop()
}

SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid)
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
rtmp = rtmp_sdk;
Expand Down
5 changes: 3 additions & 2 deletions trunk/src/app/srs_app_recv_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <srs_protocol_stream.hpp>
#include <srs_core_performance.hpp>
#include <srs_app_reload.hpp>
#include <srs_core_autofree.hpp>

class SrsRtmpServer;
class SrsCommonMessage;
Expand Down Expand Up @@ -146,15 +147,15 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler
srs_error_t recv_error;
SrsRtmpConn* _conn;
// The params for conn callback.
SrsLiveSource* source_;
SrsSharedPtr<SrsLiveSource> source_;
// The error timeout cond
srs_cond_t error;
// The merged context id.
SrsContextId cid;
SrsContextId ncid;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid);
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid);
virtual ~SrsPublishRecvThread();
public:
// Wait for error for some timeout.
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa

// For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728
if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) {
SrsLiveSource* live_source = _srs_sources->fetch(ruc->req_);
if (live_source && !live_source->inactive()) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(ruc->req_);
if (live_source.get() && !live_source->inactive()) {
return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str());
}
}
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1202,8 +1202,8 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
source_->set_publish_stream(this);

// TODO: FIMXE: Check it in SrsRtcConnection::add_publisher?
SrsLiveSource* live_source = _srs_sources->fetch(r);
if (live_source && !live_source->can_publish(false)) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(r);
if (live_source.get() && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str());
}

Expand All @@ -1227,7 +1227,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

Expand Down
22 changes: 11 additions & 11 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);

// find a source to serve.
SrsLiveSource* live_source = NULL;
if ((err = _srs_sources->fetch_or_create(req, server, &live_source)) != srs_success) {
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, server, live_source)) != srs_success) {
return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(live_source != NULL);
srs_assert(live_source.get() != NULL);

bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost);
Expand Down Expand Up @@ -699,7 +699,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
return err;
}

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -786,7 +786,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
return err;
}

srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -923,7 +923,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
return err;
}

srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -969,7 +969,7 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
return err;
}

srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* rtrd)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1073,7 +1073,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
return err;
}

srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1141,7 +1141,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
return err;
}

void SrsRtmpConn::release_publish(SrsLiveSource* source)
void SrsRtmpConn::release_publish(SrsSharedPtr<SrsLiveSource> source)
{
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
Expand All @@ -1152,7 +1152,7 @@ void SrsRtmpConn::release_publish(SrsLiveSource* source)
}
}

srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1193,7 +1193,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommon
return err;
}

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;

Expand Down
17 changes: 9 additions & 8 deletions trunk/src/app/srs_app_rtmp_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <srs_app_reload.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_rtmp_conn.hpp>
#include <srs_core_autofree.hpp>

class SrsServer;
class SrsRtmpServer;
Expand Down Expand Up @@ -145,14 +146,14 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo
// The stream(play/publish) service cycle, identify client first.
virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost);
virtual srs_error_t playing(SrsLiveSource* source);
virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsLiveSource* source);
virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsLiveSource* source);
virtual void release_publish(SrsLiveSource* source);
virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t playing(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsSharedPtr<SrsLiveSource> source);
virtual void release_publish(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg);
virtual void set_sock_options();
private:
Expand Down
Loading

0 comments on commit 88f9224

Please sign in to comment.