Skip to content

Commit

Permalink
Live: Fail when flv stream is disposing.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 31, 2024
1 parent 49b74f0 commit 8fcfdd4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 32 deletions.
59 changes: 32 additions & 27 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ using namespace std;
#include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>

SrsBufferCache::SrsBufferCache(SrsRequest* r)
SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r)
{
req = r->copy()->as_http();
queue = new SrsMessageQueue(true);
trd = new SrsSTCoroutine("http-stream", this);

// TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
server_ = s;
}

SrsBufferCache::~SrsBufferCache()
Expand Down Expand Up @@ -122,10 +123,11 @@ srs_error_t SrsBufferCache::cycle()
return err;
}

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(live_source.get() != NULL);

// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
Expand Down Expand Up @@ -578,12 +580,13 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
return writer->writev(iov, iovcnt, pnwrite);
}

SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
SrsLiveStream::SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c)
{
cache = c;
req = r->copy()->as_http();
security_ = new SrsSecurity();
alive_viewers_ = 0;
server_ = s;
}

SrsLiveStream::~SrsLiveStream()
Expand Down Expand Up @@ -692,10 +695,16 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(live_source.get() != NULL);

bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost);
live_source->set_cache(enabled_cache);
live_source->set_gop_cache_max_frames(gcmf);

// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
Expand Down Expand Up @@ -904,6 +913,7 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSh
SrsLiveEntry::SrsLiveEntry(std::string m)
{
mount = m;
disposing = false;

stream = NULL;
cache = NULL;
Expand Down Expand Up @@ -1015,8 +1025,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
entry = new SrsLiveEntry(mount);

entry->req = r->copy()->as_http();
entry->cache = new SrsBufferCache(r);
entry->stream = new SrsLiveStream(r, entry->cache);
entry->cache = new SrsBufferCache(server, r);
entry->stream = new SrsLiveStream(server, r, entry->cache);

// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
Expand Down Expand Up @@ -1045,6 +1055,12 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = streamHandlers[sid];

// Fail if system is disposing the entry.
if (entry->disposing) {
return srs_error_new(ERROR_NO_SOURCE, "stream is disposing");
}

entry->stream->update_auth(r);
entry->cache->update_auth(r);
}
Expand All @@ -1068,7 +1084,7 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)

// Free all HTTP resources.
SrsUniquePtr<SrsLiveEntry> entry(it->second);
streamHandlers.erase(it);
entry->disposing = true;

SrsUniquePtr<SrsLiveStream> stream(entry->stream);
SrsUniquePtr<SrsBufferCache> cache(entry->cache);
Expand All @@ -1086,6 +1102,9 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}

// Remove the entry from handlers.
streamHandlers.erase(it);

// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it.
mux.unhandle(entry->mount, stream.get());
Expand Down Expand Up @@ -1187,17 +1206,6 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}

SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(live_source.get() != NULL);

bool enabled_cache = _srs_config->get_gop_cache(r->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost);
live_source->set_cache(enabled_cache);
live_source->set_gop_cache_max_frames(gcmf);

// create http streaming handler.
if ((err = http_mount(r.get())) != srs_success) {
return srs_error_wrap(err, "http mount");
Expand All @@ -1208,11 +1216,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
entry = streamHandlers[sid];
*ph = entry->stream;
}

// trigger edge to fetch from origin.
bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost);
srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s",
r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str());

srs_trace("flv: hijack %s ok", upath.c_str());

return err;
}
Expand Down
9 changes: 7 additions & 2 deletions trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ class SrsBufferCache : public ISrsCoroutineHandler
{
private:
srs_utime_t fast_cache;
SrsServer* server_;
private:
SrsMessageQueue* queue;
SrsRequest* req;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsRequest* r);
SrsBufferCache(SrsServer* s, SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsRequest* r);
public:
Expand Down Expand Up @@ -182,12 +183,13 @@ class SrsLiveStream : public ISrsHttpHandler
SrsRequest* req;
SrsBufferCache* cache;
SrsSecurity* security_;
SrsServer* server_;
// For multiple viewers, which means there will more than one alive viewers for a live stream, so we must
// use an int value to represent if there is any viewer is alive. We should never do cleanup unless all
// viewers closed the connection.
int alive_viewers_;
public:
SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsRequest* r);
public:
Expand Down Expand Up @@ -218,6 +220,9 @@ struct SrsLiveEntry

SrsLiveStream* stream;
SrsBufferCache* cache;

// Whether is disposing the entry.
bool disposing;

SrsLiveEntry(std::string m);
virtual ~SrsLiveEntry();
Expand Down
3 changes: 0 additions & 3 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1909,9 +1909,6 @@ SrsLiveSource::SrsLiveSource()

SrsLiveSource::~SrsLiveSource()
{
// Because source maybe created by http stream, so we must notify the handler to cleanup.
handler->on_unpublish(req);

_srs_config->unsubscribe(this);

// never free the consumers,
Expand Down

0 comments on commit 8fcfdd4

Please sign in to comment.