Skip to content

Commit

Permalink
UniquePtr: Support SrsUniquePtr to replace SrsAutoFree.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jul 8, 2024
1 parent 6bbd461 commit 0c84717
Show file tree
Hide file tree
Showing 57 changed files with 1,368 additions and 1,385 deletions.
1 change: 1 addition & 0 deletions trunk/3rdparty/st-srs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ The branch [srs](https://github.com/ossrs/state-threads/tree/srs) was patched an
- [x] Check capability for backtrack.
- [x] Support set specifics for any thread.
- [x] Support st_destroy to free resources for asan.
- [x] Support free the stack, [#38](https://github.com/ossrs/state-threads/issues/38).
- [ ] System: Support sendmmsg for UDP, [#12](https://github.com/ossrs/state-threads/issues/12).

## GDB Tools
Expand Down
6 changes: 5 additions & 1 deletion trunk/3rdparty/st-srs/stk.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ _st_stack_t *_st_stack_new(int stack_size)
_st_stack_t *ts;
int extra;

/* If cache stack, we try to use stack from the cache list. */
#ifdef MD_CACHE_STACK
for (qp = _st_free_stacks.next; qp != &_st_free_stacks; qp = qp->next) {
ts = _ST_THREAD_STACK_PTR(qp);
Expand All @@ -80,10 +81,13 @@ _st_stack_t *_st_stack_new(int stack_size)
#endif

extra = _st_randomize_stacks ? _ST_PAGE_SIZE : 0;
/* If not cache stack, we will free all stack in the list, which contains the stack to be freed.
* Note that we should never directly free it at _st_stack_free, because it is still be used,
* and will cause crash. */
#ifndef MD_CACHE_STACK
for (qp = _st_free_stacks.next; qp != &_st_free_stacks;) {
ts = _ST_THREAD_STACK_PTR(qp);
// Before qp is freed, move to next one, because the qp will be freed when free the ts.
/* Before qp is freed, move to next one, because the qp will be freed when free the ts. */
qp = qp->next;

ST_REMOVE_LINK(&ts->links);
Expand Down
2 changes: 1 addition & 1 deletion trunk/3rdparty/st-srs/utest/st_utest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct ErrorObject {
};
extern std::ostream& operator<<(std::ostream& out, const ErrorObject* err);
#define ST_ASSERT_ERROR(error, r0, message) if (error) return new ErrorObject(r0, message)
#define ST_COROUTINE_JOIN(trd, r0) ErrorObject* r0 = NULL; SrsAutoFree(ErrorObject, r0); if (trd) st_thread_join(trd, (void**)&r0)
#define ST_COROUTINE_JOIN(trd, r0) ErrorObject* r0 = NULL; if (trd) st_thread_join(trd, (void**)&r0); SrsUniquePtr<ErrorObject> r0_uptr(r0)
#define ST_EXPECT_SUCCESS(r0) EXPECT_TRUE(!r0) << r0
#define ST_EXPECT_FAILED(r0) EXPECT_TRUE(r0) << r0

Expand Down
4 changes: 2 additions & 2 deletions trunk/ide/srs_clion/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ ProcessorCount(JOBS)
# We should always configure SRS for switching between branches.
IF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
EXECUTE_PROCESS(
COMMAND ./configure --osx --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
COMMAND ./configure --osx --srt=on --gb28181=on --apm=on --h265=on --hds=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
WORKING_DIRECTORY ${SRS_DIR} RESULT_VARIABLE ret)
ELSE ()
EXECUTE_PROCESS(
COMMAND ./configure --srt=on --gb28181=on --apm=on --h265=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
COMMAND ./configure --srt=on --gb28181=on --apm=on --h265=on --hds=on --utest=on --ffmpeg-opus=off --jobs=${JOBS}
WORKING_DIRECTORY ${SRS_DIR} RESULT_VARIABLE ret)
ENDIF ()
if(NOT ret EQUAL 0)
Expand Down
8 changes: 8 additions & 0 deletions trunk/scripts/copy_to_gits.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ if [[ ! -f ~/git/srs-bench/go.mod ]]; then
exit -1
fi

if [[ ! -d ~/git/state-threads ]]; then
echo "no state-threads at ~/git"
exit -1
fi

echo "Copy signaling"
cp -R 3rdparty/signaling/* ~/git/signaling/ &&
cp -R 3rdparty/signaling/.gitignore ~/git/signaling/ &&
Expand All @@ -29,3 +34,6 @@ cp -R 3rdparty/srs-bench/* ~/git/srs-bench/ &&
cp -R 3rdparty/srs-bench/.gitignore ~/git/srs-bench/ &&
(cd ~/git/srs-bench && git st)

echo "Copy state-threads"
cp -R 3rdparty/st-srs/* ~/git/state-threads/ &&
(cd ~/git/state-threads && git st)
28 changes: 13 additions & 15 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1157,13 +1157,13 @@ srs_error_t SrsConfDirective::parse_conf(SrsConfigBuffer* buffer, SrsDirectiveCo
srs_assert(!file.empty());
srs_trace("config parse include %s", file.c_str());

SrsConfigBuffer* include_file_buffer = NULL;
SrsAutoFree(SrsConfigBuffer, include_file_buffer);
if ((err = conf->build_buffer(file, &include_file_buffer)) != srs_success) {
SrsConfigBuffer* include_file_buffer_raw = NULL;
if ((err = conf->build_buffer(file, &include_file_buffer_raw)) != srs_success) {
return srs_error_wrap(err, "buffer fullfill %s", file.c_str());
}
SrsUniquePtr<SrsConfigBuffer> include_file_buffer(include_file_buffer_raw);

if ((err = parse_conf(include_file_buffer, SrsDirectiveContextFile, conf)) != srs_success) {
if ((err = parse_conf(include_file_buffer.get(), SrsDirectiveContextFile, conf)) != srs_success) {
return srs_error_wrap(err, "parse include buffer %s", file.c_str());
}
}
Expand Down Expand Up @@ -1618,10 +1618,8 @@ srs_error_t SrsConfig::reload_vhost(SrsConfDirective* old_root)
srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
{
srs_error_t err = srs_success;

SrsConfDirective* old_root = root;
SrsAutoFree(SrsConfDirective, old_root);


SrsUniquePtr<SrsConfDirective> old_root(root);
root = conf->root;
conf->root = NULL;

Expand Down Expand Up @@ -1655,14 +1653,14 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
}

// Merge config: rtc_server
if ((err = reload_rtc_server(old_root)) != srs_success) {
if ((err = reload_rtc_server(old_root.get())) != srs_success) {
return srs_error_wrap(err, "http steram");;
}

// TODO: FIXME: support reload stream_caster.

// merge config: vhost
if ((err = reload_vhost(old_root)) != srs_success) {
if ((err = reload_vhost(old_root.get())) != srs_success) {
return srs_error_wrap(err, "vhost");;
}

Expand Down Expand Up @@ -2250,13 +2248,13 @@ srs_error_t SrsConfig::parse_file(const char* filename)
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "empty config");
}

SrsConfigBuffer* buffer = NULL;
SrsAutoFree(SrsConfigBuffer, buffer);
if ((err = build_buffer(config_file, &buffer)) != srs_success) {
SrsConfigBuffer* buffer_raw = NULL;
if ((err = build_buffer(config_file, &buffer_raw)) != srs_success) {
return srs_error_wrap(err, "buffer fullfill %s", filename);
}

if ((err = parse_buffer(buffer)) != srs_success) {

SrsUniquePtr<SrsConfigBuffer> buffer(buffer_raw);
if ((err = parse_buffer(buffer.get())) != srs_success) {
return srs_error_wrap(err, "parse buffer %s", filename);
}

Expand Down
14 changes: 6 additions & 8 deletions trunk/src/app/srs_app_dash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,9 @@ srs_error_t SrsMpdWriter::write(SrsFormat* format, SrsFragmentWindow* afragments
}
ss << " </Period>" << endl;
ss << "</MPD>" << endl;

SrsFileWriter* fw = new SrsFileWriter();
SrsAutoFree(SrsFileWriter, fw);


SrsUniquePtr<SrsFileWriter> fw(new SrsFileWriter());

string full_path_tmp = full_path + ".tmp";
if ((err = fw->open(full_path_tmp)) != srs_success) {
return srs_error_wrap(err, "Open MPD file=%s failed", full_path_tmp.c_str());
Expand Down Expand Up @@ -651,10 +650,9 @@ srs_error_t SrsDashController::refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFor
} else {
path += "/audio-init.mp4";
}

SrsInitMp4* init_mp4 = new SrsInitMp4();
SrsAutoFree(SrsInitMp4, init_mp4);


SrsUniquePtr<SrsInitMp4> init_mp4(new SrsInitMp4());

init_mp4->set_path(path);

int tid = msg->is_video()? video_track_id : audio_track_id;
Expand Down
48 changes: 22 additions & 26 deletions trunk/src/app/srs_app_dvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,19 @@ srs_error_t SrsDvrSegmenter::write_metadata(SrsSharedPtrMessage* metadata)
srs_error_t SrsDvrSegmenter::write_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* audio = shared_audio->copy();
SrsAutoFree(SrsSharedPtrMessage, audio);
if ((err = jitter->correct(audio, jitter_algorithm)) != srs_success) {

// TODO: FIXME: Use SrsSharedPtr instead.
SrsUniquePtr<SrsSharedPtrMessage> audio(shared_audio->copy());

if ((err = jitter->correct(audio.get(), jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "jitter");
}

if ((err = on_update_duration(audio)) != srs_success) {
if ((err = on_update_duration(audio.get())) != srs_success) {
return srs_error_wrap(err, "update duration");
}

if ((err = encode_audio(audio, format)) != srs_success) {
if ((err = encode_audio(audio.get(), format)) != srs_success) {
return srs_error_wrap(err, "encode audio");
}

Expand All @@ -141,19 +141,19 @@ srs_error_t SrsDvrSegmenter::write_audio(SrsSharedPtrMessage* shared_audio, SrsF
srs_error_t SrsDvrSegmenter::write_video(SrsSharedPtrMessage* shared_video, SrsFormat* format)
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* video = shared_video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
if ((err = jitter->correct(video, jitter_algorithm)) != srs_success) {

// TODO: FIXME: Use SrsSharedPtr instead.
SrsUniquePtr<SrsSharedPtrMessage> video(shared_video->copy());

if ((err = jitter->correct(video.get(), jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "jitter");
}

if ((err = encode_video(video, format)) != srs_success) {
if ((err = encode_video(video.get(), format)) != srs_success) {
return srs_error_wrap(err, "encode video");
}

if ((err = on_update_duration(video)) != srs_success) {
if ((err = on_update_duration(video.get())) != srs_success) {
return srs_error_wrap(err, "update duration");
}

Expand Down Expand Up @@ -262,9 +262,8 @@ srs_error_t SrsDvrFlvSegmenter::refresh_metadata()
SrsBuffer stream(buf, SrsAmf0Size::number());

// filesize to buf.
SrsAmf0Any* size = SrsAmf0Any::number((double)cur);
SrsAutoFree(SrsAmf0Any, size);

SrsUniquePtr<SrsAmf0Any> size(SrsAmf0Any::number((double)cur));

stream.skip(-1 * stream.pos());
if ((err = size->write(&stream)) != srs_success) {
return srs_error_wrap(err, "write filesize");
Expand All @@ -277,9 +276,8 @@ srs_error_t SrsDvrFlvSegmenter::refresh_metadata()
}

// duration to buf
SrsAmf0Any* dur = SrsAmf0Any::number((double)srsu2ms(fragment->duration()) / 1000.0);
SrsAutoFree(SrsAmf0Any, dur);

SrsUniquePtr<SrsAmf0Any> dur(SrsAmf0Any::number((double)srsu2ms(fragment->duration()) / 1000.0));

stream.skip(-1 * stream.pos());
if ((err = dur->write(&stream)) != srs_success) {
return srs_error_wrap(err, "write duration");
Expand Down Expand Up @@ -332,15 +330,13 @@ srs_error_t SrsDvrFlvSegmenter::encode_metadata(SrsSharedPtrMessage* metadata)
}

SrsBuffer stream(metadata->payload, metadata->size);

SrsAmf0Any* name = SrsAmf0Any::str();
SrsAutoFree(SrsAmf0Any, name);

SrsUniquePtr<SrsAmf0Any> name(SrsAmf0Any::str());
if ((err = name->read(&stream)) != srs_success) {
return srs_error_wrap(err, "read name");
}

SrsAmf0Object* obj = SrsAmf0Any::object();
SrsAutoFree(SrsAmf0Object, obj);

SrsUniquePtr<SrsAmf0Object> obj(SrsAmf0Any::object());
if ((err = obj->read(&stream)) != srs_success) {
return srs_error_wrap(err, "read object");
}
Expand Down
51 changes: 24 additions & 27 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)

#ifdef SRS_APM
// Create a client span and store it to an AMF0 propagator.
ISrsApmSpan* span_client = _srs_apm->inject(_srs_apm->span("edge-pull")->set_kind(SrsApmKindClient)->as_child(_srs_apm->load()), sdk->extra_args());
SrsAutoFree(ISrsApmSpan, span_client);
SrsUniquePtr<ISrsApmSpan> span_client(_srs_apm->inject(_srs_apm->span("edge-pull")->set_kind(SrsApmKindClient)
->as_child(_srs_apm->load()), sdk->extra_args()));
#endif

if ((err = sdk->connect()) != srs_success) {
Expand Down Expand Up @@ -594,9 +594,8 @@ srs_error_t SrsEdgeIngester::do_cycle()
srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
srs_error_t err = srs_success;

SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);

SrsUniquePtr<SrsPithyPrint> pprint(SrsPithyPrint::create_edge());

// we only use the redict once.
// reset the redirect to empty, for maybe the origin changed.
Expand All @@ -615,15 +614,15 @@ srs_error_t SrsEdgeIngester::ingest(string& redirect)
}

// read from client.
SrsCommonMessage* msg = NULL;
if ((err = upstream->recv_message(&msg)) != srs_success) {
SrsCommonMessage* msg_raw = NULL;
if ((err = upstream->recv_message(&msg_raw)) != srs_success) {
return srs_error_wrap(err, "recv message");
}

srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg);
if ((err = process_publish_message(msg, redirect)) != srs_success) {
srs_assert(msg_raw);
SrsUniquePtr<SrsCommonMessage> msg(msg_raw);

if ((err = process_publish_message(msg.get(), redirect)) != srs_success) {
return srs_error_wrap(err, "process message");
}
}
Expand Down Expand Up @@ -659,14 +658,14 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri

// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
SrsPacket* pkt_raw = NULL;
if ((err = upstream->decode_message(msg, &pkt_raw)) != srs_success) {
return srs_error_wrap(err, "decode message");
}
SrsAutoFree(SrsPacket, pkt);
SrsUniquePtr<SrsPacket> pkt(pkt_raw);

if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt.get())) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt.get());
if ((err = source_->on_meta_data(msg, metadata)) != srs_success) {
return srs_error_wrap(err, "source consume metadata");
}
Expand All @@ -678,15 +677,15 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri

// call messages, for example, reject, redirect.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
SrsPacket* pkt = NULL;
if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
SrsPacket* pkt_raw = NULL;
if ((err = upstream->decode_message(msg, &pkt_raw)) != srs_success) {
return srs_error_wrap(err, "decode message");
}
SrsAutoFree(SrsPacket, pkt);
SrsUniquePtr<SrsPacket> pkt(pkt_raw);

// RTMP 302 redirect
if (dynamic_cast<SrsCallPacket*>(pkt)) {
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
if (dynamic_cast<SrsCallPacket*>(pkt.get())) {
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt.get());
if (!call->arguments->is_object()) {
return err;
}
Expand Down Expand Up @@ -793,8 +792,8 @@ srs_error_t SrsEdgeForwarder::start()
#ifdef SRS_APM
// Create a client span and store it to an AMF0 propagator.
// Note that we are able to load the span from coroutine context because in the same coroutine.
ISrsApmSpan* span_client = _srs_apm->inject(_srs_apm->span("edge-push")->set_kind(SrsApmKindClient)->as_child(_srs_apm->load()), sdk->extra_args());
SrsAutoFree(ISrsApmSpan, span_client);
SrsUniquePtr<ISrsApmSpan> span_client(_srs_apm->inject(_srs_apm->span("edge-push")->set_kind(SrsApmKindClient)
->as_child(_srs_apm->load()), sdk->extra_args()));
#endif

if ((err = sdk->connect()) != srs_success) {
Expand Down Expand Up @@ -858,10 +857,8 @@ srs_error_t SrsEdgeForwarder::do_cycle()
srs_error_t err = srs_success;

sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);

SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);


SrsUniquePtr<SrsPithyPrint> pprint(SrsPithyPrint::create_edge());
SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

while (true) {
Expand Down
7 changes: 3 additions & 4 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,9 @@ srs_error_t SrsForwarder::forward()
srs_error_t err = srs_success;

sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);

SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
SrsAutoFree(SrsPithyPrint, pprint);


SrsUniquePtr<SrsPithyPrint> pprint(SrsPithyPrint::create_forwarder());

SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);

// update sequence header
Expand Down
Loading

0 comments on commit 0c84717

Please sign in to comment.