diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 3ad0c32175..f58ab269f1 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,7 +7,6 @@ The changelog for SRS. ## SRS 6.0 Changelog -* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) * v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044) * v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 90d818ece0..7652be15b7 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -413,6 +413,28 @@ void SrsResourceManager::dispose(ISrsResource* c) } } +SrsLazySweepGc::SrsLazySweepGc() +{ +} + +SrsLazySweepGc::~SrsLazySweepGc() +{ +} + +srs_error_t SrsLazySweepGc::start() +{ + srs_error_t err = srs_success; + return err; +} + +void SrsLazySweepGc::remove(SrsLazyObject* c) +{ + // TODO: FIXME: MUST lazy sweep. + srs_freep(c); +} + +ISrsLazyGc* _srs_gc = NULL; + ISrsExpire::ISrsExpire() { } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index c0e965525f..b5aeb48da0 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -20,7 +20,6 @@ #include #include #include -#include class SrsWallClock; class SrsBuffer; @@ -126,66 +125,98 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag void dispose(ISrsResource* c); }; -// This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this -// smart pointer resource, such as by implementing delayed release. -// -// It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage -// is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted -// to one another. +// A simple lazy-sweep GC, just wait for a long time to delete the disposable resources. +class SrsLazySweepGc : public ISrsLazyGc +{ +public: + SrsLazySweepGc(); + virtual ~SrsLazySweepGc(); +public: + virtual srs_error_t start(); + virtual void remove(SrsLazyObject* c); +}; + +extern ISrsLazyGc* _srs_gc; + +// A wrapper template for lazy-sweep resource. +// See https://github.com/ossrs/srs/issues/3176#lazy-sweep // -// Note that we don't need to implement the move constructor and move assignment operator, because we directly -// use SrsSharedPtr as instance member, so we can only copy it. +// Usage for resource which manages itself in coroutine cycle, see SrsLazyGbSession: +// class Resource { +// private: +// SrsLazyObjectWrapper* wrapper_; +// private: +// friend class SrsLazyObjectWrapper; +// Resource(SrsLazyObjectWrapper* wrapper) { wrapper_ = wrapper; } +// public: +// srs_error_t Resource::cycle() { +// srs_error_t err = do_cycle(); +// _srs_gb_manager->remove(wrapper_); +// return err; +// } +// }; +// SrsLazyObjectWrapper* obj = new SrsLazyObjectWrapper*(); +// _srs_gb_manager->add(obj); // Add wrapper to resource manager. +// Start a coroutine to do obj->resource()->cycle(). // -// Usage: -// SrsSharedResource* ptr = new SrsSharedResource(new MyClass()); -// (*ptr)->do_something(); +// Usage for resource managed by other object: +// class Resource { +// private: +// friend class SrsLazyObjectWrapper; +// Resource(SrsLazyObjectWrapper* /*wrapper*/) { +// } +// }; +// class Manager { +// private: +// SrsLazyObjectWrapper* wrapper_; +// public: +// Manager() { wrapper_ = new SrsLazyObjectWrapper(); } +// ~Manager() { srs_freep(wrapper_); } +// }; +// Manager* manager = new Manager(); +// srs_freep(manager); // -// ISrsResourceManager* manager = ...; -// manager->remove(ptr); +// Note that under-layer resource are destroyed by _srs_gc, which is literally equal to srs_freep. However, the root +// wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other +// copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal +// object, so if you added to manager then you should use manager to remove it, and you can also directly delete it. template -class SrsSharedResource : public ISrsResource +class SrsLazyObjectWrapper : public ISrsResource { private: - SrsSharedPtr ptr_; + T* resource_; public: - SrsSharedResource(T* ptr) : ptr_(ptr) { - } - SrsSharedResource(const SrsSharedResource& cp) : ptr_(cp.ptr_) { + SrsLazyObjectWrapper() { + init(new T(this)); } - virtual ~SrsSharedResource() { - } -public: - // Get the object. - T* get() { - return ptr_.get(); - } - // Overload the -> operator. - T* operator->() { - return ptr_.operator->(); - } - // The assign operator. - SrsSharedResource& operator=(const SrsSharedResource& cp) { - if (this != &cp) { - ptr_ = cp.ptr_; + virtual ~SrsLazyObjectWrapper() { + resource_->gc_dispose(); + if (resource_->gc_ref() == 0) { + _srs_gc->remove(resource_); } - return *this; } private: - // Overload the * operator. - T& operator*() { - return ptr_.operator*(); + SrsLazyObjectWrapper(T* resource) { + init(resource); + } + void init(T* resource) { + resource_ = resource; + resource_->gc_use(); + } +public: + SrsLazyObjectWrapper* copy() { + return new SrsLazyObjectWrapper(resource_); } - // Overload the bool operator. - operator bool() const { - return ptr_.operator bool(); + T* resource() { + return resource_; } // Interface ISrsResource public: virtual const SrsContextId& get_id() { - return ptr_->get_id(); + return resource_->get_id(); } virtual std::string desc() { - return ptr_->desc(); + return resource_->desc(); } }; diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 98bfb0d2b9..99c9dbf0cd 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -70,12 +70,11 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state) return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); } -SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMediaTcpConn()) +SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root) { - wrapper_ = NULL; - owner_coroutine_ = NULL; - owner_cid_ = NULL; - + wrapper_root_ = wrapper_root; + sip_ = new SrsLazyObjectWrapper(); + media_ = new SrsLazyObjectWrapper(); muxer_ = new SrsGbMuxer(this); state_ = SrsGbSessionStateInit; @@ -103,43 +102,41 @@ SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMedi cid_ = _srs_context->generate_id(); _srs_context->set_id(cid_); // Also change current coroutine cid as session's. + trd_ = new SrsSTCoroutine("GBS", this, cid_); } -SrsGbSession::~SrsGbSession() +SrsLazyGbSession::~SrsLazyGbSession() { + srs_freep(trd_); + srs_freep(sip_); + srs_freep(media_); srs_freep(muxer_); srs_freep(ppp_); } -void SrsGbSession::setup(SrsConfDirective* conf) +srs_error_t SrsLazyGbSession::initialize(SrsConfDirective* conf) { + srs_error_t err = srs_success; + pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf); if (candidate_ == "*") { pip_ = srs_get_public_internet_address(true); } std::string output = _srs_config->get_stream_caster_output(conf); - muxer_->setup(output); + if ((err = muxer_->initialize(output)) != srs_success) { + return srs_error_wrap(err, "muxer"); + } connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf); reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf); srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_), srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str()); -} -void SrsGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) -{ - wrapper_ = wrapper; - owner_coroutine_ = owner_coroutine; - owner_cid_ = owner_cid; -} - -void SrsGbSession::on_executor_done(ISrsInterruptable* executor) -{ - owner_coroutine_ = NULL; + return err; } -void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) +void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) { // Got a new context, that is new media transport. if (media_id_ != ctx->media_id_) { @@ -198,47 +195,57 @@ void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::v } } -void SrsGbSession::on_sip_transport(SrsSharedResource sip) +void SrsLazyGbSession::on_sip_transport(SrsLazyObjectWrapper* sip) { - sip_ = sip; + srs_freep(sip_); + sip_ = sip->copy(); + // Change id of SIP and all its child coroutines. - sip_->set_cid(cid_); + sip_->resource()->set_cid(cid_); } -SrsSharedResource SrsGbSession::sip_transport() +SrsLazyObjectWrapper* SrsLazyGbSession::sip_transport() { return sip_; } -void SrsGbSession::on_media_transport(SrsSharedResource media) +void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper* media) { - media_ = media; + srs_freep(media_); + media_ = media->copy(); // Change id of SIP and all its child coroutines. - media_->set_cid(cid_); + media_->resource()->set_cid(cid_); } -std::string SrsGbSession::pip() +std::string SrsLazyGbSession::pip() { return pip_; } -srs_error_t SrsGbSession::cycle() +srs_error_t SrsLazyGbSession::start() { srs_error_t err = srs_success; - // Update all context id to cid of session. - _srs_context->set_id(cid_); - owner_cid_->set_cid(cid_); - sip_->set_cid(cid_); - media_->set_cid(cid_); + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } - // Drive the session cycle. - err = do_cycle(); + return err; +} + +srs_error_t SrsLazyGbSession::cycle() +{ + srs_error_t err = do_cycle(); // Interrupt the SIP and media transport when session terminated. - sip_->interrupt(); - media_->interrupt(); + sip_->resource()->interrupt(); + media_->resource()->interrupt(); + + // Note that we added wrapper to manager, so we must free the wrapper, not this connection. + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. + _srs_gb_manager->remove(wrapper); // success. if (err == srs_success) { @@ -267,13 +274,12 @@ srs_error_t SrsGbSession::cycle() return srs_success; } -srs_error_t SrsGbSession::do_cycle() +srs_error_t SrsLazyGbSession::do_cycle() { srs_error_t err = srs_success; while (true) { - if (!owner_coroutine_) return err; - if ((err = owner_coroutine_->pull()) != srs_success) { + if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -281,7 +287,7 @@ srs_error_t SrsGbSession::do_cycle() srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL); // Client send bye, we should dispose the session. - if (sip_->is_bye()) { + if (sip_->resource()->is_bye()) { return err; } @@ -304,33 +310,35 @@ srs_error_t SrsGbSession::do_cycle() return err; } -srs_error_t SrsGbSession::drive_state() +srs_error_t SrsLazyGbSession::drive_state() { srs_error_t err = srs_success; #define SRS_GB_CHANGE_STATE_TO(state) { \ SrsGbSessionState ostate = set_state(state); \ - srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \ + srs_trace("Session: Change device=%s, state=%s", sip_->resource()->device_id().c_str(), \ srs_gb_state(ostate, state_).c_str()); \ } if (state_ == SrsGbSessionStateInit) { // Set to connecting, whatever media is connected or not, because the connecting state will handle it if media // is connected, so we don't need to handle it here. - if (sip_->is_registered()) { + if (sip_->resource()->is_registered()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting); connecting_starttime_ = srs_update_system_time(); } // Invite if media is not connected. - if (sip_->is_registered() && !media_->is_connected()) { + if (sip_->resource()->is_registered() && !media_->resource()->is_connected()) { uint32_t ssrc = 0; - if ((err = sip_->invite_request(&ssrc)) != srs_success) { + if ((err = sip_->resource()->invite_request(&ssrc)) != srs_success) { return srs_error_wrap(err, "invite"); } // Now, we're able to query session by ssrc, for media packets. - _srs_gb_manager->add_with_fast_id(ssrc, wrapper_); + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. + _srs_gb_manager->add_with_fast_id(ssrc, wrapper); } } @@ -341,32 +349,32 @@ srs_error_t SrsGbSession::drive_state() } srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); - sip_->reset_to_register(); + srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); + sip_->resource()->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } - if (sip_->is_stable() && media_->is_connected()) { + if (sip_->resource()->is_stable() && media_->resource()->is_connected()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished); } } if (state_ == SrsGbSessionStateEstablished) { - if (sip_->is_bye()) { + if (sip_->resource()->is_bye()) { srs_trace("Session: Dispose for client bye"); return err; } // When media disconnected, we wait for a while then reinvite. - if (!media_->is_connected()) { + if (!media_->resource()->is_connected()) { if (!reinviting_starttime_) { reinviting_starttime_ = srs_update_system_time(); } if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) { reinviting_starttime_ = 0; srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); - sip_->reset_to_register(); + srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); + sip_->resource()->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } } @@ -375,19 +383,19 @@ srs_error_t SrsGbSession::drive_state() return err; } -SrsGbSessionState SrsGbSession::set_state(SrsGbSessionState v) +SrsGbSessionState SrsLazyGbSession::set_state(SrsGbSessionState v) { SrsGbSessionState state = state_; state_ = v; return state; } -const SrsContextId& SrsGbSession::get_id() +const SrsContextId& SrsLazyGbSession::get_id() { return cid_; } -std::string SrsGbSession::desc() +std::string SrsLazyGbSession::desc() { return "GBS"; } @@ -455,33 +463,27 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf // Handle TCP connections. if (listener == sip_listener_) { - SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn(); - raw_conn->setup(conf_, sip_listener_, media_listener_, stfd); - - SrsSharedResource* conn = new SrsSharedResource(raw_conn); - _srs_gb_manager->add(conn, NULL); + SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); + SrsLazyGbSipTcpConn* resource = dynamic_cast(conn->resource()); + resource->setup(conf_, sip_listener_, media_listener_, stfd); - SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); - raw_conn->setup_owner(conn, executor, executor); - - if ((err = executor->start()) != srs_success) { - srs_freep(executor); + if ((err = resource->start()) != srs_success) { + srs_freep(conn); return srs_error_wrap(err, "gb sip"); } - } else if (listener == media_listener_) { - SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn(); - raw_conn->setup(stfd); - SrsSharedResource* conn = new SrsSharedResource(raw_conn); _srs_gb_manager->add(conn, NULL); + } else if (listener == media_listener_) { + SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); + SrsLazyGbMediaTcpConn* resource = dynamic_cast(conn->resource()); + resource->setup(stfd); - SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); - raw_conn->setup_owner(conn, executor, executor); - - if ((err = executor->start()) != srs_success) { - srs_freep(executor); + if ((err = resource->start()) != srs_success) { + srs_freep(conn); return srs_error_wrap(err, "gb media"); } + + _srs_gb_manager->add(conn, NULL); } else { srs_warn("GB: Ignore TCP client"); srs_close_stfd(stfd); @@ -490,13 +492,9 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf return err; } -SrsGbSipTcpConn::SrsGbSipTcpConn() +SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root) { - wrapper_ = NULL; - owner_coroutine_ = NULL; - owner_cid_ = NULL; - cid_ = _srs_context->get_id(); - + wrapper_root_ = wrapper_root; session_ = NULL; state_ = SrsGbSipStateInit; register_ = new SrsSipMessage(); @@ -509,62 +507,54 @@ SrsGbSipTcpConn::SrsGbSipTcpConn() conn_ = NULL; receiver_ = NULL; sender_ = NULL; + + trd_ = new SrsSTCoroutine("sip", this); } -SrsGbSipTcpConn::~SrsGbSipTcpConn() +SrsLazyGbSipTcpConn::~SrsLazyGbSipTcpConn() { + srs_freep(trd_); srs_freep(receiver_); srs_freep(sender_); srs_freep(conn_); + srs_freep(session_); srs_freep(register_); srs_freep(invite_ok_); srs_freep(conf_); } -void SrsGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd) +void SrsLazyGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd) { srs_freep(conf_); conf_ = conf->copy(); + session_ = NULL; sip_listener_ = sip; media_listener_ = media; conn_ = new SrsTcpConnection(stfd); - receiver_ = new SrsGbSipTcpReceiver(this, conn_); - sender_ = new SrsGbSipTcpSender(conn_); -} - -void SrsGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) -{ - wrapper_ = wrapper; - owner_coroutine_ = owner_coroutine; - owner_cid_ = owner_cid; + receiver_ = new SrsLazyGbSipTcpReceiver(this, conn_); + sender_ = new SrsLazyGbSipTcpSender(conn_); } -void SrsGbSipTcpConn::on_executor_done(ISrsInterruptable* executor) -{ - owner_coroutine_ = NULL; -} - -std::string SrsGbSipTcpConn::device_id() +std::string SrsLazyGbSipTcpConn::device_id() { return register_->device_id(); } -void SrsGbSipTcpConn::set_cid(const SrsContextId& cid) +void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) { - if (owner_cid_) owner_cid_->set_cid(cid); + trd_->set_cid(cid); receiver_->set_cid(cid); sender_->set_cid(cid); - cid_ = cid; } -void SrsGbSipTcpConn::query_ports(int* sip, int* media) +void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media) { if (sip) *sip = sip_listener_->port(); if (media) *media = media_listener_->port(); } -srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg) +srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -613,7 +603,7 @@ srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg) return err; } -void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) +void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) { // Drive state machine when enqueue message. drive_state(msg); @@ -622,7 +612,7 @@ void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) sender_->enqueue(msg); } -void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg) +void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -679,7 +669,7 @@ void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg) } } -void SrsGbSipTcpConn::register_response(SrsSipMessage* msg) +void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -696,7 +686,7 @@ void SrsGbSipTcpConn::register_response(SrsSipMessage* msg) enqueue_sip_message(res); } -void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) +void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) { SrsSipMessage* res = new SrsSipMessage(); @@ -711,9 +701,9 @@ void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) enqueue_sip_message(res); } -void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg) +void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) { - string pip = session_->pip(); // Parse from CANDIDATE + string pip = session_->resource()->pip(); // Parse from CANDIDATE int sip_port; query_ports(&sip_port, NULL); string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str()); string branch = srs_random_str(6); @@ -732,7 +722,7 @@ void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg) enqueue_sip_message(req); } -void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg) +void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -747,7 +737,7 @@ void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg) enqueue_sip_message(res); } -srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc) +srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) { srs_error_t err = srs_success; @@ -775,7 +765,7 @@ srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc) if (pssrc) *pssrc = ssrc_v_; } - string pip = session_->pip(); // Parse from CANDIDATE + string pip = session_->resource()->pip(); // Parse from CANDIDATE int sip_port, media_port; query_ports(&sip_port, &media_port); string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str()); string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str()); @@ -844,59 +834,63 @@ srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc) return err; } -void SrsGbSipTcpConn::interrupt() +void SrsLazyGbSipTcpConn::interrupt() { receiver_->interrupt(); sender_->interrupt(); - if (owner_coroutine_) owner_coroutine_->interrupt(); + trd_->interrupt(); } -SrsGbSipState SrsGbSipTcpConn::state() +SrsGbSipState SrsLazyGbSipTcpConn::state() { return state_; } -void SrsGbSipTcpConn::reset_to_register() +void SrsLazyGbSipTcpConn::reset_to_register() { state_ = SrsGbSipStateRegistered; } -bool SrsGbSipTcpConn::is_registered() +bool SrsLazyGbSipTcpConn::is_registered() { return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable; } -bool SrsGbSipTcpConn::is_stable() +bool SrsLazyGbSipTcpConn::is_stable() { return state_ == SrsGbSipStateStable; } -bool SrsGbSipTcpConn::is_bye() +bool SrsLazyGbSipTcpConn::is_bye() { return state_ == SrsGbSipStateBye; } -SrsGbSipState SrsGbSipTcpConn::set_state(SrsGbSipState v) +SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v) { SrsGbSipState state = state_; state_ = v; return state; } -const SrsContextId& SrsGbSipTcpConn::get_id() +const SrsContextId& SrsLazyGbSipTcpConn::get_id() { - return cid_; + return trd_->cid(); } -std::string SrsGbSipTcpConn::desc() +std::string SrsLazyGbSipTcpConn::desc() { return "GB-SIP-TCP"; } -srs_error_t SrsGbSipTcpConn::cycle() +srs_error_t SrsLazyGbSipTcpConn::start() { srs_error_t err = srs_success; + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "sip"); + } + if ((err = receiver_->start()) != srs_success) { return srs_error_wrap(err, "receiver"); } @@ -905,13 +899,22 @@ srs_error_t SrsGbSipTcpConn::cycle() return srs_error_wrap(err, "sender"); } - // Wait for the SIP connection to be terminated. - err = do_cycle(); + return err; +} + +srs_error_t SrsLazyGbSipTcpConn::cycle() +{ + srs_error_t err = do_cycle(); // Interrupt the receiver and sender coroutine. receiver_->interrupt(); sender_->interrupt(); + // Note that we added wrapper to manager, so we must free the wrapper, not this connection. + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. + _srs_gb_manager->remove(wrapper); + // success. if (err == srs_success) { srs_trace("client finished."); @@ -939,23 +942,23 @@ srs_error_t SrsGbSipTcpConn::cycle() return srs_success; } -srs_error_t SrsGbSipTcpConn::do_cycle() +srs_error_t SrsLazyGbSipTcpConn::do_cycle() { srs_error_t err = srs_success; while (true) { - if (!owner_coroutine_) return err; - if ((err = owner_coroutine_->pull()) != srs_success) { + if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } + // TODO: Handle other messages. srs_usleep(SRS_UTIME_NO_TIMEOUT); } return err; } -srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession) +srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession) { srs_error_t err = srs_success; @@ -965,29 +968,32 @@ srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** pse // Only create session for REGISTER request. if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err; + // The lazy-sweep wrapper for this resource. + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine of receiver. + // Find exists session for register, might be created by another object and still alive. - SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); - SrsGbSession* raw_session = session ? (*session).get() : NULL; + SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); if (!session) { // Create new GB session. - raw_session = new SrsGbSession(); - raw_session->setup(conf_); - - session = new SrsSharedResource(raw_session); - _srs_gb_manager->add_with_id(device, session); + session = new SrsLazyObjectWrapper(); - SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session); - raw_session->setup_owner(session, executor, executor); + if ((err = session->resource()->initialize(conf_)) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "initialize"); + } - if ((err = executor->start()) != srs_success) { - srs_freep(executor); - return srs_error_wrap(err, "gb session"); + if ((err = session->resource()->start()) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "start"); } + + _srs_gb_manager->add_with_id(device, session); } // Try to load state from previous SIP connection. - SrsSharedResource pre = raw_session->sip_transport(); - if (pre.get() && pre.get() != this) { + SrsLazyGbSipTcpConn* pre = dynamic_cast(session->resource()->sip_transport()->resource()); + if (pre) { state_ = pre->state_; ssrc_str_ = pre->ssrc_str_; ssrc_v_ = pre->ssrc_v_; @@ -995,36 +1001,36 @@ srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** pse srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy(); } - // Notice session to use current SIP connection. - raw_session->on_sip_transport(*wrapper_); - *psession = raw_session; + // Notice SIP session to use current SIP connection. + session->resource()->on_sip_transport(wrapper); + *psession = session->copy(); return err; } -SrsGbSipTcpReceiver::SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn) +SrsLazyGbSipTcpReceiver::SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn) { sip_ = sip; conn_ = conn; trd_ = new SrsSTCoroutine("sip-receiver", this); } -SrsGbSipTcpReceiver::~SrsGbSipTcpReceiver() +SrsLazyGbSipTcpReceiver::~SrsLazyGbSipTcpReceiver() { srs_freep(trd_); } -void SrsGbSipTcpReceiver::interrupt() +void SrsLazyGbSipTcpReceiver::interrupt() { trd_->interrupt(); } -void SrsGbSipTcpReceiver::set_cid(const SrsContextId& cid) +void SrsLazyGbSipTcpReceiver::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsGbSipTcpReceiver::start() +srs_error_t SrsLazyGbSipTcpReceiver::start() { srs_error_t err = srs_success; @@ -1035,7 +1041,7 @@ srs_error_t SrsGbSipTcpReceiver::start() return err; } -srs_error_t SrsGbSipTcpReceiver::cycle() +srs_error_t SrsLazyGbSipTcpReceiver::cycle() { srs_error_t err = do_cycle(); @@ -1047,7 +1053,7 @@ srs_error_t SrsGbSipTcpReceiver::cycle() return err; } -srs_error_t SrsGbSipTcpReceiver::do_cycle() +srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() { srs_error_t err = srs_success; @@ -1086,14 +1092,14 @@ srs_error_t SrsGbSipTcpReceiver::do_cycle() return err; } -SrsGbSipTcpSender::SrsGbSipTcpSender(SrsTcpConnection* conn) +SrsLazyGbSipTcpSender::SrsLazyGbSipTcpSender(SrsTcpConnection* conn) { conn_ = conn; wait_ = srs_cond_new(); trd_ = new SrsSTCoroutine("sip-sender", this); } -SrsGbSipTcpSender::~SrsGbSipTcpSender() +SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() { srs_freep(trd_); srs_cond_destroy(wait_); @@ -1104,23 +1110,23 @@ SrsGbSipTcpSender::~SrsGbSipTcpSender() } } -void SrsGbSipTcpSender::enqueue(SrsSipMessage* msg) +void SrsLazyGbSipTcpSender::enqueue(SrsSipMessage* msg) { msgs_.push_back(msg); srs_cond_signal(wait_); } -void SrsGbSipTcpSender::interrupt() +void SrsLazyGbSipTcpSender::interrupt() { trd_->interrupt(); } -void SrsGbSipTcpSender::set_cid(const SrsContextId& cid) +void SrsLazyGbSipTcpSender::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsGbSipTcpSender::start() +srs_error_t SrsLazyGbSipTcpSender::start() { srs_error_t err = srs_success; @@ -1131,7 +1137,7 @@ srs_error_t SrsGbSipTcpSender::start() return err; } -srs_error_t SrsGbSipTcpSender::cycle() +srs_error_t SrsLazyGbSipTcpSender::cycle() { srs_error_t err = do_cycle(); @@ -1143,7 +1149,7 @@ srs_error_t SrsGbSipTcpSender::cycle() return err; } -srs_error_t SrsGbSipTcpSender::do_cycle() +srs_error_t SrsLazyGbSipTcpSender::do_cycle() { srs_error_t err = srs_success; @@ -1213,74 +1219,71 @@ ISrsPsPackHandler::~ISrsPsPackHandler() { } -SrsGbMediaTcpConn::SrsGbMediaTcpConn() +SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root) { + wrapper_root_ = wrapper_root; pack_ = new SrsPackContext(this); + trd_ = new SrsSTCoroutine("media", this); buffer_ = new uint8_t[65535]; conn_ = NULL; - wrapper_ = NULL; - owner_coroutine_ = NULL; - owner_cid_ = NULL; - cid_ = _srs_context->get_id(); - session_ = NULL; connected_ = false; nn_rtcp_ = 0; } -SrsGbMediaTcpConn::~SrsGbMediaTcpConn() +SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn() { + srs_freep(trd_); srs_freep(conn_); srs_freepa(buffer_); srs_freep(pack_); + srs_freep(session_); } -void SrsGbMediaTcpConn::setup(srs_netfd_t stfd) +void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd) { srs_freep(conn_); conn_ = new SrsTcpConnection(stfd); } -void SrsGbMediaTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +bool SrsLazyGbMediaTcpConn::is_connected() { - wrapper_ = wrapper; - owner_coroutine_ = owner_coroutine; - owner_cid_ = owner_cid; + return connected_; } -void SrsGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor) +void SrsLazyGbMediaTcpConn::interrupt() { - owner_coroutine_ = NULL; + trd_->interrupt(); } -bool SrsGbMediaTcpConn::is_connected() +void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid) { - return connected_; + trd_->set_cid(cid); } -void SrsGbMediaTcpConn::interrupt() +const SrsContextId& SrsLazyGbMediaTcpConn::get_id() { - if (owner_coroutine_) owner_coroutine_->interrupt(); + return _srs_context->get_id(); } -void SrsGbMediaTcpConn::set_cid(const SrsContextId& cid) +std::string SrsLazyGbMediaTcpConn::desc() { - if (owner_cid_) owner_cid_->set_cid(cid); - cid_ = cid; + return "GB-Media-TCP"; } -const SrsContextId& SrsGbMediaTcpConn::get_id() +srs_error_t SrsLazyGbMediaTcpConn::start() { - return cid_; -} + srs_error_t err = srs_success; -std::string SrsGbMediaTcpConn::desc() -{ - return "GB-Media-TCP"; + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; } -srs_error_t SrsGbMediaTcpConn::cycle() +srs_error_t SrsLazyGbMediaTcpConn::cycle() { srs_error_t err = do_cycle(); @@ -1292,6 +1295,11 @@ srs_error_t SrsGbMediaTcpConn::cycle() connected_ = false; srs_trace("PS: Media disconnect, code=%d", srs_error_code(err)); + // Note that we added wrapper to manager, so we must free the wrapper, not this connection. + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. + _srs_gb_manager->remove(wrapper); + // success. if (err == srs_success) { srs_trace("client finished."); @@ -1319,7 +1327,7 @@ srs_error_t SrsGbMediaTcpConn::cycle() return srs_success; } -srs_error_t SrsGbMediaTcpConn::do_cycle() +srs_error_t SrsLazyGbMediaTcpConn::do_cycle() { srs_error_t err = srs_success; @@ -1333,8 +1341,7 @@ srs_error_t SrsGbMediaTcpConn::do_cycle() uint32_t reserved = 0; for (;;) { - if (!owner_coroutine_) return err; - if ((err = owner_coroutine_->pull()) != srs_success) { + if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -1419,7 +1426,7 @@ srs_error_t SrsGbMediaTcpConn::do_cycle() return err; } -srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) +srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) { srs_error_t err = srs_success; @@ -1430,7 +1437,7 @@ srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vectoron_ps_pack(pack_, ps, msgs); + session_->resource()->on_ps_pack(pack_, ps, msgs); //for (vector::const_iterator it = msgs.begin(); it != msgs.end(); ++it) { // SrsTsMessage* msg = *it; @@ -1443,22 +1450,23 @@ srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector** psession) { srs_error_t err = srs_success; if (!ssrc) return err; + // The lazy-sweep wrapper for this resource. + SrsLazyObjectWrapper* wrapper = wrapper_root_; + srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. + // Find exists session for register, might be created by another object and still alive. - SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); + SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); if (!session) return err; - SrsGbSession* raw_session = (*session).get(); - srs_assert(raw_session); - - // Notice session to use current media connection. - raw_session->on_media_transport(*wrapper_); - *psession = raw_session; + _srs_gb_manager->add_with_fast_id(ssrc, session); + session->resource()->on_media_transport(wrapper); + *psession = session->copy(); return err; } @@ -1537,7 +1545,7 @@ SrsSharedPtrMessage* SrsMpegpsQueue::dequeue() return NULL; } -SrsGbMuxer::SrsGbMuxer(SrsGbSession* session) +SrsGbMuxer::SrsGbMuxer(SrsLazyGbSession* session) { sdk_ = NULL; session_ = session; @@ -1572,9 +1580,13 @@ SrsGbMuxer::~SrsGbMuxer() srs_freep(pprint_); } -void SrsGbMuxer::setup(std::string output) +srs_error_t SrsGbMuxer::initialize(std::string output) { + srs_error_t err = srs_success; + output_ = output; + + return err; } srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg) @@ -2083,7 +2095,7 @@ srs_error_t SrsGbMuxer::connect() // Cleanup the data before connect again. close(); - string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->device_id()); + string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id()); srs_trace("Muxer: Convert GB to RTMP %s", url.c_str()); srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index e44b618c6d..783842505f 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -26,11 +26,11 @@ class SrsCoroutine; class SrsPackContext; class SrsBuffer; class SrsSipMessage; -class SrsGbSession; -class SrsGbSipTcpConn; -class SrsGbMediaTcpConn; -class SrsGbSipTcpReceiver; -class SrsGbSipTcpSender; +class SrsLazyGbSession; +class SrsLazyGbSipTcpConn; +class SrsLazyGbMediaTcpConn; +class SrsLazyGbSipTcpReceiver; +class SrsLazyGbSipTcpSender; class SrsAlonePithyPrint; class SrsGbMuxer; class SrsSimpleRtmpClient; @@ -51,7 +51,7 @@ class SrsRawAacStream; // established: // init: media is not connected. // dispose session: sip is bye. -// Please see SrsGbSession::drive_state for detail. +// Please see SrsLazyGbSession::drive_state for detail. enum SrsGbSessionState { SrsGbSessionStateInit = 0, @@ -76,7 +76,7 @@ std::string srs_gb_session_state(SrsGbSessionState state); // to bye: Got bye SIP message from device. // re-inviting: // to inviting: Got bye OK response from deivce. -// Please see SrsGbSipTcpConn::drive_state for detail. +// Please see SrsLazyGbSipTcpConn::drive_state for detail. enum SrsGbSipState { SrsGbSipStateInit = 0, @@ -90,23 +90,16 @@ enum SrsGbSipState std::string srs_gb_sip_state(SrsGbSipState state); // The main logic object for GB, the session. -// Each session contains a SIP object and a media object, that are managed by session. This means session always -// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word, -// SIP and media objects use directly pointer to session, while session use shared ptr. -class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler +class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler { private: + SrsCoroutine* trd_; SrsContextId cid_; -private: - // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; - // The owner coroutine, allow user to interrupt the loop. - ISrsInterruptable* owner_coroutine_; - ISrsContextIdSetter* owner_cid_; private: SrsGbSessionState state_; - SrsSharedResource sip_; - SrsSharedResource media_; + SrsLazyObjectWrapper* wrapper_root_; + SrsLazyObjectWrapper* sip_; + SrsLazyObjectWrapper* media_; SrsGbMuxer* muxer_; private: // The candidate for SDP in configuration. @@ -139,27 +132,26 @@ class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public IS uint64_t media_recovered_; uint64_t media_msgs_dropped_; uint64_t media_reserved_; +private: + friend class SrsLazyObjectWrapper; + SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root); public: - SrsGbSession(); - virtual ~SrsGbSession(); + virtual ~SrsLazyGbSession(); public: // Initialize the GB session. - void setup(SrsConfDirective* conf); - // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); -// Interface ISrsExecutorHandler -public: - virtual void on_executor_done(ISrsInterruptable* executor); -public: + srs_error_t initialize(SrsConfDirective* conf); // When got a pack of messages. void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs); // When got available SIP transport. - void on_sip_transport(SrsSharedResource sip); - SrsSharedResource sip_transport(); + void on_sip_transport(SrsLazyObjectWrapper* sip); + SrsLazyObjectWrapper* sip_transport(); // When got available media transport. - void on_media_transport(SrsSharedResource media); + void on_media_transport(SrsLazyObjectWrapper* media); // Get the candidate for SDP generation, the public IP address for device to connect to. std::string pip(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -194,12 +186,12 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler }; // A GB28181 TCP SIP connection. -class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler +class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler { private: SrsGbSipState state_; - // The owner session object, note that we use the raw pointer and should never free it. - SrsGbSession* session_; + SrsLazyObjectWrapper* wrapper_root_; + SrsLazyObjectWrapper* session_; SrsSipMessage* register_; SrsSipMessage* invite_ok_; private: @@ -209,29 +201,19 @@ class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public SrsConfDirective* conf_; SrsTcpListener* sip_listener_; SrsTcpListener* media_listener_; -private: - // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; - // The owner coroutine, allow user to interrupt the loop. - ISrsInterruptable* owner_coroutine_; - ISrsContextIdSetter* owner_cid_; - SrsContextId cid_; private: SrsTcpConnection* conn_; - SrsGbSipTcpReceiver* receiver_; - SrsGbSipTcpSender* sender_; + SrsLazyGbSipTcpReceiver* receiver_; + SrsLazyGbSipTcpSender* sender_; + SrsCoroutine* trd_; +private: + friend class SrsLazyObjectWrapper; + SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root); public: - SrsGbSipTcpConn(); - virtual ~SrsGbSipTcpConn(); + virtual ~SrsLazyGbSipTcpConn(); public: // Setup object, to keep empty constructor. void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); - // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); -// Interface ISrsExecutorHandler -public: - virtual void on_executor_done(ISrsInterruptable* executor); -public: // Get the SIP device id. std::string device_id(); // Set the cid of all coroutines. @@ -271,26 +253,29 @@ class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public public: virtual const SrsContextId& get_id(); virtual std::string desc(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); private: - srs_error_t do_cycle(); + virtual srs_error_t do_cycle(); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession); + srs_error_t bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession); }; // Start a coroutine to receive SIP messages. -class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler +class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; SrsTcpConnection* conn_; - SrsGbSipTcpConn* sip_; + SrsLazyGbSipTcpConn* sip_; public: - SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn); - virtual ~SrsGbSipTcpReceiver(); + SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn); + virtual ~SrsLazyGbSipTcpReceiver(); public: // Interrupt the receiver coroutine. void interrupt(); @@ -307,7 +292,7 @@ class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler }; // Start a coroutine to send out SIP messages. -class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler +class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; @@ -316,8 +301,8 @@ class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler std::vector msgs_; srs_cond_t wait_; public: - SrsGbSipTcpSender(SrsTcpConnection* conn); - virtual ~SrsGbSipTcpSender(); + SrsLazyGbSipTcpSender(SrsTcpConnection* conn); + virtual ~SrsLazyGbSipTcpSender(); public: // Push message to queue, and sender will send out in dedicate coroutine. void enqueue(SrsSipMessage* msg); @@ -348,36 +333,27 @@ class ISrsPsPackHandler }; // A GB28181 TCP media connection, for PS stream. -class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler +class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler + , public ISrsPsPackHandler { private: bool connected_; - // The owner session object, note that we use the raw pointer and should never free it. - SrsGbSession* session_; + SrsLazyObjectWrapper* wrapper_root_; + SrsLazyObjectWrapper* session_; uint32_t nn_rtcp_; -private: - // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; - // The owner coroutine, allow user to interrupt the loop. - ISrsInterruptable* owner_coroutine_; - ISrsContextIdSetter* owner_cid_; - SrsContextId cid_; private: SrsPackContext* pack_; SrsTcpConnection* conn_; + SrsCoroutine* trd_; uint8_t* buffer_; +private: + friend class SrsLazyObjectWrapper; + SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root); public: - SrsGbMediaTcpConn(); - virtual ~SrsGbMediaTcpConn(); + virtual ~SrsLazyGbMediaTcpConn(); public: // Setup object, to keep empty constructor. void setup(srs_netfd_t stfd); - // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); -// Interface ISrsExecutorHandler -public: - virtual void on_executor_done(ISrsInterruptable* executor); -public: // Whether media is connected. bool is_connected(); // Interrupt transport by session. @@ -388,6 +364,9 @@ class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, publ public: virtual const SrsContextId& get_id(); virtual std::string desc(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -398,7 +377,7 @@ class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, publ virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector& msgs); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(uint32_t ssrc, SrsGbSession** psession); + srs_error_t bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession); }; // The queue for mpegts over udp to send packets. @@ -423,8 +402,7 @@ class SrsMpegpsQueue class SrsGbMuxer { private: - // The owner session object, note that we use the raw pointer and should never free it. - SrsGbSession* session_; + SrsLazyGbSession* session_; std::string output_; SrsSimpleRtmpClient* sdk_; private: @@ -450,10 +428,10 @@ class SrsGbMuxer SrsMpegpsQueue* queue_; SrsPithyPrint* pprint_; public: - SrsGbMuxer(SrsGbSession* session); + SrsGbMuxer(SrsLazyGbSession* session); virtual ~SrsGbMuxer(); public: - void setup(std::string output); + srs_error_t initialize(std::string output); srs_error_t on_ts_message(SrsTsMessage* msg); private: virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c5f6500751..d213f3f098 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1369,6 +1369,11 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg) } #endif + SrsLazySweepGc* gc = dynamic_cast(_srs_gc); + if ((err = gc->start()) != srs_success) { + return srs_error_wrap(err, "start gc"); + } + return err; } diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 3e21e468cd..77f581a640 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -30,30 +30,6 @@ ISrsStartable::~ISrsStartable() { } -ISrsInterruptable::ISrsInterruptable() -{ -} - -ISrsInterruptable::~ISrsInterruptable() -{ -} - -ISrsContextIdSetter::ISrsContextIdSetter() -{ -} - -ISrsContextIdSetter::~ISrsContextIdSetter() -{ -} - -ISrsContextIdGetter::ISrsContextIdGetter() -{ -} - -ISrsContextIdGetter::~ISrsContextIdGetter() -{ -} - SrsCoroutine::SrsCoroutine() { } @@ -366,69 +342,3 @@ void SrsWaitGroup::wait() } } -ISrsExecutorHandler::ISrsExecutorHandler() -{ -} - -ISrsExecutorHandler::~ISrsExecutorHandler() -{ -} - -SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb) -{ - resource_ = r; - handler_ = h; - manager_ = m; - callback_ = cb; - trd_ = new SrsSTCoroutine("ar", this, resource_->get_id()); -} - -SrsExecutorCoroutine::~SrsExecutorCoroutine() -{ - manager_->remove(resource_); - srs_freep(trd_); -} - -srs_error_t SrsExecutorCoroutine::start() -{ - return trd_->start(); -} - -void SrsExecutorCoroutine::interrupt() -{ - trd_->interrupt(); -} - -srs_error_t SrsExecutorCoroutine::pull() -{ - return trd_->pull(); -} - -const SrsContextId& SrsExecutorCoroutine::cid() -{ - return trd_->cid(); -} - -void SrsExecutorCoroutine::set_cid(const SrsContextId& cid) -{ - trd_->set_cid(cid); -} - -srs_error_t SrsExecutorCoroutine::cycle() -{ - srs_error_t err = handler_->cycle(); - if (callback_) callback_->on_executor_done(this); - manager_->remove(this); - return err; -} - -const SrsContextId& SrsExecutorCoroutine::get_id() -{ - return resource_->get_id(); -} - -std::string SrsExecutorCoroutine::desc() -{ - return resource_->desc(); -} - diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index d7315b20cd..51282bca25 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -15,10 +15,8 @@ #include #include #include -#include class SrsFastCoroutine; -class SrsExecutorCoroutine; // Each ST-coroutine must implements this interface, // to do the cycle job and handle some events. @@ -66,46 +64,21 @@ class ISrsStartable virtual srs_error_t start() = 0; }; -// Allow user to interrupt the coroutine, for example, to stop it. -class ISrsInterruptable -{ -public: - ISrsInterruptable(); - virtual ~ISrsInterruptable(); -public: - virtual void interrupt() = 0; - virtual srs_error_t pull() = 0; -}; - -// Get the context id. -class ISrsContextIdSetter -{ -public: - ISrsContextIdSetter(); - virtual ~ISrsContextIdSetter(); -public: - virtual void set_cid(const SrsContextId& cid) = 0; -}; - -// Set the context id. -class ISrsContextIdGetter -{ -public: - ISrsContextIdGetter(); - virtual ~ISrsContextIdGetter(); -public: - virtual const SrsContextId& cid() = 0; -}; - -// The coroutine object. -class SrsCoroutine : public ISrsStartable, public ISrsInterruptable - , public ISrsContextIdSetter, public ISrsContextIdGetter +// The corotine object. +class SrsCoroutine : public ISrsStartable { public: SrsCoroutine(); virtual ~SrsCoroutine(); public: virtual void stop() = 0; + virtual void interrupt() = 0; + // @return a copy of error, which should be freed by user. + // NULL if not terminated and user should pull again. + virtual srs_error_t pull() = 0; + // Get and set the context id of coroutine. + virtual const SrsContextId& cid() = 0; + virtual void set_cid(const SrsContextId& cid) = 0; }; // An empty coroutine, user can default to this object before create any real coroutine. @@ -219,7 +192,7 @@ class SrsFastCoroutine static void* pfn(void* arg); }; -// Like goroutine sync.WaitGroup. +// Like goroytine sync.WaitGroup. class SrsWaitGroup { private: @@ -233,72 +206,9 @@ class SrsWaitGroup void add(int n); // When coroutine is done. void done(); - // Wait for all coroutine to be done. + // Wait for all corotine to be done. void wait(); }; -// The callback when executor cycle done. -class ISrsExecutorHandler -{ -public: - ISrsExecutorHandler(); - virtual ~ISrsExecutorHandler(); -public: - virtual void on_executor_done(ISrsInterruptable* executor) = 0; -}; - -// Start a coroutine for resource executor, to execute the handler and delete resource and itself when -// handler cycle done. -// -// Note that the executor will free itself by manager, then free the resource by manager. This is a helper -// that is used for a goroutine to execute a handler and free itself after the cycle is done. -// -// Generally, the handler, resource, and callback generally are the same object. But we do not define a single -// interface, because shared resource is a special interface. -// -// Note that the resource may live longer than executor, because it is shared resource, so you should process -// the callback. For example, you should never use the executor after it's stopped and deleted. -// -// Usage: -// ISrsResourceManager* manager = ...; -// ISrsResource* resource, ISrsCoroutineHandler* handler, ISrsExecutorHandler* callback = ...; // Resource, handler, and callback are the same object. -// SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(manager, resource, handler); -// if ((err = executor->start()) != srs_success) { -// srs_freep(executor); -// return err; -// } -class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsInterruptable - , public ISrsContextIdSetter, public ISrsContextIdGetter, public ISrsCoroutineHandler -{ -private: - ISrsResourceManager* manager_; - ISrsResource* resource_; - ISrsCoroutineHandler* handler_; - ISrsExecutorHandler* callback_; -private: - SrsCoroutine* trd_; -public: - SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb); - virtual ~SrsExecutorCoroutine(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); -// Interface ISrsInterruptable -public: - virtual void interrupt(); - virtual srs_error_t pull(); -// Interface ISrsContextId -public: - virtual const SrsContextId& cid(); - virtual void set_cid(const SrsContextId& cid); -// Interface ISrsOneCycleThreadHandler -public: - virtual srs_error_t cycle(); -// Interface ISrsResource -public: - virtual const SrsContextId& get_id(); - virtual std::string desc(); -}; - #endif diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index d5c6e42297..b42a163be1 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -335,6 +335,7 @@ srs_error_t srs_global_initialize() #ifdef SRS_GB28181 _srs_gb_manager = new SrsResourceManager("GB", true); #endif + _srs_gc = new SrsLazySweepGc(); // Initialize global pps, which depends on _srs_clock _srs_pps_ids = new SrsPps(); diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 28f7662767..e8f281519e 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -81,103 +81,4 @@ class impl_SrsAutoFree } }; -// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 -// Usage: -// SrsSharedPtr ptr(new MyClass()); -// ptr->do_something(); -// -// SrsSharedPtr cp = ptr; -// cp->do_something(); -template -class SrsSharedPtr -{ -private: - // The pointer to the object. - T* ptr_; - // The reference count of the object. - uint32_t* ref_count_; -public: - // Create a shared ptr with the object. - SrsSharedPtr(T* ptr) { - ptr_ = ptr; - ref_count_ = new uint32_t(1); - } - // Copy the shared ptr. - SrsSharedPtr(const SrsSharedPtr& cp) { - copy(cp); - } - // Dispose and delete the shared ptr. - virtual ~SrsSharedPtr() { - reset(); - } -private: - // Reset the shared ptr. - void reset() { - if (!ref_count_) return; - - (*ref_count_)--; - if (*ref_count_ == 0) { - delete ptr_; - delete ref_count_; - } - - ptr_ = NULL; - ref_count_ = NULL; - } - // Copy from other shared ptr. - void copy(const SrsSharedPtr& cp) { - ptr_ = cp.ptr_; - ref_count_ = cp.ref_count_; - if (ref_count_) (*ref_count_)++; - } - // Move from other shared ptr. - void move(SrsSharedPtr& cp) { - ptr_ = cp.ptr_; - ref_count_ = cp.ref_count_; - cp.ptr_ = NULL; - cp.ref_count_ = NULL; - } -public: - // Get the object. - T* get() { - return ptr_; - } - // Overload the -> operator. - T* operator->() { - return ptr_; - } - // The assign operator. - SrsSharedPtr& operator=(const SrsSharedPtr& cp) { - if (this != &cp) { - reset(); - copy(cp); - } - return *this; - } -private: - // Overload the * operator. - T& operator*() { - return *ptr_; - } - // Overload the bool operator. - operator bool() const { - return ptr_ != NULL; - } -#if __cplusplus >= 201103L // C++11 -public: - // The move constructor. - SrsSharedPtr(SrsSharedPtr&& cp) { - move(cp); - }; - // The move assign operator. - SrsSharedPtr& operator=(SrsSharedPtr&& cp) { - if (this != &cp) { - reset(); - move(cp); - } - return *this; - }; -#endif -}; - #endif diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index e8bac6484f..d12f389b62 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 126 +#define VERSION_REVISION 125 #endif diff --git a/trunk/src/protocol/srs_protocol_conn.cpp b/trunk/src/protocol/srs_protocol_conn.cpp index b0fba2dab8..0c988b9b60 100644 --- a/trunk/src/protocol/srs_protocol_conn.cpp +++ b/trunk/src/protocol/srs_protocol_conn.cpp @@ -40,3 +40,35 @@ ISrsConnection::~ISrsConnection() { } +SrsLazyObject::SrsLazyObject() +{ + gc_ref_ = 0; +} + +SrsLazyObject::~SrsLazyObject() +{ +} + +void SrsLazyObject::gc_use() +{ + gc_ref_++; +} + +void SrsLazyObject::gc_dispose() +{ + gc_ref_--; +} + +int32_t SrsLazyObject::gc_ref() +{ + return gc_ref_; +} + +ISrsLazyGc::ISrsLazyGc() +{ +} + +ISrsLazyGc::~ISrsLazyGc() +{ +} + diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp index ee17aed56e..b136716adf 100644 --- a/trunk/src/protocol/srs_protocol_conn.hpp +++ b/trunk/src/protocol/srs_protocol_conn.hpp @@ -33,9 +33,7 @@ class ISrsResourceManager ISrsResourceManager(); virtual ~ISrsResourceManager(); public: - // Remove then free the specified connection. Note that the manager always free c resource, - // in the same coroutine or another coroutine. Some manager may support add c to a map, it - // should always free it even if it's in the map. + // Remove then free the specified connection. virtual void remove(ISrsResource* c) = 0; }; @@ -50,5 +48,36 @@ class ISrsConnection : public ISrsResource virtual std::string remote_ip() = 0; }; +// Lazy-sweep resource, never sweep util all wrappers are freed. +// See https://github.com/ossrs/srs/issues/3176#lazy-sweep +class SrsLazyObject +{ +private: + // The reference count of resource, 0 is no wrapper and safe to sweep. + int32_t gc_ref_; +public: + SrsLazyObject(); + virtual ~SrsLazyObject(); +public: + // For wrapper to use this resource. + virtual void gc_use(); + // For wrapper to dispose this resource. + virtual void gc_dispose(); + // The current reference count of resource. + virtual int32_t gc_ref(); +}; + +// The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable. +// See https://github.com/ossrs/srs/issues/3176#lazy-sweep +class ISrsLazyGc +{ +public: + ISrsLazyGc(); + virtual ~ISrsLazyGc(); +public: + // Remove then free the specified resource. + virtual void remove(SrsLazyObject* c) = 0; +}; + #endif diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 2344e911ef..1b399fd771 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -8,8 +8,6 @@ using namespace std; #include -#include -#include VOID TEST(CoreAutoFreeTest, Free) { @@ -88,343 +86,3 @@ VOID TEST(CoreLogger, CheckVsnprintf) } } -VOID TEST(CoreLogger, SharedPtrTypical) -{ - if (true) { - SrsSharedPtr p(new int(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, *p); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q = p; - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q(p); - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q = p; - EXPECT_TRUE(p); - EXPECT_TRUE(q); - EXPECT_EQ(100, *p); - EXPECT_EQ(100, *q); - } -} - -VOID TEST(CoreLogger, SharedPtrReset) -{ - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q = p; - p.reset(); - EXPECT_FALSE(p); - EXPECT_TRUE(q); - EXPECT_EQ(100, *q); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q = p; - q.reset(); - EXPECT_TRUE(p); - EXPECT_FALSE(q); - EXPECT_EQ(100, *p); - } -} - -VOID TEST(CoreLogger, SharedPtrObject) -{ - SrsSharedPtr p(new MyNormalObject(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, p->id()); -} - -VOID TEST(CoreLogger, SharedPtrNullptr) -{ - SrsSharedPtr p(NULL); - EXPECT_FALSE(p); - - p.reset(); - EXPECT_FALSE(p); - - SrsSharedPtr q = p; - EXPECT_FALSE(q); -} - -class MockWrapper -{ -public: - int* ptr; -public: - MockWrapper(int* p) { - ptr = p; - *ptr = *ptr + 1; - } - ~MockWrapper() { - *ptr = *ptr - 1; - } -}; - -VOID TEST(CoreLogger, SharedPtrWrapper) -{ - int* ptr = new int(100); - SrsAutoFree(int, ptr); - EXPECT_EQ(100, *ptr); - - if (true) { - SrsSharedPtr p(new MockWrapper(ptr)); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *p->ptr); - - SrsSharedPtr q = p; - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *p->ptr); - EXPECT_EQ(101, *q->ptr); - - SrsSharedPtr r(new MockWrapper(ptr)); - EXPECT_EQ(102, *ptr); - EXPECT_EQ(102, *p->ptr); - EXPECT_EQ(102, *q->ptr); - EXPECT_EQ(102, *r->ptr); - - SrsSharedPtr s(new MockWrapper(ptr)); - EXPECT_EQ(103, *ptr); - EXPECT_EQ(103, *p->ptr); - EXPECT_EQ(103, *q->ptr); - EXPECT_EQ(103, *r->ptr); - EXPECT_EQ(103, *s->ptr); - } - EXPECT_EQ(100, *ptr); - - if (true) { - SrsSharedPtr p(new MockWrapper(ptr)); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *p->ptr); - } - EXPECT_EQ(100, *ptr); -} - -VOID TEST(CoreLogger, SharedPtrAssign) -{ - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q(NULL); - q = p; - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q(new int(101)); - - int* q0 = q.get(); - q = p; - EXPECT_EQ(p.get(), q.get()); - EXPECT_NE(q0, q.get()); - } - - int* ptr0 = new int(100); - SrsAutoFree(int, ptr0); - EXPECT_EQ(100, *ptr0); - - int* ptr1 = new int(200); - SrsAutoFree(int, ptr1); - EXPECT_EQ(200, *ptr1); - - if (true) { - SrsSharedPtr p(new MockWrapper(ptr0)); - EXPECT_EQ(101, *ptr0); - EXPECT_EQ(101, *p->ptr); - - SrsSharedPtr q(new MockWrapper(ptr1)); - EXPECT_EQ(201, *ptr1); - EXPECT_EQ(201, *q->ptr); - - q = p; - EXPECT_EQ(200, *ptr1); - EXPECT_EQ(101, *ptr0); - EXPECT_EQ(101, *p->ptr); - EXPECT_EQ(101, *q->ptr); - } - - EXPECT_EQ(100, *ptr0); - EXPECT_EQ(200, *ptr1); -} - -template -SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) { - SrsSharedPtr q = p; - return q; -} - -template -SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) { - return p; -} - -VOID TEST(CoreLogger, SharedPtrMove) -{ - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q(new int(101)); - q = mock_shared_ptr_move_ctr(p); - EXPECT_EQ(q.get(), p.get()); - } - - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q(new int(101)); - q = mock_shared_ptr_move_assign(p); - EXPECT_EQ(q.get(), p.get()); - } - - int* ptr = new int(100); - SrsAutoFree(int, ptr); - EXPECT_EQ(100, *ptr); - - if (true) { - SrsSharedPtr p(new MockWrapper(ptr)); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *p->ptr); - - SrsSharedPtr q(new MockWrapper(ptr)); - q = mock_shared_ptr_move_ctr(p); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *q->ptr); - } - EXPECT_EQ(100, *ptr); - - if (true) { - SrsSharedPtr p(new MockWrapper(ptr)); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *p->ptr); - - SrsSharedPtr q(new MockWrapper(ptr)); - q = mock_shared_ptr_move_assign(p); - EXPECT_EQ(101, *ptr); - EXPECT_EQ(101, *q->ptr); - } - EXPECT_EQ(100, *ptr); - - // Note that this will not trigger the move constructor or move assignment operator. - if (true) { - SrsSharedPtr p(new int(100)); - SrsSharedPtr q = mock_shared_ptr_move_assign(p); - EXPECT_EQ(q.get(), p.get()); - } - - // Note that this will not trigger the move constructor or move assignment operator. - if (true) { - SrsSharedPtr p = SrsSharedPtr(new int(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, *p); - } -} - -class MockIntResource : public ISrsResource -{ -public: - SrsContextId id_; - int value_; -public: - MockIntResource(int value) : value_(value) { - } - virtual ~MockIntResource() { - } -public: - virtual const SrsContextId& get_id() { - return id_; - } - virtual std::string desc() { - return id_.c_str(); - } -}; - -VOID TEST(CoreLogger, SharedResourceTypical) -{ - if (true) { - SrsSharedResource* p = new SrsSharedResource(new MockIntResource(100)); - EXPECT_TRUE(*p); - EXPECT_EQ(100, (*p)->value_); - srs_freep(p); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, p->value_); - } - - if (true) { - SrsSharedResource p = SrsSharedResource(new MockIntResource(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, p->value_); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q = p; - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q(NULL); - q = p; - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q(new MockIntResource(200)); - q = p; - EXPECT_EQ(p.get(), q.get()); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q = p; - EXPECT_TRUE(p); - EXPECT_TRUE(q); - EXPECT_EQ(100, p->value_); - EXPECT_EQ(100, q->value_); - } -} - -template -SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) { - SrsSharedResource q = p; - return q; -} - -template -SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) { - return p; -} - -VOID TEST(CoreLogger, SharedResourceMove) -{ - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q(new MockIntResource(101)); - q = mock_shared_resource_move_ctr(p); - EXPECT_EQ(100, q->value_); - EXPECT_EQ(q.get(), p.get()); - } - - if (true) { - SrsSharedResource p(new MockIntResource(100)); - SrsSharedResource q(new MockIntResource(101)); - q = mock_shared_resource_move_assign(p); - EXPECT_EQ(100, q->value_); - EXPECT_EQ(q.get(), p.get()); - } -} - diff --git a/trunk/src/utest/srs_utest_core.hpp b/trunk/src/utest/srs_utest_core.hpp index 8c7306384d..1c7795b977 100644 --- a/trunk/src/utest/srs_utest_core.hpp +++ b/trunk/src/utest/srs_utest_core.hpp @@ -14,18 +14,5 @@ #include -class MyNormalObject -{ -private: - int id_; -public: - MyNormalObject(int id) { - id_ = id; - } - int id() { - return id_; - } -}; - #endif