From 2cab98aa6854d4f7caab2895e62124adcaf37d00 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 2 Jan 2023 22:47:51 +0800 Subject: [PATCH] SRS5: Test: Add blackbox for HLS. PICK 30779f3b5a6080a3e3ffb559fe9394404b739947 --- trunk/3rdparty/srs-bench/blackbox/hls_test.go | 116 +++++++++++++++ .../3rdparty/srs-bench/blackbox/rtmp_test.go | 34 +++-- trunk/3rdparty/srs-bench/blackbox/util.go | 135 +++++++++++++----- 3 files changed, 234 insertions(+), 51 deletions(-) create mode 100644 trunk/3rdparty/srs-bench/blackbox/hls_test.go diff --git a/trunk/3rdparty/srs-bench/blackbox/hls_test.go b/trunk/3rdparty/srs-bench/blackbox/hls_test.go new file mode 100644 index 0000000000..6117f1f6ff --- /dev/null +++ b/trunk/3rdparty/srs-bench/blackbox/hls_test.go @@ -0,0 +1,116 @@ +// The MIT License (MIT) +// +// # Copyright (c) 2023 Winlin +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package blackbox + +import ( + "context" + "fmt" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "math/rand" + "os" + "path" + "sync" + "testing" + "time" +) + +func TestRtmpPublish_HlsPlay_Basic(t *testing.T) { + // This case is run in parallel. + t.Parallel() + + // Setup the max timeout for this case. + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + defer cancel() + + // Check a set of errors. + var r0, r1, r2, r3, r4 error + defer func(ctx context.Context) { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil { + t.Errorf("Fail for err %+v", err) + } else { + logger.Tf(ctx, "test done with err %+v", err) + } + }(ctx) + + var wg sync.WaitGroup + defer wg.Wait() + + // Start SRS server and wait for it to be ready. + svr := NewSRSServer(func(v *srsServer) { + v.envs = []string{ + "SRS_HTTP_SERVER_ENABLED=on", + "SRS_VHOST_HLS_ENABLED=on", + } + }) + wg.Add(1) + go func() { + defer wg.Done() + r0 = svr.Run(ctx, cancel) + }() + + // Start FFmpeg to publish stream. + streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int()) + streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID) + ffmpeg := NewFFmpeg(func(v *ffmpegClient) { + v.args = []string{ + "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL, + } + }) + wg.Add(1) + go func() { + defer wg.Done() + <-svr.ReadyCtx().Done() + r1 = ffmpeg.Run(ctx, cancel) + }() + + // Start FFprobe to detect and verify stream. + duration := time.Duration(*srsFFprobeDuration) * time.Millisecond + ffprobe := NewFFprobe(func(v *ffprobeClient) { + v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.ts", streamID)) + v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.m3u8", svr.HTTPPort(), streamID) + v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond + }) + wg.Add(1) + go func() { + defer wg.Done() + <-svr.ReadyCtx().Done() + r2 = ffprobe.Run(ctx, cancel) + }() + + // Fast quit for probe done. + select { + case <-ctx.Done(): + case <-ffprobe.ProbeDoneCtx().Done(): + defer cancel() + + str, m := ffprobe.Result() + if len(m.Streams) != 2 { + r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str) + } + + // Note that HLS score is low, so we only check duration. Note that only check half of duration, because we + // might get only some pieces of segments. + if dv := m.Duration(); dv < duration/2 { + r4 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration/2, m.String(), str) + } + } +} diff --git a/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go b/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go index 12cbdc9e60..71fb3a8280 100644 --- a/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go +++ b/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go @@ -42,9 +42,9 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) { defer cancel() // Check a set of errors. - var r0, r1, r2, r3 error + var r0, r1, r2, r3, r4, r5 error defer func(ctx context.Context) { - if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil { t.Errorf("Fail for err %+v", err) } else { logger.Tf(ctx, "test done with err %+v", err) @@ -55,11 +55,7 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) { defer wg.Wait() // Start SRS server and wait for it to be ready. - svr := NewSRSServer(func(v *srsServer) { - v.envs = []string{ - fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen), - } - }) + svr := NewSRSServer() wg.Add(1) go func() { defer wg.Done() @@ -84,7 +80,7 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) { // Start FFprobe to detect and verify stream. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond ffprobe := NewFFprobe(func(v *ffprobeClient) { - v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID)) + v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID)) v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond }) wg.Add(1) @@ -104,6 +100,13 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) { if len(m.Streams) != 2 { r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str) } + + if ts := 90; m.Format.ProbeScore < ts { + r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str) + } + if dv := m.Duration(); dv < duration { + r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str) + } } } @@ -116,9 +119,9 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) { defer cancel() // Check a set of errors. - var r0, r1, r2, r3 error + var r0, r1, r2, r3, r4, r5 error defer func(ctx context.Context) { - if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil { t.Errorf("Fail for err %+v", err) } else { logger.Tf(ctx, "test done with err %+v", err) @@ -131,9 +134,7 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) { // Start SRS server and wait for it to be ready. svr := NewSRSServer(func(v *srsServer) { v.envs = []string{ - fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen), "SRS_HTTP_SERVER_ENABLED=on", - fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen), "SRS_VHOST_HTTP_REMUX_ENABLED=on", } }) @@ -161,7 +162,7 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) { // Start FFprobe to detect and verify stream. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond ffprobe := NewFFprobe(func(v *ffprobeClient) { - v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID)) + v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID)) v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.flv", svr.HTTPPort(), streamID) v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond }) @@ -182,5 +183,12 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) { if len(m.Streams) != 2 { r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str) } + + if ts := 90; m.Format.ProbeScore < ts { + r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str) + } + if dv := m.Duration(); dv < duration { + r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str) + } } } diff --git a/trunk/3rdparty/srs-bench/blackbox/util.go b/trunk/3rdparty/srs-bench/blackbox/util.go index b3c328ff3c..16afeabd81 100644 --- a/trunk/3rdparty/srs-bench/blackbox/util.go +++ b/trunk/3rdparty/srs-bench/blackbox/util.go @@ -206,12 +206,20 @@ type backendService struct { r0 error // The process pid. pid int + // Whether ignore process exit status error. + ignoreExitStatusError bool // Hooks for owner. + // Before start the process. onBeforeStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error - onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error - onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error - onDispose func(ctx context.Context, bs *backendService) error + // After started the process. + onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error + // Before kill the process, when case is done. + onBeforeKill func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error + // After stopped the process. Always callback when run is called. + onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error + // When dispose the process. Always callback when run is called. + onDispose func(ctx context.Context, bs *backendService) error } func newBackendService(opts ...func(v *backendService)) *backendService { @@ -246,7 +254,7 @@ func (v *backendService) Close() error { v.onDispose(v.caseCtx, v) } - logger.Tf(v.caseCtx, "Service is closed, pid=%v, r0=%v", v.pid, v.r0) + logger.Tf(v.caseCtx, "Process is closed, pid=%v, r0=%v", v.pid, v.r0) return nil } @@ -255,6 +263,9 @@ func (v *backendService) ReadyCtx() context.Context { } func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) error { + // Always dispose resource of process. + defer v.Close() + // Start SRS with -e, which only use environment variables. cmd := exec.Command(v.name, v.args...) @@ -288,7 +299,6 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err if err := cmd.Start(); err != nil { return err } - defer v.Close() // Now process started, query the pid. v.pid = cmd.Process.Pid @@ -314,12 +324,15 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err // Notify other goroutine, SRS already done. defer processDoneCancel() - v.r0 = cmd.Wait() + if err := cmd.Wait(); err != nil && !v.ignoreExitStatusError { + v.r0 = errors.Wrapf(err, "Process wait err, name=%v, args=%v", v.name, v.args) + } if v.onStop != nil { - if r1 := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); r1 != nil { - logger.Wf(ctx, "Process onStop err %v", r1) + if err := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); err != nil { if v.r0 == nil { - v.r0 = r1 + v.r0 = errors.Wrapf(err, "Process onStop err, name=%v, args=%v", v.name, v.args) + } else { + logger.Ef(ctx, "Process onStop err %v", err) } } } @@ -332,6 +345,11 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err select { case <-ctx.Done(): + // Notify owner that we're going to kill the process. + if v.onBeforeKill != nil { + v.onBeforeKill(ctx, v, cmd) + } + // When case terminated, also terminate the SRS process. cmd.Process.Signal(syscall.SIGINT) case <-processDone.Done(): @@ -339,7 +357,7 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err return } - // Start a goroutine to ensure SRS killed. + // Start a goroutine to ensure process killed. go func() { time.Sleep(3 * time.Second) if processDone.Err() == nil { // Ignore if already done. @@ -436,15 +454,20 @@ func NewSRSServer(opts ...func(v *srsServer)) SRSServer { allocator.Free(v.httpListen) pidFile := path.Join(v.workDir, v.srsRelativePidFile) - if _, err := os.Stat(pidFile); !os.IsNotExist(err) { + if _, err := os.Stat(pidFile); err == nil { os.Remove(pidFile) } idFile := path.Join(v.workDir, v.srsRelativeIDFile) - if _, err := os.Stat(idFile); !os.IsNotExist(err) { + if _, err := os.Stat(idFile); err == nil { os.Remove(idFile) } + hlsFiles := path.Join(v.workDir, "objs", "live") + if _, err := os.Stat(hlsFiles); err == nil { + os.RemoveAll(hlsFiles) + } + logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, r0=%v", v.srsID, bs.pid, bs.r0) return nil } @@ -500,6 +523,14 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error { fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen), // Avoid error for macOS, which ulimit to 256. "SRS_MAX_CONNECTIONS=100", + // Setup the default directory for HTTP server. + "SRS_HTTP_SERVER_DIR=objs", + // Setup the default directory for HLS stream. + "SRS_VHOST_HLS_HLS_PATH=objs", + // Setup the RTMP listen port. + fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen), + // Setup the HTTP sever listen port. + fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen), } // Rewrite envs by case. if v.envs != nil { @@ -523,7 +554,8 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error { for ctx.Err() == nil { time.Sleep(100 * time.Millisecond) - res, err := http.Get(fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen)) + r := fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen) + res, err := http.Get(r) if err != nil { continue } @@ -534,7 +566,7 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error { continue } - logger.Tf(ctx, "SRS API is ready, %v", string(b)) + logger.Tf(ctx, "SRS API is ready, %v %v", r, string(b)) v.readyCtxCancel() return } @@ -591,6 +623,9 @@ func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient { return nil } + // We ignore any exit error, because FFmpeg might exit with error even publish ok. + v.process.ignoreExitStatusError = true + for _, opt := range opts { opt(v) } @@ -670,22 +705,35 @@ func (v *ffprobeClient) Result() (string, *ffprobeObject) { } func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFunc) error { - ctx, cancel := context.WithTimeout(ctxCase, v.timeout) - defer cancel() - - logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v", - v.streamURL, v.dvrFile, v.duration, v.timeout) - - // Try to start a DVR process. - for ctx.Err() == nil { - // If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process - // might need to wait for a duration of segment, 10s as such. - _ = v.doDVR(ctx) - - // Check whether DVR file is ok. - if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 { - logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size()) - break + if true { + ctx, cancel := context.WithTimeout(ctxCase, v.timeout) + defer cancel() + + logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v", + v.streamURL, v.dvrFile, v.duration, v.timeout) + + // Try to start a DVR process. + for ctx.Err() == nil { + // If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process + // might need to wait for a duration of segment, 10s as such. + _ = v.doDVR(ctx) + + // Check whether DVR file is ok. + if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 { + logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size()) + break + } + + // Wait for a while and retry. Use larger timeout for HLS. + retryTimeout := 1 * time.Second + if strings.Contains(v.streamURL, ".m3u8") { + retryTimeout = 3 * time.Second + } + + select { + case <-ctx.Done(): + case <-time.After(retryTimeout): + } } } @@ -695,7 +743,7 @@ func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFu } // Start a probe process for the DVR file. - return v.doProbe(ctx, cancelCase) + return v.doProbe(ctxCase, cancelCase) } func (v *ffprobeClient) doDVR(ctx context.Context) error { @@ -763,13 +811,6 @@ func (v *ffprobeClient) doProbe(ctx context.Context, cancel context.CancelFunc) m := v.metadata logger.Tf(ctx, "FFprobe done pid=%v, %v", bs.pid, m.String()) - if ts := 90; m.Format.ProbeScore < ts { - return errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str) - } - if dv := m.Duration(); dv < v.duration { - return errors.Errorf("short duration=%v < %v, %v, %v", dv, v.duration, m.String(), str) - } - v.doneCancel() return nil } @@ -997,3 +1038,21 @@ func (v *ffprobeObject) Duration() time.Duration { return time.Duration(dv*1000) * time.Millisecond } + +func (v *ffprobeObject) Video() *ffprobeObjectMedia { + for _, media := range v.Streams { + if media.CodecType == "video" { + return &media + } + } + return nil +} + +func (v *ffprobeObject) Audio() *ffprobeObjectMedia { + for _, media := range v.Streams { + if media.CodecType == "audio" { + return &media + } + } + return nil +}