diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 276f278887..f1891c5236 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -39,6 +39,7 @@ using namespace std; #include #include #include +#include SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r) { @@ -987,6 +988,7 @@ bool SrsLiveEntry::is_mp3() SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr) { server = svr; + async_ = new SrsAsyncCallWorker(); mux.hijack(this); _srs_config->subscribe(this); @@ -996,6 +998,9 @@ SrsHttpStreamServer::~SrsHttpStreamServer() { mux.unhijack(this); _srs_config->unsubscribe(this); + + async_->stop(); + srs_freep(async_); if (true) { std::map::iterator it; @@ -1023,6 +1028,10 @@ srs_error_t SrsHttpStreamServer::initialize() if ((err = initialize_flv_streaming()) != srs_success) { return srs_error_wrap(err, "http flv stream"); } + + if ((err = async_->start()) != srs_success) { + return srs_error_wrap(err, "async start"); + } return err; } @@ -1114,39 +1123,19 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r) return; } - // Free all HTTP resources. - SrsUniquePtr entry(it->second); - entry->disposing = true; - - SrsUniquePtr stream(entry->stream); - SrsUniquePtr cache(entry->cache); - - // Notify cache and stream to stop. - if (stream->entry) stream->entry->enabled = false; - stream->expire(); - cache->stop(); - - // Wait for cache and stream to stop. - int i = 0; - for (; i < 1024; i++) { - if (!cache->alive() && !stream->alive()) { - break; - } - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + // Set the entry to disposing, which will prevent the stream to be reused. + SrsLiveEntry* entry = it->second; + if (entry->disposing) { + return; } + entry->disposing = true; - if (cache->alive() || stream->alive()) { - srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); + // Use async worker to execute the task, which will destroy the stream. + srs_error_t err = srs_success; + if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) { + srs_warn("http: ignore unmount stream failed, sid=%s, err=%s", sid.c_str(), srs_error_desc(err).c_str()); + srs_freep(err); } - - // Remove the entry from handlers. - streamHandlers.erase(it); - - // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and - // stream stopped for it uses it. - mux.unhandle(entry->mount, stream.get()); - - srs_trace("http: unmount flv stream for sid=%s, i=%d", sid.c_str(), i); } srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1296,3 +1285,64 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost) return err; } +SrsHttpStreamDestroy::SrsHttpStreamDestroy(SrsHttpServeMux* mux, map* handlers, string sid) +{ + mux_ = mux; + sid_ = sid; + streamHandlers_ = handlers; +} + +SrsHttpStreamDestroy::~SrsHttpStreamDestroy() +{ +} + +srs_error_t SrsHttpStreamDestroy::call() +{ + srs_error_t err = srs_success; + + std::map::iterator it = streamHandlers_->find(sid_); + if (it == streamHandlers_->end()) { + return err; + } + + // Free all HTTP resources. + SrsUniquePtr entry(it->second); + entry->disposing = true; + + SrsUniquePtr stream(entry->stream); + SrsUniquePtr cache(entry->cache); + + // Notify cache and stream to stop. + if (stream->entry) stream->entry->enabled = false; + stream->expire(); + cache->stop(); + + // Wait for cache and stream to stop. + int i = 0; + for (; i < 1024; i++) { + if (!cache->alive() && !stream->alive()) { + break; + } + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + } + + if (cache->alive() || stream->alive()) { + srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); + } + + // Remove the entry from handlers. + streamHandlers_->erase(it); + + // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and + // stream stopped for it uses it. + mux_->unhandle(entry->mount, stream.get()); + + srs_trace("http: unmount flv stream for sid=%s, i=%d", sid_.c_str(), i); + return err; +} + +string SrsHttpStreamDestroy::to_string() +{ + return "destroy"; +} + diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 4e63b00cdd..b3520553b3 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -17,6 +18,7 @@ class SrsAacTransmuxer; class SrsMp3Transmuxer; class SrsFlvTransmuxer; class SrsTsTransmuxer; +class SrsAsyncCallWorker; // A cache for HTTP Live Streaming encoder, to make android(weixin) happy. class SrsBufferCache : public ISrsCoroutineHandler @@ -245,6 +247,7 @@ class SrsHttpStreamServer : public ISrsReloadHandler { private: SrsServer* server; + SrsAsyncCallWorker* async_; public: SrsHttpServeMux mux; // The http live streaming template, to create streams. @@ -268,5 +271,19 @@ class SrsHttpStreamServer : public ISrsReloadHandler virtual srs_error_t initialize_flv_entry(std::string vhost); }; +class SrsHttpStreamDestroy : public ISrsAsyncCallTask +{ +private: + std::string sid_; + std::map* streamHandlers_; + SrsHttpServeMux* mux_; +public: + SrsHttpStreamDestroy(SrsHttpServeMux* mux, map* handlers, string sid); + virtual ~SrsHttpStreamDestroy(); +public: + virtual srs_error_t call(); + virtual std::string to_string(); +}; + #endif