From f1a812ba5d4f9b4f00c4c1ea452c6caf41e49c5e Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 12 Aug 2021 11:48:47 +0200 Subject: [PATCH] rewrite metrics system to provide more data (#492) --- README.md | 87 ++++++++++-------- internal/core/api.go | 70 +++++--------- internal/core/core.go | 25 +++-- internal/core/hls_server.go | 9 +- internal/core/metrics.go | 168 +++++++++++++++++++++++++++++----- internal/core/metrics_test.go | 80 ++++++++++++++++ internal/core/path.go | 10 +- internal/core/path_manager.go | 28 +++++- internal/core/rtmp_conn.go | 3 - internal/core/rtmp_server.go | 24 +++-- internal/core/rtmp_source.go | 3 - internal/core/rtsp_conn.go | 3 - internal/core/rtsp_server.go | 36 ++++++-- internal/core/rtsp_source.go | 3 - internal/core/stats.go | 16 +--- 15 files changed, 391 insertions(+), 174 deletions(-) create mode 100644 internal/core/metrics_test.go diff --git a/README.md b/README.md index e7c7d30b396..754313d5ba0 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,9 @@ Plus: * [Fallback stream](#fallback-stream) * [Start on boot with systemd](#start-on-boot-with-systemd) * [Corrupted frames](#corrupted-frames) - * [Monitoring](#monitoring) * [HTTP API](#http-api) + * [Metrics](#metrics) + * [Pprof](#pprof) * [Command-line usage](#command-line-usage) * [Compile and run from source](#compile-and-run-from-source) * [Links](#links) @@ -497,62 +498,70 @@ In some scenarios, the server can send incomplete or corrupted frames. This can readBufferSize: 8192 ``` -### Monitoring +### HTTP API -There are multiple ways to monitor the server usage over time: +The server can be queried and controlled with an HTTP API, that must be enabled by setting the `api` parameter in the configuration: -* Use the [HTTP API](#http-api), described below. +```yml +api: yes +``` -* A metrics exporter, compatible with Prometheus, can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request: +The API listens on `apiAddress`, that by default is `127.0.0.1:9997`; for instance, to obtain a list of active paths, run: - ``` - wget -qO- localhost:9998/metrics - ``` +``` +curl http//127.0.0.1:9997/list +``` - Obtaining: +Full documentation of the API is available on the [dedicated site](https://aler9.github.io/rtsp-simple-server/). - ``` - rtsp_clients{state="publishing"} 15 1596122687740 - rtsp_clients{state="reading"} 8 1596122687740 - rtsp_sources{type="rtsp",state="idle"} 3 1596122687740 - rtsp_sources{type="rtsp",state="running"} 2 1596122687740 - rtsp_sources{type="rtmp",state="idle"} 1 1596122687740 - rtsp_sources{type="rtmp",state="running"} 0 1596122687740 - ``` +### Metrics - where: +A metrics exporter, compatible with Prometheus, can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request: - * `rtsp_clients{state="publishing"}` is the count of clients that are publishing - * `rtsp_clients{state="reading"}` is the count of clients that are reading - * `rtsp_sources{type="rtsp",state="idle"}` is the count of rtsp sources that are not running - * `rtsp_sources{type="rtsp",state="running"}` is the count of rtsp sources that are running - * `rtsp_sources{type="rtmp",state="idle"}` is the count of rtmp sources that are not running - * `rtsp_sources{type="rtmp",state="running"}` is the count of rtmp sources that are running +``` +wget -qO- localhost:9998/metrics +``` -* A performance monitor, compatible with pprof, can be enabled with the parameter `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: +Obtaining: - ``` - go tool pprof -text http://localhost:9999/debug/pprof/goroutine - go tool pprof -text http://localhost:9999/debug/pprof/heap - go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30 - ``` +``` +paths{state="ready"} 2 1628760831152 +paths{state="notReady"} 0 1628760831152 +rtsp_sessions{state="idle"} 0 1628760831152 +rtsp_sessions{state="read"} 0 1628760831152 +rtsp_sessions{state="publish"} 1 1628760831152 +rtsps_sessions{state="idle"} 0 1628760831152 +rtsps_sessions{state="read"} 0 1628760831152 +rtsps_sessions{state="publish"} 0 1628760831152 +rtmp_conns{state="idle"} 0 1628760831152 +rtmp_conns{state="read"} 0 1628760831152 +rtmp_conns{state="publish"} 1 1628760831152 +``` -### HTTP API +where: -The server can be queried and controlled with an HTTP API, that must be enabled by setting the `api` parameter in the configuration: +* `paths{state="ready"}` is the count of paths that are ready +* `paths{state="notReady"}` is the count of paths that are not ready +* `rtsp_sessions{state="idle"}` is the count of RTSP sessions that are idle +* `rtsp_sessions{state="read"}` is the count of RTSP sessions that are reading +* `rtsp_sessions{state="publish"}` is the counf ot RTSP sessions that are publishing +* `rtsps_sessions{state="idle"}` is the count of RTSPS sessions that are idle +* `rtsps_sessions{state="read"}` is the count of RTSPS sessions that are reading +* `rtsps_sessions{state="publish"}` is the counf ot RTSPS sessions that are publishing +* `rtmp_conns{state="idle"}` is the count of RTMP connections that are idle +* `rtmp_conns{state="read"}` is the count of RTMP connections that are reading +* `rtmp_conns{state="publish"}` is the count of RTMP connections that are publishing -```yml -api: yes -``` +### PProf -The API listens on `apiAddress`, that by default is `127.0.0.1:9997`; for instance, to obtain a list of active paths, run: +A performance monitor, compatible with pprof, can be enabled with the parameter `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: ``` -curl http//127.0.0.1:9997/list +go tool pprof -text http://localhost:9999/debug/pprof/goroutine +go tool pprof -text http://localhost:9999/debug/pprof/heap +go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30 ``` -Full documentation of the API is available on the [dedicated site](https://aler9.github.io/rtsp-simple-server/). - ### Command-line usage ``` diff --git a/internal/core/api.go b/internal/core/api.go index 27be605dc96..8e73b46d576 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -18,6 +18,10 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + func fillStruct(dest interface{}, source interface{}) { rvsource := reflect.ValueOf(source) rvdest := reflect.ValueOf(dest) @@ -149,6 +153,7 @@ type apiPathsListData struct { } type apiPathsListRes1 struct { + Data *apiPathsListData Paths map[string]*path Err error } @@ -157,13 +162,9 @@ type apiPathsListReq1 struct { Res chan apiPathsListRes1 } -type apiPathsListRes2 struct { - Err error -} - type apiPathsListReq2 struct { Data *apiPathsListData - Res chan apiPathsListRes2 + Res chan struct{} } type apiRTSPSessionsListItem struct { @@ -176,13 +177,12 @@ type apiRTSPSessionsListData struct { } type apiRTSPSessionsListRes struct { - Err error -} - -type apiRTSPSessionsListReq struct { Data *apiRTSPSessionsListData + Err error } +type apiRTSPSessionsListReq struct{} + type apiRTSPSessionsKickRes struct { Err error } @@ -201,12 +201,12 @@ type apiRTMPConnsListData struct { } type apiRTMPConnsListRes struct { - Err error + Data *apiRTMPConnsListData + Err error } type apiRTMPConnsListReq struct { - Data *apiRTMPConnsListData - Res chan apiRTMPConnsListRes + Res chan apiRTMPConnsListRes } type apiRTMPConnsKickRes struct { @@ -495,44 +495,32 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { } func (a *api) onPathsList(ctx *gin.Context) { - data := apiPathsListData{ - Items: make(map[string]apiPathsItem), - } - res := a.pathManager.OnAPIPathsList(apiPathsListReq1{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - for _, pa := range res.Paths { - pa.OnAPIPathsList(apiPathsListReq2{Data: &data}) - } - - ctx.JSON(http.StatusOK, data) + ctx.JSON(http.StatusOK, res.Data) } func (a *api) onRTSPSessionsList(ctx *gin.Context) { - if reflect.ValueOf(a.rtspServer).IsNil() { + if interfaceIsEmpty(a.rtspServer) { ctx.AbortWithStatus(http.StatusNotFound) return } - data := apiRTSPSessionsListData{ - Items: make(map[string]apiRTSPSessionsListItem), - } - - res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - ctx.JSON(http.StatusOK, data) + ctx.JSON(http.StatusOK, res.Data) } func (a *api) onRTSPSessionsKick(ctx *gin.Context) { - if reflect.ValueOf(a.rtspServer).IsNil() { + if interfaceIsEmpty(a.rtspServer) { ctx.AbortWithStatus(http.StatusNotFound) return } @@ -549,26 +537,22 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { } func (a *api) onRTSPSSessionsList(ctx *gin.Context) { - if reflect.ValueOf(a.rtspsServer).IsNil() { + if interfaceIsEmpty(a.rtspsServer) { ctx.AbortWithStatus(http.StatusNotFound) return } - data := apiRTSPSessionsListData{ - Items: make(map[string]apiRTSPSessionsListItem), - } - - res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - ctx.JSON(http.StatusOK, data) + ctx.JSON(http.StatusOK, res.Data) } func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { - if reflect.ValueOf(a.rtspsServer).IsNil() { + if interfaceIsEmpty(a.rtspsServer) { ctx.AbortWithStatus(http.StatusNotFound) return } @@ -585,22 +569,18 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { } func (a *api) onRTMPConnsList(ctx *gin.Context) { - if reflect.ValueOf(a.rtmpServer).IsNil() { + if interfaceIsEmpty(a.rtmpServer) { ctx.AbortWithStatus(http.StatusNotFound) return } - data := apiRTMPConnsListData{ - Items: make(map[string]apiRTMPConnsListItem), - } - - res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{Data: &data}) + res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - ctx.JSON(http.StatusOK, data) + ctx.JSON(http.StatusOK, res.Data) } // OnConfReload is called by core. @@ -611,7 +591,7 @@ func (a *api) OnConfReload(conf *conf.Conf) { } func (a *api) onRTMPConnsKick(ctx *gin.Context) { - if reflect.ValueOf(a.rtmpServer).IsNil() { + if interfaceIsEmpty(a.rtmpServer) { ctx.AbortWithStatus(http.StatusNotFound) return } diff --git a/internal/core/core.go b/internal/core/core.go index 22c894ec5b0..a78c8e355bb 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -200,7 +200,6 @@ func (p *Core) createResources(initial bool) error { if p.metrics == nil { p.metrics, err = newMetrics( p.conf.MetricsAddress, - p.stats, p) if err != nil { return err @@ -229,6 +228,7 @@ func (p *Core) createResources(initial bool) error { p.conf.ReadBufferSize, p.conf.Paths, p.stats, + p.metrics, p) } @@ -260,7 +260,7 @@ func (p *Core) createResources(initial bool) error { p.conf.ProtocolsParsed, p.conf.RunOnConnect, p.conf.RunOnConnectRestart, - p.stats, + p.metrics, p.pathManager, p) if err != nil { @@ -295,7 +295,7 @@ func (p *Core) createResources(initial bool) error { p.conf.ProtocolsParsed, p.conf.RunOnConnect, p.conf.RunOnConnectRestart, - p.stats, + p.metrics, p.pathManager, p) if err != nil { @@ -315,7 +315,7 @@ func (p *Core) createResources(initial bool) error { p.conf.RTSPAddress, p.conf.RunOnConnect, p.conf.RunOnConnectRestart, - p.stats, + p.metrics, p.pathManager, p) if err != nil { @@ -334,7 +334,6 @@ func (p *Core) createResources(initial bool) error { p.conf.HLSSegmentDuration, p.conf.HLSAllowOrigin, p.conf.ReadBufferCount, - p.stats, p.pathManager, p) if err != nil { @@ -378,16 +377,14 @@ func (p *Core) closeResources(newConf *conf.Conf) { closeMetrics := false if newConf == nil || newConf.Metrics != p.conf.Metrics || - newConf.MetricsAddress != p.conf.MetricsAddress || - closeStats { + newConf.MetricsAddress != p.conf.MetricsAddress { closeMetrics = true } closePPROF := false if newConf == nil || newConf.PPROF != p.conf.PPROF || - newConf.PPROFAddress != p.conf.PPROFAddress || - closeStats { + newConf.PPROFAddress != p.conf.PPROFAddress { closePPROF = true } @@ -398,7 +395,8 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.WriteTimeout != p.conf.WriteTimeout || newConf.ReadBufferCount != p.conf.ReadBufferCount || newConf.ReadBufferSize != p.conf.ReadBufferSize || - closeStats { + closeStats || + closeMetrics { closePathManager = true } else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { p.pathManager.OnConfReload(newConf.Paths) @@ -423,7 +421,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { !reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) || newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || - closeStats || + closeMetrics || closePathManager { closeRTSPServer = true } @@ -443,7 +441,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { !reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) || newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || - closeStats || + closeMetrics || closePathManager { closeRTSPSServer = true } @@ -458,7 +456,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.RTSPAddress != p.conf.RTSPAddress || newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || - closeStats || + closeMetrics || closePathManager { closeRTMPServer = true } @@ -472,7 +470,6 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration || newConf.HLSAllowOrigin != p.conf.HLSAllowOrigin || newConf.ReadBufferCount != p.conf.ReadBufferCount || - closeStats || closePathManager { closeHLSServer = true } diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 448dd253b08..99bc7520390 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -23,7 +23,6 @@ type hlsServer struct { hlsSegmentDuration time.Duration hlsAllowOrigin string readBufferCount int - stats *stats pathManager *pathManager parent hlsServerParent @@ -47,7 +46,6 @@ func newHLSServer( hlsSegmentDuration time.Duration, hlsAllowOrigin string, readBufferCount int, - stats *stats, pathManager *pathManager, parent hlsServerParent, ) (*hlsServer, error) { @@ -64,7 +62,6 @@ func newHLSServer( hlsSegmentDuration: hlsSegmentDuration, hlsAllowOrigin: hlsAllowOrigin, readBufferCount: readBufferCount, - stats: stats, pathManager: pathManager, parent: parent, ctx: ctx, @@ -78,11 +75,11 @@ func newHLSServer( s.Log(logger.Info, "listener opened on "+address) + s.pathManager.OnHLSServerSet(s) + s.wg.Add(1) go s.run() - s.pathManager.OnHLSServer(s) - return s, nil } @@ -130,7 +127,7 @@ outer: hs.Shutdown(context.Background()) - s.pathManager.OnHLSServer(nil) + s.pathManager.OnHLSServerSet(nil) } // ServeHTTP implements http.Handler. diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 5dbbc470a1e..e0e6cbcd476 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -6,7 +6,7 @@ import ( "net" "net/http" "strconv" - "sync/atomic" + "sync" "time" "github.com/aler9/rtsp-simple-server/internal/logger" @@ -17,21 +17,36 @@ func formatMetric(key string, value int64, nowUnix int64) string { strconv.FormatInt(nowUnix, 10) + "\n" } +type metricsPathManager interface { + OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 +} + +type metricsRTSPServer interface { + OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes +} + +type metricsRTMPServer interface { + OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes +} + type metricsParent interface { Log(logger.Level, string, ...interface{}) } type metrics struct { - stats *stats - listener net.Listener mux *http.ServeMux server *http.Server + + mutex sync.Mutex + pathManager metricsPathManager + rtspServer metricsRTSPServer + rtspsServer metricsRTSPServer + rtmpServer metricsRTMPServer } func newMetrics( address string, - stats *stats, parent metricsParent, ) (*metrics, error) { listener, err := net.Listen("tcp", address) @@ -40,7 +55,6 @@ func newMetrics( } m := &metrics{ - stats: stats, listener: listener, } @@ -72,30 +86,136 @@ func (m *metrics) run() { func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { nowUnix := time.Now().UnixNano() / 1000000 - countPublishers := atomic.LoadInt64(m.stats.CountPublishers) - countReaders := atomic.LoadInt64(m.stats.CountReaders) - countSourcesRTSP := atomic.LoadInt64(m.stats.CountSourcesRTSP) - countSourcesRTSPRunning := atomic.LoadInt64(m.stats.CountSourcesRTSPRunning) - countSourcesRTMP := atomic.LoadInt64(m.stats.CountSourcesRTMP) - countSourcesRTMPRunning := atomic.LoadInt64(m.stats.CountSourcesRTMPRunning) - out := "" - out += formatMetric("rtsp_clients{state=\"publishing\"}", - countPublishers, nowUnix) - out += formatMetric("rtsp_clients{state=\"reading\"}", - countReaders, nowUnix) + res := m.pathManager.OnAPIPathsList(apiPathsListReq1{}) + if res.Err == nil { + readyCount := int64(0) + notReadyCount := int64(0) + + for _, p := range res.Data.Items { + if p.SourceReady { + readyCount++ + } else { + notReadyCount++ + } + } + + out += formatMetric("paths{state=\"ready\"}", + readyCount, nowUnix) + out += formatMetric("paths{state=\"notReady\"}", + notReadyCount, nowUnix) + } + + if !interfaceIsEmpty(m.rtspServer) { + res := m.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + if res.Err == nil { + idleCount := int64(0) + readCount := int64(0) + publishCount := int64(0) + + for _, i := range res.Data.Items { + switch i.State { + case "idle": + idleCount++ + case "read": + readCount++ + case "publish": + publishCount++ + } + } + + out += formatMetric("rtsp_sessions{state=\"idle\"}", + idleCount, nowUnix) + out += formatMetric("rtsp_sessions{state=\"read\"}", + readCount, nowUnix) + out += formatMetric("rtsp_sessions{state=\"publish\"}", + publishCount, nowUnix) + } + } - out += formatMetric("rtsp_sources{type=\"rtsp\",state=\"idle\"}", - countSourcesRTSP-countSourcesRTSPRunning, nowUnix) - out += formatMetric("rtsp_sources{type=\"rtsp\",state=\"running\"}", - countSourcesRTSPRunning, nowUnix) + if !interfaceIsEmpty(m.rtspsServer) { + res := m.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + if res.Err == nil { + idleCount := int64(0) + readCount := int64(0) + publishCount := int64(0) + + for _, i := range res.Data.Items { + switch i.State { + case "idle": + idleCount++ + case "read": + readCount++ + case "publish": + publishCount++ + } + } + + out += formatMetric("rtsps_sessions{state=\"idle\"}", + idleCount, nowUnix) + out += formatMetric("rtsps_sessions{state=\"read\"}", + readCount, nowUnix) + out += formatMetric("rtsps_sessions{state=\"publish\"}", + publishCount, nowUnix) + } + } - out += formatMetric("rtsp_sources{type=\"rtmp\",state=\"idle\"}", - countSourcesRTMP-countSourcesRTMPRunning, nowUnix) - out += formatMetric("rtsp_sources{type=\"rtmp\",state=\"running\"}", - countSourcesRTMPRunning, nowUnix) + if !interfaceIsEmpty(m.rtmpServer) { + res := m.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{}) + if res.Err == nil { + idleCount := int64(0) + readCount := int64(0) + publishCount := int64(0) + + for _, i := range res.Data.Items { + switch i.State { + case "idle": + idleCount++ + case "read": + readCount++ + case "publish": + publishCount++ + } + } + + out += formatMetric("rtmp_conns{state=\"idle\"}", + idleCount, nowUnix) + out += formatMetric("rtmp_conns{state=\"read\"}", + readCount, nowUnix) + out += formatMetric("rtmp_conns{state=\"publish\"}", + publishCount, nowUnix) + } + } w.WriteHeader(http.StatusOK) io.WriteString(w, out) } + +// OnPathManagerSet is called by pathManager. +func (m *metrics) OnPathManagerSet(s metricsPathManager) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.pathManager = s +} + +// OnRTSPServer is called by rtspServer (plain). +func (m *metrics) OnRTSPServerSet(s metricsRTSPServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.rtspServer = s +} + +// OnRTSPServer is called by rtspServer (plain). +func (m *metrics) OnRTSPSServerSet(s metricsRTSPServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.rtspsServer = s +} + +// OnRTMPServerSet is called by rtmpServer. +func (m *metrics) OnRTMPServerSet(s metricsRTMPServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.rtmpServer = s +} diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go new file mode 100644 index 00000000000..abb84b36347 --- /dev/null +++ b/internal/core/metrics_test.go @@ -0,0 +1,80 @@ +package core + +import ( + "io/ioutil" + "net/http" + "os" + "strings" + "testing" + + "github.com/aler9/gortsplib" + "github.com/stretchr/testify/require" +) + +func TestMetrics(t *testing.T) { + serverCertFpath, err := writeTempFile(serverCert) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := writeTempFile(serverKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + + p, ok := newInstance("metrics: yes\n" + + "encryption: optional\n" + + "serverCert: " + serverCertFpath + "\n" + + "serverKey: " + serverKeyFpath + "\n") + require.Equal(t, true, ok) + defer p.close() + + track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "flv", + "rtmp://localhost:1935/test1/test2", + }) + require.NoError(t, err) + defer cnt1.close() + + req, err := http.NewRequest(http.MethodGet, "http://localhost:9998/metrics", nil) + require.NoError(t, err) + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + require.Equal(t, http.StatusOK, res.StatusCode) + + bo, err := ioutil.ReadAll(res.Body) + require.NoError(t, err) + + vals := make(map[string]string) + lines := strings.Split(string(bo), "\n") + for _, l := range lines[:len(lines)-1] { + fields := strings.Split(l, " ") + vals[fields[0]] = fields[1] + } + + require.Equal(t, map[string]string{ + "paths{state=\"notReady\"}": "0", + "paths{state=\"ready\"}": "2", + "rtmp_conns{state=\"idle\"}": "0", + "rtmp_conns{state=\"publish\"}": "1", + "rtmp_conns{state=\"read\"}": "0", + "rtsp_sessions{state=\"idle\"}": "0", + "rtsp_sessions{state=\"publish\"}": "1", + "rtsp_sessions{state=\"read\"}": "0", + "rtsps_sessions{state=\"idle\"}": "0", + "rtsps_sessions{state=\"publish\"}": "0", + "rtsps_sessions{state=\"read\"}": "0", + }, vals) +} diff --git a/internal/core/path.go b/internal/core/path.go index b9db2ddf4d4..a4a62865b45 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -820,7 +820,7 @@ func (pa *path) handleAPIPathsList(req apiPathsListReq2) { return ret }(), } - req.Res <- apiPathsListRes2{} + close(req.Res) } // OnSourceStaticSetReady is called by a sourceStatic. @@ -936,12 +936,12 @@ func (pa *path) OnReaderPause(req pathReaderPauseReq) { } // OnAPIPathsList is called by api. -func (pa *path) OnAPIPathsList(req apiPathsListReq2) apiPathsListRes2 { - req.Res = make(chan apiPathsListRes2) +func (pa *path) OnAPIPathsList(req apiPathsListReq2) { + req.Res = make(chan struct{}) select { case pa.apiPathsList <- req: - return <-req.Res + <-req.Res + case <-pa.ctx.Done(): - return apiPathsListRes2{Err: fmt.Errorf("terminated")} } } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 9087ab418e3..0d36e2b79c9 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -29,6 +29,7 @@ type pathManager struct { readBufferSize int pathConfs map[string]*conf.PathConf stats *stats + metrics *metrics parent pathManagerParent ctx context.Context @@ -57,6 +58,7 @@ func newPathManager( readBufferSize int, pathConfs map[string]*conf.PathConf, stats *stats, + metrics *metrics, parent pathManagerParent) *pathManager { ctx, ctxCancel := context.WithCancel(parentCtx) @@ -68,6 +70,7 @@ func newPathManager( readBufferSize: readBufferSize, pathConfs: pathConfs, stats: stats, + metrics: metrics, parent: parent, ctx: ctx, ctxCancel: ctxCancel, @@ -88,6 +91,10 @@ func newPathManager( } } + if pm.metrics != nil { + pm.metrics.OnPathManagerSet(pm) + } + pm.wg.Add(1) go pm.run() @@ -261,6 +268,10 @@ outer: } pm.ctxCancel() + + if pm.metrics != nil { + pm.metrics.OnPathManagerSet(nil) + } } func (pm *pathManager) createPath(confName string, conf *conf.PathConf, name string) { @@ -406,8 +417,8 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS } } -// OnHLSServer is called by hlsServer. -func (pm *pathManager) OnHLSServer(s pathManagerHLSServer) { +// OnHLSServerSet is called by hlsServer. +func (pm *pathManager) OnHLSServerSet(s pathManagerHLSServer) { select { case pm.hlsServerSet <- s: case <-pm.ctx.Done(): @@ -419,7 +430,18 @@ func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 { req.Res = make(chan apiPathsListRes1) select { case pm.apiPathsList <- req: - return <-req.Res + res1 := <-req.Res + + res1.Data = &apiPathsListData{ + Items: make(map[string]apiPathsItem), + } + + for _, pa := range res1.Paths { + pa.OnAPIPathsList(apiPathsListReq2{Data: res1.Data}) + } + + return res1 + case <-pm.ctx.Done(): return apiPathsListRes1{Err: fmt.Errorf("terminated")} } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 2b2637f872c..d240ad352a0 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -66,7 +66,6 @@ type rtmpConn struct { runOnConnect string runOnConnectRestart bool wg *sync.WaitGroup - stats *stats conn *rtmp.Conn pathManager rtmpConnPathManager parent rtmpConnParent @@ -89,7 +88,6 @@ func newRTMPConn( runOnConnect string, runOnConnectRestart bool, wg *sync.WaitGroup, - stats *stats, nconn net.Conn, pathManager rtmpConnPathManager, parent rtmpConnParent) *rtmpConn { @@ -104,7 +102,6 @@ func newRTMPConn( runOnConnect: runOnConnect, runOnConnectRestart: runOnConnectRestart, wg: wg, - stats: stats, conn: rtmp.NewServerConn(nconn), pathManager: pathManager, parent: parent, diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index a8b069adf01..9f7a48b9f86 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -26,7 +26,7 @@ type rtmpServer struct { rtspAddress string runOnConnect string runOnConnectRestart bool - stats *stats + metrics *metrics pathManager *pathManager parent rtmpServerParent @@ -51,7 +51,7 @@ func newRTMPServer( rtspAddress string, runOnConnect string, runOnConnectRestart bool, - stats *stats, + metrics *metrics, pathManager *pathManager, parent rtmpServerParent) (*rtmpServer, error) { l, err := net.Listen("tcp", address) @@ -68,7 +68,7 @@ func newRTMPServer( rtspAddress: rtspAddress, runOnConnect: runOnConnect, runOnConnectRestart: runOnConnectRestart, - stats: stats, + metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -82,6 +82,10 @@ func newRTMPServer( s.Log(logger.Info, "listener opened on %s", address) + if s.metrics != nil { + s.metrics.OnRTMPServerSet(s) + } + s.wg.Add(1) go s.run() @@ -147,7 +151,6 @@ outer: s.runOnConnect, s.runOnConnectRestart, &s.wg, - s.stats, nconn, s.pathManager, s) @@ -160,8 +163,12 @@ outer: delete(s.conns, c) case req := <-s.apiRTMPConnsList: + data := &apiRTMPConnsListData{ + Items: make(map[string]apiRTMPConnsListItem), + } + for c := range s.conns { - req.Data.Items[c.ID()] = apiRTMPConnsListItem{ + data.Items[c.ID()] = apiRTMPConnsListItem{ RemoteAddr: c.RemoteAddr().String(), State: func() string { switch c.safeState() { @@ -175,7 +182,8 @@ outer: }(), } } - req.Res <- apiRTMPConnsListRes{} + + req.Res <- apiRTMPConnsListRes{Data: data} case req := <-s.apiRTMPConnsKick: res := func() bool { @@ -202,6 +210,10 @@ outer: s.ctxCancel() s.l.Close() + + if s.metrics != nil { + s.metrics.OnRTMPServerSet(s) + } } func (s *rtmpServer) newConnID() (string, error) { diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 6c6a921b7ab..5421216fa68 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -61,7 +60,6 @@ func newRTMPSource( ctxCancel: ctxCancel, } - atomic.AddInt64(s.stats.CountSourcesRTMP, +1) s.log(logger.Info, "started") s.wg.Add(1) @@ -72,7 +70,6 @@ func newRTMPSource( // Close closes a Source. func (s *rtmpSource) Close() { - atomic.AddInt64(s.stats.CountSourcesRTMPRunning, -1) s.log(logger.Info, "stopped") s.ctxCancel() } diff --git a/internal/core/rtsp_conn.go b/internal/core/rtsp_conn.go index 803f43ca861..80acf74d664 100644 --- a/internal/core/rtsp_conn.go +++ b/internal/core/rtsp_conn.go @@ -41,7 +41,6 @@ type rtspConn struct { runOnConnect string runOnConnectRestart bool pathManager *pathManager - stats *stats conn *gortsplib.ServerConn parent rtspConnParent @@ -59,7 +58,6 @@ func newRTSPConn( runOnConnect string, runOnConnectRestart bool, pathManager *pathManager, - stats *stats, conn *gortsplib.ServerConn, parent rtspConnParent) *rtspConn { c := &rtspConn{ @@ -69,7 +67,6 @@ func newRTSPConn( runOnConnect: runOnConnect, runOnConnectRestart: runOnConnectRestart, pathManager: pathManager, - stats: stats, conn: conn, parent: parent, } diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index 0ce9c1bc683..6b137e6aa68 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -30,7 +30,7 @@ type rtspServer struct { protocols map[conf.Protocol]struct{} runOnConnect string runOnConnectRestart bool - stats *stats + metrics *metrics pathManager *pathManager parent rtspServerParent @@ -65,7 +65,7 @@ func newRTSPServer( protocols map[conf.Protocol]struct{}, runOnConnect string, runOnConnectRestart bool, - stats *stats, + metrics *metrics, pathManager *pathManager, parent rtspServerParent) (*rtspServer, error) { ctx, ctxCancel := context.WithCancel(parentCtx) @@ -76,7 +76,7 @@ func newRTSPServer( isTLS: isTLS, rtspAddress: rtspAddress, protocols: protocols, - stats: stats, + metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -128,6 +128,14 @@ func newRTSPServer( s.Log(logger.Info, "TCP listener opened on %s", address) + if s.metrics != nil { + if !isTLS { + s.metrics.OnRTSPServerSet(s) + } else { + s.metrics.OnRTSPSServerSet(s) + } + } + s.wg.Add(1) go s.run() @@ -179,6 +187,14 @@ outer: s.ctxCancel() s.srv.Close() + + if s.metrics != nil { + if !s.isTLS { + s.metrics.OnRTSPServerSet(nil) + } else { + s.metrics.OnRTSPSServerSet(nil) + } + } } func (s *rtspServer) newSessionID() (string, error) { @@ -218,7 +234,6 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { s.runOnConnect, s.runOnConnectRestart, s.pathManager, - s.stats, ctx.Conn, s) @@ -344,7 +359,7 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { se.OnFrame(ctx) } -// OnAPIRTSPSessionsList is called by api. +// OnAPIRTSPSessionsList is called by api and metrics. func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes { select { case <-s.ctx.Done(): @@ -353,8 +368,14 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe } s.mutex.RLock() + defer s.mutex.RUnlock() + + data := &apiRTSPSessionsListData{ + Items: make(map[string]apiRTSPSessionsListItem), + } + for _, s := range s.sessions { - req.Data.Items[s.ID()] = apiRTSPSessionsListItem{ + data.Items[s.ID()] = apiRTSPSessionsListItem{ RemoteAddr: s.RemoteAddr().String(), State: func() string { switch s.safeState() { @@ -370,9 +391,8 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe }(), } } - s.mutex.RUnlock() - return apiRTSPSessionsListRes{} + return apiRTSPSessionsListRes{Data: data} } // OnAPIRTSPSessionsKick is called by api. diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 34dc398082d..55196e4112f 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -8,7 +8,6 @@ import ( "fmt" "strings" "sync" - "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -75,7 +74,6 @@ func newRTSPSource( ctxCancel: ctxCancel, } - atomic.AddInt64(s.stats.CountSourcesRTSP, +1) s.log(logger.Info, "started") s.wg.Add(1) @@ -85,7 +83,6 @@ func newRTSPSource( } func (s *rtspSource) Close() { - atomic.AddInt64(s.stats.CountSourcesRTSP, -1) s.log(logger.Info, "stopped") s.ctxCancel() } diff --git a/internal/core/stats.go b/internal/core/stats.go index 2d9aaeaa0d6..dcef6dd15a5 100644 --- a/internal/core/stats.go +++ b/internal/core/stats.go @@ -8,22 +8,14 @@ func ptrInt64() *int64 { type stats struct { // use pointers to avoid a crash on 32bit platforms // https://github.com/golang/go/issues/9959 - CountPublishers *int64 - CountReaders *int64 - CountSourcesRTSP *int64 - CountSourcesRTSPRunning *int64 - CountSourcesRTMP *int64 - CountSourcesRTMPRunning *int64 + CountPublishers *int64 + CountReaders *int64 } func newStats() *stats { return &stats{ - CountPublishers: ptrInt64(), - CountReaders: ptrInt64(), - CountSourcesRTSP: ptrInt64(), - CountSourcesRTSPRunning: ptrInt64(), - CountSourcesRTMP: ptrInt64(), - CountSourcesRTMPRunning: ptrInt64(), + CountPublishers: ptrInt64(), + CountReaders: ptrInt64(), } }