Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge: Fix flv edge crash when http unmount. v6.0.154 v7.0.13 #4166

Merged
merged 4 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,17 +636,32 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return srs_error_wrap(err, "http hook");
}

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// Create consumer of source, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
// When freeing the consumer, it may trigger the source unpublishing for edge. This will trigger the http
// unmount, which waiting for all http live stream to dispose, so we should free the consumer when this
// object is not alive.
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);

// Add the viewer to the viewers list.
viewers_.push_back(hc);

// Serve the viewer connection.
err = do_serve_http(w, r);
err = do_serve_http(live_source.get(), consumer.get(), w, r);

// Remove viewer from the viewers list.
vector<ISrsExpire*>::iterator it = std::find(viewers_.begin(), viewers_.end(), hc);
srs_assert (it != viewers_.end());
viewers_.erase(it);

// Do hook after serving.
http_hooks_on_stop(r);

Expand All @@ -667,7 +682,7 @@ void SrsLiveStream::expire()
}
}

srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
srs_error_t SrsLiveStream::do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -711,19 +726,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);

if ((err = live_source->consumer_dumps(consumer.get(), true, true, !enc->has_cache())) != srs_success) {
if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}

Expand All @@ -744,7 +747,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess

// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((err = enc->dump_cache(consumer.get(), live_source->jitter())) != srs_success) {
if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) {
return srs_error_wrap(err, "encoder dump cache");
}
}
Expand Down Expand Up @@ -1106,6 +1109,10 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}

if (cache->alive() || stream->alive()) {
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
}

// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it.
mux.unhandle(entry->mount, stream.get());
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire
public:
virtual void expire();
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);
virtual void http_hooks_on_stop(ISrsHttpMessage* r);
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class SrsRtcSource : public ISrsFastTimer
void set_publish_stream(ISrsRtcPublishStream* v);
// Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket* pkt);
// Set and get stream description for souce
// Set and get stream description for source
bool has_stream_desc();
void set_stream_desc(SrsRtcSourceDescription* stream_desc);
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ extern std::string srs_path_build_timestamp(std::string template_path);
// @return an int error code.
extern srs_error_t srs_kill_forced(int& pid);

// Current process resouce usage.
// Current process resource usage.
// @see: man getrusage
class SrsRusage
{
Expand Down
Loading