Skip to content

Commit

Permalink
Revert "SmartPtr: Support shared ptr for SRT source. (ossrs#4084)"
Browse files Browse the repository at this point in the history
This reverts commit 7b9c52b.
  • Loading branch information
winlinvip committed Jun 13, 2024
1 parent 36dc545 commit fecf36f
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 42 deletions.
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class SrsSharedResource : public ISrsResource
private:
SrsSharedPtr<T> ptr_;
public:
SrsSharedResource(T* ptr = NULL) : ptr_(ptr) {
SrsSharedResource(T* ptr) : ptr_(ptr) {
}
SrsSharedResource(const SrsSharedResource<T>& cp) : ptr_(cp.ptr_) {
}
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,11 +1216,11 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti

// Check whether SRT stream is busy.
#ifdef SRS_SRT
SrsSrtSource* srt = NULL;
bool srt_server_enabled = _srs_config->get_srt_enabled();
bool srt_enabled = _srs_config->get_srt_enabled(r->vhost);
if (srt_server_enabled && srt_enabled) {
SrsSharedPtr<SrsSrtSource> srt;
if ((err = _srs_srt_sources->fetch_or_create(r, srt)) != srs_success) {
if ((err = _srs_srt_sources->fetch_or_create(r, &srt)) != srs_success) {
return srs_error_wrap(err, "create source");
}

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,11 +1102,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)

// Check whether SRT stream is busy.
#ifdef SRS_SRT
SrsSrtSource* srt = NULL;
bool srt_server_enabled = _srs_config->get_srt_enabled();
bool srt_enabled = _srs_config->get_srt_enabled(req->vhost);
if (srt_server_enabled && srt_enabled && !info->edge) {
SrsSharedPtr<SrsSrtSource> srt;
if ((err = _srs_srt_sources->fetch_or_create(req, srt)) != srs_success) {
if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) {
return srs_error_wrap(err, "create source");
}

Expand Down
1 change: 0 additions & 1 deletion trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,6 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str());
}

// See SrsSrtSource::on_consumer_destroy
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
#if 0
Expand Down
5 changes: 3 additions & 2 deletions trunk/src/app/srs_app_srt_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ srs_error_t SrsSrtRecvThread::get_recv_err()
return srs_error_copy(recv_err_);
}

SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource())
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
Expand All @@ -171,6 +171,7 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s

trd_ = new SrsSTCoroutine("ts-srt", this, _srs_context->get_id());

srt_source_ = NULL;
req_ = new SrsRequest();
req_->ip = ip;

Expand Down Expand Up @@ -284,7 +285,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s",
streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str());

if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) {
if ((err = _srs_srt_sources->fetch_or_create(req_, &srt_source_)) != srs_success) {
return srs_error_wrap(err, "fetch srt source");
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_srt_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class SrsMpegtsSrtConn : public ISrsConnection, public ISrsStartable, public ISr
SrsCoroutine* trd_;

SrsRequest* req_;
SrsSharedPtr<SrsSrtSource> srt_source_;
SrsSrtSource* srt_source_;
SrsSecurity* security_;
};

Expand Down
48 changes: 21 additions & 27 deletions trunk/src/app/srs_app_srt_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,52 +102,56 @@ SrsSrtSourceManager::~SrsSrtSourceManager()
srs_mutex_destroy(lock);
}

srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps)
srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** pps)
{
srs_error_t err = srs_success;

// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
SrsSharedPtr<SrsSrtSource> source = it->second;

SrsSrtSource* source = NULL;
if ((source = fetch(r)) != NULL) {
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->update_auth(r);
pps = source;

*pps = source;
return err;
}

SrsSharedPtr<SrsSrtSource> source(new SrsSrtSource());
string stream_url = r->get_stream_url();
string vhost = r->vhost;

// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());

srs_trace("new srt source, stream_url=%s", stream_url.c_str());

source = new SrsSrtSource();
if ((err = source->initialize(r)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}

pool[stream_url] = source;
pps = source;

*pps = source;

return err;
}

void SrsSrtSourceManager::eliminate(SrsRequest* r)
SrsSrtSource* SrsSrtSourceManager::fetch(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsSrtSource* source = NULL;

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
if (pool.find(stream_url) == pool.end()) {
return NULL;
}

source = pool[stream_url];

return source;
}

SrsSrtSourceManager* _srs_srt_sources = NULL;
Expand Down Expand Up @@ -969,11 +973,6 @@ void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer* consumer)
if (it != consumers.end()) {
it = consumers.erase(it);
}

// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
}
}

bool SrsSrtSource::can_publish()
Expand Down Expand Up @@ -1027,11 +1026,6 @@ void SrsSrtSource::on_unpublish()
bridge_->on_unpublish();
srs_freep(bridge_);
}

// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
}
}

srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_srt_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <srs_kernel_ts.hpp>
#include <srs_protocol_st.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_core_autofree.hpp>

class SrsSharedPtrMessage;
class SrsRequest;
Expand Down Expand Up @@ -51,17 +50,18 @@ class SrsSrtSourceManager
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsSrtSource> > pool;
std::map<std::string, SrsSrtSource*> pool;
public:
SrsSrtSourceManager();
virtual ~SrsSrtSourceManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps);
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSrtSource** pps);
public:
// Get the exists source, NULL when not exists.
virtual SrsSrtSource* fetch(SrsRequest* r);
};

// Global singleton instance.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_autofree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class SrsSharedPtr
uint32_t* ref_count_;
public:
// Create a shared ptr with the object.
SrsSharedPtr(T* ptr = NULL) {
SrsSharedPtr(T* ptr) {
ptr_ = ptr;
ref_count_ = new uint32_t(1);
}
Expand Down

0 comments on commit fecf36f

Please sign in to comment.