From 920f87cd38e99d9186dc1da3a1cfc3963bc721c5 Mon Sep 17 00:00:00 2001 From: Winlin Date: Sun, 1 Sep 2024 06:44:35 +0800 Subject: [PATCH] Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166) Edge FLV is not working because it is stuck in an infinite loop waiting. Previously, there was no need to wait for exit since resources were not being cleaned up. Now, since resources need to be cleaned up, it must wait for all active connections to exit, which causes this issue. To reproduce the issue, start SRS edge, run the bellow command and press `CTRL+C` to stop the request: ```bash curl http://localhost:8080/live/livestream.flv -v >/dev/null ``` It will cause edge to fetch stream from origin, and free the consumer when client quit. When `SrsLiveStream::do_serve_http` return, it will free the consumer: ```cpp srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { SrsUniquePtr consumer(consumer_raw); ``` Keep in mind that in this moment, the stream is alive, because only set to not alive after this function return: ```cpp alive_viewers_++; err = do_serve_http(w, r); // Free 'this' alive stream. alive_viewers_--; // Crash here, because 'this' is freed. ``` When freeing the consumer, it will cause the source to unpublish and attempt to free the HTTP handler, which ultimately waits for the stream not to be alive: ```cpp SrsLiveConsumer::~SrsLiveConsumer() { source_->on_consumer_destroy(this); void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer) { if (consumers.empty()) { play_edge->on_all_client_stop(); void SrsLiveSource::on_unpublish() { handler->on_unpublish(req); void SrsHttpStreamServer::http_unmount(SrsRequest* r) { if (stream->entry) stream->entry->enabled = false; for (; i < 1024; i++) { if (!cache->alive() && !stream->alive()) { break; } srs_usleep(100 * SRS_UTIME_MILLISECONDS); } ``` After 120 seconds, it will free the stream and cause SRS to crash because the stream is still active. In order to track this potential issue, also add an important warning log: ```cpp srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); ``` SRS may crash if got this log. --------- Co-authored-by: Jacob Su --- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_http_stream.cpp | 41 ++++++++++++++++----------- trunk/src/app/srs_app_http_stream.hpp | 2 +- trunk/src/app/srs_app_rtc_source.hpp | 2 +- trunk/src/app/srs_app_utility.hpp | 2 +- trunk/src/core/srs_core_version6.hpp | 2 +- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 6bd9a4104f..d14b6eea2a 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166) * v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162) * v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164) * v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 0a4a8e5b8b..df2c4a523e 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -636,17 +636,32 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage return srs_error_wrap(err, "http hook"); } + SrsSharedPtr live_source = _srs_sources->fetch(req); + if (!live_source.get()) { + return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + } + + // Create consumer of source, ignore gop cache, use the audio gop cache. + SrsLiveConsumer* consumer_raw = NULL; + if ((err = live_source->create_consumer(consumer_raw)) != srs_success) { + return srs_error_wrap(err, "create consumer"); + } + // When freeing the consumer, it may trigger the source unpublishing for edge. This will trigger the http + // unmount, which waiting for all http live stream to dispose, so we should free the consumer when this + // object is not alive. + SrsUniquePtr consumer(consumer_raw); + // Add the viewer to the viewers list. viewers_.push_back(hc); // Serve the viewer connection. - err = do_serve_http(w, r); + err = do_serve_http(live_source.get(), consumer.get(), w, r); // Remove viewer from the viewers list. vector::iterator it = std::find(viewers_.begin(), viewers_.end(), hc); srs_assert (it != viewers_.end()); viewers_.erase(it); - + // Do hook after serving. http_hooks_on_stop(r); @@ -667,7 +682,7 @@ void SrsLiveStream::expire() } } -srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +srs_error_t SrsLiveStream::do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; @@ -711,19 +726,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Enter chunked mode, because we didn't set the content-length. w->write_header(SRS_CONSTS_HTTP_OK); - SrsSharedPtr live_source = _srs_sources->fetch(req); - if (!live_source.get()) { - return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); - } - - // create consumer of souce, ignore gop cache, use the audio gop cache. - SrsLiveConsumer* consumer_raw = NULL; - if ((err = live_source->create_consumer(consumer_raw)) != srs_success) { - return srs_error_wrap(err, "create consumer"); - } - SrsUniquePtr consumer(consumer_raw); - - if ((err = live_source->consumer_dumps(consumer.get(), true, true, !enc->has_cache())) != srs_success) { + if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) { return srs_error_wrap(err, "dumps consumer"); } @@ -744,7 +747,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // if gop cache enabled for encoder, dump to consumer. if (enc->has_cache()) { - if ((err = enc->dump_cache(consumer.get(), live_source->jitter())) != srs_success) { + if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) { return srs_error_wrap(err, "encoder dump cache"); } } @@ -1106,6 +1109,10 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r) srs_usleep(100 * SRS_UTIME_MILLISECONDS); } + if (cache->alive() || stream->alive()) { + srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); + } + // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and // stream stopped for it uses it. mux.unhandle(entry->mount, stream.get()); diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 2e233ced6f..cf37379820 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -199,7 +199,7 @@ class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire public: virtual void expire(); private: - virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); + virtual srs_error_t do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r); virtual void http_hooks_on_stop(ISrsHttpMessage* r); virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index e917316d59..b8450a51ba 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -253,7 +253,7 @@ class SrsRtcSource : public ISrsFastTimer void set_publish_stream(ISrsRtcPublishStream* v); // Consume the shared RTP packet, user must free it. srs_error_t on_rtp(SrsRtpPacket* pkt); - // Set and get stream description for souce + // Set and get stream description for source bool has_stream_desc(); void set_stream_desc(SrsRtcSourceDescription* stream_desc); std::vector get_track_desc(std::string type, std::string media_type); diff --git a/trunk/src/app/srs_app_utility.hpp b/trunk/src/app/srs_app_utility.hpp index 347cb54bfe..b7b877154c 100644 --- a/trunk/src/app/srs_app_utility.hpp +++ b/trunk/src/app/srs_app_utility.hpp @@ -55,7 +55,7 @@ extern std::string srs_path_build_timestamp(std::string template_path); // @return an int error code. extern srs_error_t srs_kill_forced(int& pid); -// Current process resouce usage. +// Current process resource usage. // @see: man getrusage class SrsRusage { diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index c5007030a3..ed3ffb50d3 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 153 +#define VERSION_REVISION 154 #endif