diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 908d1a6c9c..ba9e12f21d 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -415,9 +415,11 @@ SrsEdgeIngester::~SrsEdgeIngester() srs_freep(trd); } -srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r) +srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr s, SrsPlayEdge* e, SrsRequest* r) { - source_ = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); + edge = e; req = r; @@ -747,9 +749,11 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size) return queue->set_queue_size(queue_size); } -srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r) +srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr s, SrsPublishEdge* e, SrsRequest* r) { - source_ = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); + edge = e; req = r; @@ -956,7 +960,7 @@ SrsPlayEdge::~SrsPlayEdge() srs_freep(ingester); } -srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req) +srs_error_t SrsPlayEdge::initialize(SrsSharedPtr source, SrsRequest* req) { srs_error_t err = srs_success; @@ -1048,7 +1052,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size) return forwarder->set_queue_size(queue_size); } -srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req) +srs_error_t SrsPublishEdge::initialize(SrsSharedPtr source, SrsRequest* req) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 1298a4f325..a1f049c49f 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -152,7 +153,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler SrsEdgeIngester(); virtual ~SrsEdgeIngester(); public: - virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsPlayEdge* e, SrsRequest* r); virtual srs_error_t start(); virtual void stop(); virtual std::string get_curr_origin(); @@ -195,7 +196,7 @@ class SrsEdgeForwarder : public ISrsCoroutineHandler public: virtual void set_queue_size(srs_utime_t queue_size); public: - virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsPublishEdge* e, SrsRequest* r); virtual srs_error_t start(); virtual void stop(); // Interface ISrsReusableThread2Handler @@ -220,7 +221,7 @@ class SrsPlayEdge // Always use the req of source, // For we assume all client to edge is invalid, // if auth open, edge must valid it from origin, then service it. - virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req); + virtual srs_error_t initialize(SrsSharedPtr source, SrsRequest* req); // When client play stream on edge. virtual srs_error_t on_client_play(); // When all client stopped play, disconnect to origin. @@ -243,7 +244,7 @@ class SrsPublishEdge public: virtual void set_queue_size(srs_utime_t queue_size); public: - virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req); + virtual srs_error_t initialize(SrsSharedPtr source, SrsRequest* req); virtual bool can_publish(); // When client publish stream on edge. virtual srs_error_t on_client_publish(); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index b61a4b4fc9..914cefd5c6 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -106,8 +106,8 @@ srs_error_t SrsBufferCache::cycle() return err; } - SrsLiveSource* live_source = _srs_sources->fetch(req); - if (!live_source) { + SrsSharedPtr live_source = _srs_sources->fetch(req); + if (!live_source.get()) { return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); } @@ -661,8 +661,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Enter chunked mode, because we didn't set the content-length. w->write_header(SRS_CONSTS_HTTP_OK); - SrsLiveSource* live_source = _srs_sources->fetch(req); - if (!live_source) { + SrsSharedPtr live_source = _srs_sources->fetch(req); + if (!live_source.get()) { return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); } @@ -1136,11 +1136,11 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle } } - SrsLiveSource* live_source = NULL; - if ((err = _srs_sources->fetch_or_create(r, server, &live_source)) != srs_success) { + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(r, server, live_source)) != srs_success) { return srs_error_wrap(err, "source create"); } - srs_assert(live_source != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(r->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost); diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 8dc3c9a34a..b25fdc9f1b 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -259,7 +259,7 @@ void SrsQueueRecvThread::on_stop() } SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid) + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr source, SrsContextId parent_cid) : trd(this, rtmp_sdk, tm, parent_cid) { rtmp = rtmp_sdk; diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 7682dab84d..5fa011db57 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -16,6 +16,7 @@ #include #include #include +#include class SrsRtmpServer; class SrsCommonMessage; @@ -146,7 +147,7 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler srs_error_t recv_error; SrsRtmpConn* _conn; // The params for conn callback. - SrsLiveSource* source_; + SrsSharedPtr source_; // The error timeout cond srs_cond_t error; // The merged context id. @@ -154,7 +155,7 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler SrsContextId ncid; public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid); + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr source, SrsContextId parent_cid); virtual ~SrsPublishRecvThread(); public: // Wait for error for some timeout. diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index d7fc861f61..1f2880633f 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -224,8 +224,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa // For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728 if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) { - SrsLiveSource* live_source = _srs_sources->fetch(ruc->req_); - if (live_source && !live_source->inactive()) { + SrsSharedPtr live_source = _srs_sources->fetch(ruc->req_); + if (live_source.get() && !live_source->inactive()) { return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str()); } } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b809ada9ef..52d142a4ca 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1202,8 +1202,8 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti source_->set_publish_stream(this); // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? - SrsLiveSource* live_source = _srs_sources->fetch(r); - if (live_source && !live_source->can_publish(false)) { + SrsSharedPtr live_source = _srs_sources->fetch(r); + if (live_source.get() && !live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); } @@ -1227,7 +1227,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); if (rtc_to_rtmp) { - if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), live_source)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index a457ea45ec..fd01a831ef 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -571,11 +571,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle() rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); // find a source to serve. - SrsLiveSource* live_source = NULL; - if ((err = _srs_sources->fetch_or_create(req, server, &live_source)) != srs_success) { + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server, live_source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } - srs_assert(live_source != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost); @@ -699,7 +699,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost) return err; } -srs_error_t SrsRtmpConn::playing(SrsLiveSource* source) +srs_error_t SrsRtmpConn::playing(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -786,7 +786,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source) return err; } -srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) +srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) { srs_error_t err = srs_success; @@ -923,7 +923,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons return err; } -srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) +srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -969,7 +969,7 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) return err; } -srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd) +srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr source, SrsPublishRecvThread* rtrd) { srs_error_t err = srs_success; @@ -1073,7 +1073,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre return err; } -srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) +srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) { srs_error_t err = srs_success; @@ -1141,7 +1141,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) return err; } -void SrsRtmpConn::release_publish(SrsLiveSource* source) +void SrsRtmpConn::release_publish(SrsSharedPtr source) { // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -1152,7 +1152,7 @@ void SrsRtmpConn::release_publish(SrsLiveSource* source) } } -srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) +srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg) { srs_error_t err = srs_success; @@ -1193,7 +1193,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommon return err; } -srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) +srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 9b86f4fab2..8c86627fec 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -16,6 +16,7 @@ #include #include #include +#include class SrsServer; class SrsRtmpServer; @@ -145,14 +146,14 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo // The stream(play/publish) service cycle, identify client first. virtual srs_error_t stream_service_cycle(); virtual srs_error_t check_vhost(bool try_default_vhost); - virtual srs_error_t playing(SrsLiveSource* source); - virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd); - virtual srs_error_t publishing(SrsLiveSource* source); - virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd); - virtual srs_error_t acquire_publish(SrsLiveSource* source); - virtual void release_publish(SrsLiveSource* source); - virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); - virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); + virtual srs_error_t playing(SrsSharedPtr source); + virtual srs_error_t do_playing(SrsSharedPtr source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd); + virtual srs_error_t publishing(SrsSharedPtr source); + virtual srs_error_t do_publishing(SrsSharedPtr source, SrsPublishRecvThread* trd); + virtual srs_error_t acquire_publish(SrsSharedPtr source); + virtual void release_publish(SrsSharedPtr source); + virtual srs_error_t handle_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg); + virtual srs_error_t process_publish_message(SrsSharedPtr& source, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg); virtual void set_sock_options(); private: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 6b4238d710..8e84d24aab 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -861,12 +861,13 @@ SrsOriginHub::~SrsOriginHub() #endif } -srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r) +srs_error_t SrsOriginHub::initialize(SrsSharedPtr s, SrsRequest* r) { srs_error_t err = srs_success; req_ = r; - source_ = s; + // Because source references to this object, so we should directly use the source ptr. + source_ = s.get(); if ((err = hls->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "hls initialize"); @@ -1759,7 +1760,7 @@ srs_error_t SrsLiveSourceManager::initialize() return setup_ticks(); } -srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps) +srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr& pps) { srs_error_t err = srs_success; @@ -1769,33 +1770,32 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH SrsLocker(lock); string stream_url = r->get_stream_url(); - std::map::iterator it = pool.find(stream_url); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); if (it != pool.end()) { - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); - *pps = source; + pps = source; return err; } - - SrsLiveSource* source = new SrsLiveSource(); + + SrsSharedPtr source = new SrsLiveSource(); srs_trace("new live source, stream_url=%s", stream_url.c_str()); - if ((err = source->initialize(r, h)) != srs_success) { - srs_freep(source); + if ((err = source->initialize(source, r, h)) != srs_success) { return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); } pool[stream_url] = source; - *pps = source; + pps = source; return err; } -SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r) +SrsSharedPtr SrsLiveSourceManager::fetch(SrsRequest* r) { // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 @@ -1803,21 +1803,21 @@ SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r) SrsLocker(lock); string stream_url = r->get_stream_url(); - std::map::iterator it = pool.find(stream_url); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); if (it == pool.end()) { - return NULL; + return SrsSharedPtr(NULL); } - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; return source; } void SrsLiveSourceManager::dispose() { - std::map::iterator it; + std::map< std::string, SrsSharedPtr >::iterator it; for (it = pool.begin(); it != pool.end(); ++it) { - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; source->dispose(); } return; @@ -1842,9 +1842,9 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut { srs_error_t err = srs_success; - std::map::iterator it; + std::map< std::string, SrsSharedPtr >::iterator it; for (it = pool.begin(); it != pool.end();) { - SrsLiveSource* source = it->second; + SrsSharedPtr& source = it->second; // Do cycle source to cleanup components, such as hls dispose. if ((err = source->cycle()) != srs_success) { @@ -1854,19 +1854,11 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut // See SrsSrtSource::on_consumer_destroy // TODO: FIXME: support source cleanup. // @see https://github.com/ossrs/srs/issues/713 -#if 0 +#if 1 // When source expired, remove it. if (source->stream_is_dead()) { - int cid = source->source_id(); - if (cid == -1 && source->pre_source_id() > 0) { - cid = source->pre_source_id(); - } - if (cid > 0) { - _srs_context->set_id(cid); - } - srs_trace("cleanup die source, total=%d", (int)pool.size()); - - srs_freep(source); + const SrsContextId& cid = source->source_id(); + srs_trace("cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size()); pool.erase(it++); } else { ++it; @@ -1881,11 +1873,6 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut void SrsLiveSourceManager::destroy() { - std::map::iterator it; - for (it = pool.begin(); it != pool.end(); ++it) { - SrsLiveSource* source = it->second; - srs_freep(source); - } pool.clear(); } @@ -1993,7 +1980,7 @@ bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout) return false; } -srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) +srs_error_t SrsLiveSource::initialize(SrsSharedPtr wrapper, SrsRequest* r, ISrsLiveSourceHandler* h) { srs_error_t err = srs_success; @@ -2011,14 +1998,14 @@ srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) // Setup the SPS/PPS parsing strategy. format_->try_annexb_first = _srs_config->try_annexb_first(r->vhost); - if ((err = hub->initialize(this, req)) != srs_success) { + if ((err = hub->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "hub"); } - if ((err = play_edge->initialize(this, req)) != srs_success) { + if ((err = play_edge->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "edge(play)"); } - if ((err = publish_edge->initialize(this, req)) != srs_success) { + if ((err = publish_edge->initialize(wrapper, req)) != srs_success) { return srs_error_wrap(err, "edge(publish)"); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 9c48c769cb..802c2cb202 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -19,6 +19,7 @@ #include #include #include +#include class SrsFormat; class SrsRtmpFormat; @@ -345,7 +346,7 @@ class SrsOriginHub : public ISrsReloadHandler public: // Initialize the hub with source and request. // @param r The request object, managed by source. - virtual srs_error_t initialize(SrsLiveSource* s, SrsRequest* r); + virtual srs_error_t initialize(SrsSharedPtr s, SrsRequest* r); // Dispose the hub, release utilities resource, // For example, delete all HLS pieces. virtual void dispose(); @@ -447,7 +448,7 @@ class SrsLiveSourceManager : public ISrsHourGlass { private: srs_mutex_t lock; - std::map pool; + std::map< std::string, SrsSharedPtr > pool; SrsHourGlass* timer_; public: SrsLiveSourceManager(); @@ -458,10 +459,10 @@ class SrsLiveSourceManager : public ISrsHourGlass // @param r the client request. // @param h the event handler for source. // @param pps the matched source, if success never be NULL. - virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps); + virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr& pps); public: // Get the exists source, NULL when not exists. - virtual SrsLiveSource* fetch(SrsRequest* r); + virtual SrsSharedPtr fetch(SrsRequest* r); public: // dispose and cycle all sources. virtual void dispose(); @@ -543,7 +544,7 @@ class SrsLiveSource : public ISrsReloadHandler bool publisher_is_idle_for(srs_utime_t timeout); public: // Initialize the hls with handlers. - virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h); + virtual srs_error_t initialize(SrsSharedPtr wrapper, SrsRequest* r, ISrsLiveSourceHandler* h); // Bridge to other source, forward packets to it. void set_bridge(ISrsStreamBridge* v); // Interface ISrsReloadHandler diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 73a4915dd3..47d3428953 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -368,16 +368,16 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() } // Check rtmp stream is busy. - SrsLiveSource* live_source = _srs_sources->fetch(req_); - if (live_source && !live_source->can_publish(false)) { + SrsSharedPtr live_source = _srs_sources->fetch(req_); + if (live_source.get() && !live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); } - if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), live_source)) != srs_success) { return srs_error_wrap(err, "create source"); } - srs_assert(live_source != NULL); + srs_assert(live_source.get() != NULL); bool enabled_cache = _srs_config->get_gop_cache(req_->vhost); int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost); diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp index 353799210e..d043384159 100644 --- a/trunk/src/app/srs_app_stream_bridge.cpp +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -25,7 +25,7 @@ ISrsStreamBridge::~ISrsStreamBridge() { } -SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsLiveSource* source) +SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsSharedPtr source) { source_ = source; } diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp index 2abdb45413..e78aad96b8 100644 --- a/trunk/src/app/srs_app_stream_bridge.hpp +++ b/trunk/src/app/srs_app_stream_bridge.hpp @@ -42,9 +42,9 @@ class ISrsStreamBridge class SrsFrameToRtmpBridge : public ISrsStreamBridge { private: - SrsLiveSource* source_; + SrsSharedPtr source_; public: - SrsFrameToRtmpBridge(SrsLiveSource* source); + SrsFrameToRtmpBridge(SrsSharedPtr source); virtual ~SrsFrameToRtmpBridge(); public: srs_error_t initialize(SrsRequest* r); diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 2344e911ef..155621be0b 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -139,6 +139,17 @@ VOID TEST(CoreLogger, SharedPtrReset) } } +SrsSharedPtr mock_create_from_ptr(SrsSharedPtr p) { + return p; +} + +VOID TEST(CoreLogger, SharedPtrContructor) +{ + int* p = new int(100); + SrsSharedPtr q = mock_create_from_ptr(p); + EXPECT_EQ(100, *q); +} + VOID TEST(CoreLogger, SharedPtrObject) { SrsSharedPtr p(new MyNormalObject(100));