From 650708356b0cdee458774525eec2b511ba9b2991 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 8 Sep 2023 16:14:06 -0700 Subject: [PATCH] Participant Composite (#474) * send it * fixes * fix test svc reset * more fixes * state updates * ffprobe timeout * try without delay/unpublish * fixes * fix unencoded video * test delayed publish * test all audio * more updates * final? * segments with video delay broken * test with video delays * passing locally * working? * add config to audio bin * use idle probe * cleaning --- go.mod | 43 +++--- go.sum | 102 ++++++------- magefile.go | 2 + pkg/config/pipeline.go | 37 +++++ pkg/config/service.go | 5 + pkg/gstreamer/pipeline.go | 7 +- pkg/pipeline/builder/video.go | 40 +++++ pkg/pipeline/controller.go | 8 +- pkg/pipeline/source/sdk.go | 185 ++++++++++++++++------- pkg/pipeline/source/source.go | 3 +- pkg/pipeline/watch.go | 4 - pkg/service/handler.go | 2 +- pkg/service/service.go | 3 +- pkg/stats/monitor.go | 19 +++ pkg/types/types.go | 1 + test/ffprobe.go | 17 ++- test/integration.go | 45 +++--- test/participant.go | 268 ++++++++++++++++++++++++++++++++++ test/room_composite.go | 5 - test/runner.go | 1 + test/stream.go | 1 + test/track_composite.go | 5 - test/web.go | 5 - 23 files changed, 625 insertions(+), 183 deletions(-) create mode 100644 test/participant.go diff --git a/go.mod b/go.mod index b75f8642..1d079830 100644 --- a/go.mod +++ b/go.mod @@ -19,11 +19,11 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/livekit/livekit-server v1.4.5-0.20230814182001-77c8e824735b github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.6.0 - github.com/livekit/psrpc v0.3.2 - github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8 + github.com/livekit/protocol v1.6.2-0.20230825070127-9f0a8f87da8d + github.com/livekit/psrpc v0.3.3 + github.com/livekit/server-sdk-go v1.0.17-0.20230825204729-fcf5bdfadd2c github.com/pion/rtp v1.8.1 - github.com/pion/webrtc/v3 v3.2.14 + github.com/pion/webrtc/v3 v3.2.16 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/stretchr/testify v1.8.4 @@ -31,7 +31,7 @@ require ( github.com/tinyzimmer/go-gst v0.2.33 github.com/urfave/cli/v2 v2.25.7 go.uber.org/atomic v1.11.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.25.0 google.golang.org/api v0.130.0 google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 @@ -39,12 +39,11 @@ require ( ) require ( - cloud.google.com/go v0.110.2 // indirect - cloud.google.com/go/compute v1.19.3 // indirect + cloud.google.com/go v0.110.6 // indirect + cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/iam v1.1.0 // indirect + cloud.google.com/go/iam v1.1.1 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bep/debounce v1.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -70,7 +69,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jxskiss/base62 v1.1.0 // indirect - github.com/klauspost/compress v1.16.5 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08 // indirect @@ -80,9 +79,10 @@ require ( github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-pointer v0.0.1 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/nats-io/nats.go v1.26.0 // indirect + github.com/nats-io/nats.go v1.28.0 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.7 // indirect github.com/pion/ice/v2 v2.3.10 // indirect @@ -101,26 +101,25 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect - github.com/redis/go-redis/v9 v9.0.5 // indirect + github.com/redis/go-redis/v9 v9.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/thoas/go-funk v0.9.3 // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - go.uber.org/goleak v1.1.12 // indirect - go.uber.org/multierr v1.6.0 // indirect - golang.org/x/crypto v0.11.0 // indirect - golang.org/x/exp v0.0.0-20230810033253-352e893a4cad // indirect - golang.org/x/net v0.13.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/crypto v0.12.0 // indirect + golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect + golang.org/x/net v0.14.0 // indirect golang.org/x/oauth2 v0.9.0 // indirect golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect + google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect ) diff --git a/go.sum b/go.sum index 4f156279..df99d7ac 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,13 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.110.2 h1:sdFPBr6xG9/wkBbfhmUz/JmZC7X6LavQgcrVINrKiVA= -cloud.google.com/go v0.110.2/go.mod h1:k04UEeEtb6ZBRTv3dZz4CeJC3jKGxyhl0sAiVVquxiw= -cloud.google.com/go/compute v1.19.3 h1:DcTwsFgGev/wV5+q8o2fzgcHOaac+DKGC91ZlvpsQds= -cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= +cloud.google.com/go v0.110.6 h1:8uYAkj3YHTP/1iwReuHPxLSbdcyc+dSBbzFMrVwDR6Q= +cloud.google.com/go v0.110.6/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= +cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY= +cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= -cloud.google.com/go/iam v1.1.0 h1:67gSqaPukx7O8WLLHMa0PNs3EBGd2eE4d+psbO/CO94= -cloud.google.com/go/iam v1.1.0/go.mod h1:nxdHjaKfCr7fNYx/HJMM8LgiMugmveWlkatear5gVyk= +cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= +cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= cloud.google.com/go/storage v1.31.0 h1:+S3LjjEN2zZ+L5hOwj4+1OkGCsLVe0NzpXKQ1pSdTCI= cloud.google.com/go/storage v1.31.0/go.mod h1:81ams1PrhW16L4kF7qg+4mTq7SRs5HsbDTM0bWvrwJ0= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= @@ -32,12 +32,11 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/aws/aws-sdk-go v1.44.296 h1:ALRZIIKI+6EBWDiWP4RHWmOtHZ7dywRzenL4NWgNI2A= github.com/aws/aws-sdk-go v1.44.296/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= -github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= +github.com/bsm/ginkgo/v2 v2.9.5 h1:rtVBYPs3+TC5iLUVOis1B9tjLTup7Cj5IfzosKtvTJ0= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -153,8 +152,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= -github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -177,12 +176,12 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08 h1:e0qwVjrtzmADgNZpdgSJgyhlF6BgrHkpdnkONL8pLrw= github.com/livekit/mediatransportutil v0.0.0-20230814030822-8d5de0008b08/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= -github.com/livekit/protocol v1.6.0 h1:19S+vFZqnivKIOpyR3DEK/mSaykQ3UEf7H2G/mBOE54= -github.com/livekit/protocol v1.6.0/go.mod h1:SUS9foM1xBzw/AFrgTJuFX/oSuwlnIbHmpdiPdCvwEM= -github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= -github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= -github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8 h1:Vm64l3U2rWAB2IdG/0/CXIZTgVPmldsQrDAOcq+zQQs= -github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8/go.mod h1:4OIkcDpQJuOQJEThr+Z8+WQfkcs12BXC+9cuhigLnDM= +github.com/livekit/protocol v1.6.2-0.20230825070127-9f0a8f87da8d h1:PCFKd1f72grQ38Vgx5IFjUsVcy2SQ0bKxkbVvhkZPWs= +github.com/livekit/protocol v1.6.2-0.20230825070127-9f0a8f87da8d/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= +github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= +github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= +github.com/livekit/server-sdk-go v1.0.17-0.20230825204729-fcf5bdfadd2c h1:tFuVYn9UL6ln6Fwfu0SV3R19tB06OUYluNRxZLtYa+8= +github.com/livekit/server-sdk-go v1.0.17-0.20230825204729-fcf5bdfadd2c/go.mod h1:lLDZe/p7v4xaVTRMAXXDFWg3PQwGKZQ3hCcLYbVD27k= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -198,8 +197,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= -github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE= -github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -215,11 +214,12 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= -github.com/pion/ice/v2 v2.3.9/go.mod h1:lT3kv5uUIlHfXHU/ZRD7uKD/ufM202+eTa3C/umgGf4= github.com/pion/ice/v2 v2.3.10 h1:T3bUJKqh7pGEdMyTngUcTeQd6io9X8JjgsVWZDannnY= github.com/pion/ice/v2 v2.3.10/go.mod h1:hHGCibDfmXGqukayQw979xEctASp2Pe5Oe0iDU8pRus= github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= @@ -252,11 +252,10 @@ github.com/pion/transport/v2 v2.1.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlD github.com/pion/transport/v2 v2.2.0/go.mod h1:AdSw4YBZVDkZm8fpoz+fclXyQwANWmZAlDuQdctTThQ= github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= -github.com/pion/turn/v2 v2.1.2/go.mod h1:1kjnPkBcex3dhCU2Am+AAmxDcGhLX3WnMfmkNpvSTQU= github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA= github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.2.14 h1:GlqnBnnLlcYYA/LOwqLLU1plZYwx0Y/e/57bZ2tzQcU= -github.com/pion/webrtc/v3 v3.2.14/go.mod h1:r1mtixc2MH847mmQTPwlEvGge7D18C2T5qp8jI9Lm44= +github.com/pion/webrtc/v3 v3.2.16 h1:2tfQ8qdyUAjeG5Zn44yE98umMtdxuHembJ3WYhj4Zd4= +github.com/pion/webrtc/v3 v3.2.16/go.mod h1:vm5dipobPQGXn2hNyQ+hh2KbTTTaDxJiDcM+MyAyrsc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -270,8 +269,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= -github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= -github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= +github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY= +github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -280,7 +279,6 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -301,7 +299,6 @@ github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6S github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= @@ -309,15 +306,13 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -327,18 +322,16 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= -golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= -golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -356,7 +349,6 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -367,10 +359,9 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.9.0 h1:BPpt2kU7oMRq3kCHAA1tbSEshXRw1LpG2ztgDwrzuAs= @@ -380,7 +371,6 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= @@ -398,9 +388,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -412,8 +400,9 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -422,7 +411,6 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -435,9 +423,9 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -447,7 +435,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -466,12 +453,12 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= -google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= -google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= -google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 h1:DEH99RbiLZhMxrpEJCZ0A+wdTe0EOgou/poSLx9vWf4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= +google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8= +google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e h1:z3vDksarJxsAKM5dmEGv0GHwE2hKJ096wZra71Vs4sw= +google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 h1:lv6/DhyiFFGsmzxbsUUTOkN29II+zeWHxvT8Lpdxsv0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -496,7 +483,6 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/magefile.go b/magefile.go index 7037ce5b..aecdccc7 100644 --- a/magefile.go +++ b/magefile.go @@ -80,6 +80,8 @@ func Proto() error { } func Integration(configFile string) error { + defer Dotfiles() + dir, err := os.Getwd() if err != nil { return err diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 5dcdc82d..31bceb81 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -289,6 +289,43 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { return err } + case *rpc.StartEgressRequest_Participant: + p.RequestType = types.RequestTypeParticipant + clone := proto.Clone(req.Participant).(*livekit.ParticipantEgressRequest) + p.Info.Request = &livekit.EgressInfo_Participant{ + Participant: clone, + } + redactEncodedOutputs(clone) + + p.SourceType = types.SourceTypeSDK + p.Latency = sdkLatency + + p.Info.RoomName = req.Participant.RoomName + p.AudioEnabled = true + p.AudioTranscoding = true + p.VideoEnabled = true + p.VideoTranscoding = true + p.Identity = req.Participant.Identity + if p.Identity == "" { + return errors.ErrInvalidInput("identity") + } + + // encoding options + switch opts := req.Participant.Options.(type) { + case *livekit.ParticipantEgressRequest_Preset: + p.applyPreset(opts.Preset) + + case *livekit.ParticipantEgressRequest_Advanced: + if err := p.applyAdvanced(opts.Advanced); err != nil { + return err + } + } + + // output params + if err := p.updateEncodedOutputs(req.Participant); err != nil { + return err + } + case *rpc.StartEgressRequest_TrackComposite: p.RequestType = types.RequestTypeTrackComposite clone := proto.Clone(req.TrackComposite).(*livekit.TrackCompositeEgressRequest) diff --git a/pkg/config/service.go b/pkg/config/service.go index 604493c2..29b65795 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -30,6 +30,7 @@ const ( roomCompositeCpuCost = 4 webCpuCost = 4 + participantCpuCost = 1 trackCompositeCpuCost = 1 trackCpuCost = 0.5 @@ -51,6 +52,7 @@ type ServiceConfig struct { type CPUCostConfig struct { RoomCompositeCpuCost float64 `yaml:"room_composite_cpu_cost"` WebCpuCost float64 `yaml:"web_cpu_cost"` + ParticipantCpuCost float64 `yaml:"participant_cpu_cost"` TrackCompositeCpuCost float64 `yaml:"track_composite_cpu_cost"` TrackCpuCost float64 `yaml:"track_cpu_cost"` } @@ -83,6 +85,9 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) { if conf.WebCpuCost <= 0 { conf.WebCpuCost = webCpuCost } + if conf.ParticipantCpuCost <= 0 { + conf.ParticipantCpuCost = participantCpuCost + } if conf.TrackCompositeCpuCost <= 0 { conf.TrackCompositeCpuCost = trackCompositeCpuCost } diff --git a/pkg/gstreamer/pipeline.go b/pkg/gstreamer/pipeline.go index d6d44b30..2060a2ab 100644 --- a/pkg/gstreamer/pipeline.go +++ b/pkg/gstreamer/pipeline.go @@ -22,18 +22,17 @@ import ( "github.com/tinyzimmer/go-gst/gst" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/logger" ) const ( stateChangeTimeout = time.Second * 15 - stopTimeout = time.Second * 30 ) type Pipeline struct { *Bin - loop *glib.MainLoop - + loop *glib.MainLoop binsAdded bool elementsAdded bool running chan struct{} @@ -111,6 +110,7 @@ func (p *Pipeline) SetState(state gst.State) error { go func() { stateErr <- p.pipeline.SetState(state) }() + select { case <-time.After(stateChangeTimeout): return errors.ErrPipelineFrozen @@ -157,6 +157,7 @@ func (p *Pipeline) Stop() { } if err := p.OnStop(); err != nil { + logger.Errorw("onStop failure", err) p.OnError(err) } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 8a60eeac..dfd91f4d 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" + lksdk "github.com/livekit/server-sdk-go" ) const videoTestSrcName = "video_test_src" @@ -64,6 +65,8 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error return err } + pipeline.AddOnTrackAdded(b.onTrackAdded) + pipeline.AddOnTrackRemoved(b.onTrackRemoved) pipeline.AddOnTrackMuted(b.onTrackMuted) pipeline.AddOnTrackUnmuted(b.onTrackUnmuted) } @@ -90,6 +93,43 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error return pipeline.AddSourceBin(b.bin) } +func (b *VideoBin) onTrackAdded(ts *config.TrackSource) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + if ts.Kind == lksdk.TrackKindVideo { + if err := b.addAppSrcBin(ts); err != nil { + b.bin.OnError(err) + } + } +} + +func (b *VideoBin) onTrackRemoved(trackID string) { + if b.bin.GetState() > gstreamer.StateRunning { + return + } + + b.mu.Lock() + pad := b.pads[trackID] + if pad == nil { + b.mu.Unlock() + return + } + delete(b.pads, trackID) + b.mu.Unlock() + + if b.selectedPad == trackID { + if err := b.setSelectorPad(videoTestSrcName); err != nil { + b.bin.OnError(err) + } + } + + if _, err := b.bin.RemoveSourceBin(trackID); err != nil { + b.bin.OnError(err) + } +} + func (b *VideoBin) onTrackMuted(trackID string) { if b.bin.GetState() > gstreamer.StateRunning { return diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 2304282c..a8cab099 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -38,7 +38,6 @@ import ( const ( pipelineName = "pipeline" - eosTimeout = time.Second * 30 ) type Controller struct { @@ -126,7 +125,7 @@ func (c *Controller) BuildPipeline() error { } if c.AudioEnabled { - if err := builder.BuildAudioBin(p, c.PipelineConfig); err != nil { + if err = builder.BuildAudioBin(p, c.PipelineConfig); err != nil { return err } } @@ -175,6 +174,11 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { c.Info.StartedAt = time.Now().UnixNano() defer func() { now := time.Now().UnixNano() + + if c.SourceType == types.SourceTypeSDK { + c.updateDuration(c.src.GetEndedAt()) + } + c.Info.UpdatedAt = now c.Info.EndedAt = now if c.SourceType == types.SourceTypeSDK { diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 983a6c78..0ad3a80a 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -33,6 +33,7 @@ import ( "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/pipeline/source/sdk" "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" lksdk "github.com/livekit/server-sdk-go" @@ -55,10 +56,9 @@ type SDKSource struct { filenameReplacements map[string]string errors chan error - active atomic.Int32 - audioWriter *sdk.AppWriter - videoWriter *sdk.AppWriter - closed core.Fuse + writers map[string]*sdk.AppWriter + active atomic.Int32 + closed core.Fuse startRecording chan struct{} endRecording chan struct{} @@ -77,6 +77,7 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr }), initialized: core.NewFuse(), filenameReplacements: make(map[string]string), + writers: make(map[string]*sdk.AppWriter), closed: core.NewFuse(), startRecording: startRecording, endRecording: make(chan struct{}), @@ -93,16 +94,20 @@ func (s *SDKSource) StartRecording() chan struct{} { return s.startRecording } -func (s *SDKSource) Playing(trackID string) { - if w := s.getWriterForTrack(trackID); w != nil { - w.Play() - } -} - func (s *SDKSource) EndRecording() chan struct{} { return s.endRecording } +func (s *SDKSource) Playing(trackID string) { + s.mu.Lock() + writer := s.writers[trackID] + s.mu.Unlock() + + if writer != nil { + writer.Play() + } +} + func (s *SDKSource) GetStartedAt() int64 { return s.sync.GetStartedAt() } @@ -115,12 +120,17 @@ func (s *SDKSource) CloseWriters() { s.closed.Once(func() { s.sync.End() - if s.audioWriter != nil { - go s.audioWriter.Drain(false) - } - if s.videoWriter != nil { - go s.videoWriter.Drain(false) + var wg sync.WaitGroup + s.mu.Lock() + wg.Add(len(s.writers)) + for _, w := range s.writers { + go func(writer *sdk.AppWriter) { + defer wg.Done() + writer.Drain(false) + }(w) } + s.mu.Unlock() + wg.Wait() }) } @@ -144,6 +154,10 @@ func (s *SDKSource) joinRoom() error { }, OnDisconnected: s.onDisconnected, } + if s.RequestType == types.RequestTypeParticipant { + cb.ParticipantCallback.OnTrackPublished = s.onTrackPublished + cb.OnParticipantDisconnected = s.onParticipantDisconnected + } logger.Debugw("connecting to room") s.room = lksdk.CreateRoom(cb) @@ -154,9 +168,12 @@ func (s *SDKSource) joinRoom() error { var fileIdentifier string var err error switch s.RequestType { + case types.RequestTypeParticipant: + fileIdentifier = s.Identity + err = s.awaitParticipant(s.Identity) + case types.RequestTypeTrackComposite: fileIdentifier = s.Info.RoomName - tracks := make(map[string]struct{}) if s.AudioEnabled { tracks[s.AudioTrackID] = struct{}{} @@ -168,7 +185,6 @@ func (s *SDKSource) joinRoom() error { case types.RequestTypeTrack: fileIdentifier = s.TrackID - err = s.awaitTracks(map[string]struct{}{s.TrackID: {}}) } if err != nil { @@ -183,6 +199,37 @@ func (s *SDKSource) joinRoom() error { return nil } +func (s *SDKSource) awaitParticipant(identity string) error { + s.errors = make(chan error, 2) + + rp, err := s.getParticipant(identity) + if err != nil { + return err + } + + for trackCount := 0; trackCount == 0 || trackCount < len(rp.Tracks()); trackCount++ { + if err = <-s.errors; err != nil { + return err + } + } + + s.initialized.Break() + return nil +} + +func (s *SDKSource) getParticipant(identity string) (*lksdk.RemoteParticipant, error) { + deadline := time.Now().Add(subscriptionTimeout) + for time.Now().Before(deadline) { + for _, p := range s.room.GetParticipants() { + if p.Identity() == identity { + return p, nil + } + } + time.Sleep(100 * time.Millisecond) + } + return nil, errors.ErrParticipantNotFound(identity) +} + func (s *SDKSource) awaitTracks(expecting map[string]struct{}) error { trackCount := len(expecting) s.errors = make(chan error, trackCount) @@ -253,10 +300,6 @@ func (s *SDKSource) subscribe(track lksdk.TrackPublication) error { // ----- Callbacks ----- func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { - if s.initialized.IsBroken() { - return - } - var onSubscribeErr error defer func() { if s.initialized.IsBroken() { @@ -269,7 +312,6 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo }() s.active.Inc() - ts := &config.TrackSource{ TrackID: pub.SID(), Kind: pub.Kind(), @@ -294,8 +336,15 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo return } - s.audioWriter = writer - s.AudioTrack = ts + s.mu.Lock() + s.writers[ts.TrackID] = writer + s.mu.Unlock() + + if s.initialized.IsBroken() { + s.callbacks.OnTrackAdded(ts) + } else { + s.AudioTrack = ts + } case types.MimeTypeH264, types.MimeTypeVP8, types.MimeTypeVP9: s.VideoEnabled = true @@ -313,8 +362,15 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo return } - s.videoWriter = writer - s.VideoTrack = ts + s.mu.Lock() + s.writers[ts.TrackID] = writer + s.mu.Unlock() + + if s.initialized.IsBroken() { + s.callbacks.OnTrackAdded(ts) + } else { + s.VideoTrack = ts + } default: onSubscribeErr = errors.ErrNotSupported(string(ts.MimeType)) @@ -324,16 +380,21 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo if !s.initialized.IsBroken() { s.mu.Lock() switch s.RequestType { + case types.RequestTypeParticipant: + s.filenameReplacements["{publisher_identity}"] = s.Identity + case types.RequestTypeTrackComposite: if s.Identity == "" || track.Kind() == webrtc.RTPCodecTypeVideo { s.Identity = rp.Identity() s.filenameReplacements["{publisher_identity}"] = s.Identity } + case types.RequestTypeTrack: s.Identity = rp.Identity() s.TrackKind = pub.Kind().String() if pub.Kind() == lksdk.TrackKindVideo && s.Outputs[types.EgressTypeWebsocket] != nil { onSubscribeErr = errors.ErrIncompatible("websocket", ts.MimeType) + s.mu.Unlock() return } s.TrackSource = strings.ToLower(pub.Source().String()) @@ -378,48 +439,68 @@ func (s *SDKSource) createWriter( return writer, nil } -func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - if w := s.getWriterForTrack(pub.SID()); w != nil { - w.SetTrackMuted(true) +func (s *SDKSource) onTrackPublished(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { + if rp.Identity() != s.Identity { + return } -} -func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - if w := s.getWriterForTrack(pub.SID()); w != nil { - w.SetTrackMuted(false) + switch pub.Source() { + case livekit.TrackSource_CAMERA, livekit.TrackSource_MICROPHONE: + if err := s.subscribe(pub); err != nil { + logger.Errorw("failed to subscribe to track", err, "trackID", pub.SID()) + } + default: + logger.Infow("ignoring participant track", + "reason", fmt.Sprintf("source %s", pub.Source())) + return } } -func (s *SDKSource) getWriterForTrack(trackID string) *sdk.AppWriter { - if s.audioWriter != nil && s.audioWriter.TrackID() == trackID { - return s.audioWriter +func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) { + s.mu.Lock() + writer := s.writers[pub.SID()] + s.mu.Unlock() + + if writer != nil { + writer.SetTrackMuted(true) } - if s.videoWriter != nil && s.videoWriter.TrackID() == trackID { - return s.videoWriter +} + +func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) { + s.mu.Lock() + writer := s.writers[pub.SID()] + s.mu.Unlock() + + if writer != nil { + writer.SetTrackMuted(false) } - return nil } func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { + logger.Debugw("track unsubscribed", "trackID", pub.SID()) s.onTrackFinished(pub.SID()) } func (s *SDKSource) onTrackFinished(trackID string) { - var w *sdk.AppWriter - if s.audioWriter != nil && s.audioWriter.TrackID() == trackID { - logger.Infow("removing audio writer") - w = s.audioWriter - s.audioWriter = nil - } else if s.videoWriter != nil && s.videoWriter.TrackID() == trackID { - logger.Infow("removing video writer") - w = s.videoWriter - s.videoWriter = nil - } else { - return + s.mu.Lock() + writer := s.writers[trackID] + delete(s.writers, trackID) + s.mu.Unlock() + + if writer != nil { + writer.Drain(true) + active := s.active.Dec() + if s.RequestType == types.RequestTypeParticipant { + s.callbacks.OnTrackRemoved(trackID) + s.sync.RemoveTrack(trackID) + } else if active == 0 { + s.onDisconnected() + } } +} - w.Drain(true) - if s.active.Dec() == 0 { +func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) { + if rp.Identity() == s.Identity { s.onDisconnected() } } diff --git a/pkg/pipeline/source/source.go b/pkg/pipeline/source/source.go index 49ad6727..79be65d8 100644 --- a/pkg/pipeline/source/source.go +++ b/pkg/pipeline/source/source.go @@ -37,7 +37,8 @@ func New(ctx context.Context, p *config.PipelineConfig, callbacks *gstreamer.Cal types.RequestTypeWeb: return NewWebSource(ctx, p) - case types.RequestTypeTrackComposite, + case types.RequestTypeParticipant, + types.RequestTypeTrackComposite, types.RequestTypeTrack: return NewSDKSource(ctx, p, callbacks) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 62d7e9b3..0e2166c8 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -221,10 +221,6 @@ func parseDebugInfo(gErr *gst.GError) (element, name, message string) { } func (c *Controller) handleMessageStateChanged(msg *gst.Message) { - if c.playing.IsBroken() { - return - } - _, newState := msg.ParseStateChanged() if newState != gst.StatePlaying { return diff --git a/pkg/service/handler.go b/pkg/service/handler.go index 4c681b1d..a7040d14 100644 --- a/pkg/service/handler.go +++ b/pkg/service/handler.go @@ -208,7 +208,7 @@ func (h *Handler) sendUpdate(ctx context.Context, info *livekit.EgressInfo) { } func sendUpdate(ctx context.Context, c rpc.IOInfoClient, info *livekit.EgressInfo) { - requestType, outputType := egress.GetTypes(info) + requestType, outputType := egress.GetTypes(info.Request) switch info.Status { case livekit.EgressStatus_EGRESS_FAILED: logger.Warnw("egress failed", errors.New(info.Error), diff --git a/pkg/service/service.go b/pkg/service/service.go index af878290..4391acb3 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -128,7 +128,7 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) return nil, err } - requestType, outputType := egress.GetTypes(p.Info) + requestType, outputType := egress.GetTypes(p.Info.Request) logger.Infow("request validated", "egressID", req.EgressId, "requestType", requestType, @@ -183,6 +183,7 @@ func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressR func (s *Service) Status() ([]byte, error) { info := map[string]interface{}{ "CpuLoad": s.GetCPULoad(), + "CpuHold": s.GetCPUHold(), } s.mu.RLock() diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 64198e67..55aeaa0f 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -99,6 +99,7 @@ func (m *Monitor) checkCPUConfig() error { requirements := []float64{ m.cpuCostConfig.RoomCompositeCpuCost, m.cpuCostConfig.WebCpuCost, + m.cpuCostConfig.ParticipantCpuCost, m.cpuCostConfig.TrackCompositeCpuCost, m.cpuCostConfig.TrackCpuCost, } @@ -135,6 +136,13 @@ func (m *Monitor) GetCPULoad() float64 { return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100 } +func (m *Monitor) GetCPUHold() float64 { + m.mu.Lock() + defer m.mu.Unlock() + + return m.reserved +} + func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { m.mu.Lock() defer m.mu.Unlock() @@ -165,6 +173,8 @@ func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool { accept = available >= m.cpuCostConfig.RoomCompositeCpuCost case *rpc.StartEgressRequest_Web: accept = available >= m.cpuCostConfig.WebCpuCost + case *rpc.StartEgressRequest_Participant: + accept = available >= m.cpuCostConfig.ParticipantCpuCost case *rpc.StartEgressRequest_TrackComposite: accept = available >= m.cpuCostConfig.TrackCompositeCpuCost case *rpc.StartEgressRequest_Track: @@ -188,6 +198,8 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { cpuHold = m.cpuCostConfig.RoomCompositeCpuCost case *rpc.StartEgressRequest_Web: cpuHold = m.cpuCostConfig.WebCpuCost + case *rpc.StartEgressRequest_Participant: + cpuHold = m.cpuCostConfig.ParticipantCpuCost case *rpc.StartEgressRequest_TrackComposite: cpuHold = m.cpuCostConfig.TrackCompositeCpuCost case *rpc.StartEgressRequest_Track: @@ -206,6 +218,8 @@ func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) { m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Add(1) case *rpc.StartEgressRequest_Web: m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Add(1) + case *rpc.StartEgressRequest_Participant: + m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Add(1) case *rpc.StartEgressRequest_TrackComposite: m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Add(1) case *rpc.StartEgressRequest_Track: @@ -224,6 +238,9 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) { case *rpc.StartEgressRequest_Web: m.reserved -= m.cpuCostConfig.WebCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1) + case *rpc.StartEgressRequest_Participant: + m.reserved -= m.cpuCostConfig.ParticipantCpuCost + m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1) case *rpc.StartEgressRequest_TrackComposite: m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Sub(1) @@ -242,6 +259,8 @@ func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) { m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost case *rpc.StartEgressRequest_Web: m.reserved -= m.cpuCostConfig.WebCpuCost + case *rpc.StartEgressRequest_Participant: + m.reserved -= m.cpuCostConfig.ParticipantCpuCost case *rpc.StartEgressRequest_TrackComposite: m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost case *rpc.StartEgressRequest_Track: diff --git a/pkg/types/types.go b/pkg/types/types.go index 9424ccff..a0e66d7e 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -26,6 +26,7 @@ const ( // request types RequestTypeRoomComposite = "room_composite" RequestTypeWeb = "web" + RequestTypeParticipant = "participant" RequestTypeTrackComposite = "track_composite" RequestTypeTrack = "track" diff --git a/test/ffprobe.go b/test/ffprobe.go index a79ea1c9..c20fc780 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -105,8 +105,21 @@ func ffprobe(input string) (*FFProbeInfo, error) { } func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.EgressInfo, egressType types.EgressType, withMuting bool, sourceFramerate float64) { - info, err := ffprobe(in) - require.NoError(t, err, "input %s does not exist", in) + var info *FFProbeInfo + var err error + + done := make(chan struct{}) + go func() { + info, err = ffprobe(in) + close(done) + }() + + select { + case <-time.After(time.Second * 15): + t.Fatal("no response from ffprobe") + case <-done: + require.NoError(t, err, "input %s does not exist", in) + } switch p.Outputs[egressType].GetOutputType() { case types.OutputTypeRaw: diff --git a/test/integration.go b/test/integration.go index 557b3373..20273b9d 100644 --- a/test/integration.go +++ b/test/integration.go @@ -98,8 +98,7 @@ type testCase struct { func (r *Runner) awaitIdle(t *testing.T) { r.svc.KillAll() for i := 0; i < 30; i++ { - status := r.getStatus(t) - if len(status) == 1 { + if r.svc.GetCPUHold() == 0 { return } time.Sleep(time.Second) @@ -122,26 +121,28 @@ func (r *Runner) publishSamplesToRoom(t *testing.T, audioCodec, videoCodec types return } -func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) { - go func() { - time.Sleep(publishAfter) - done := make(chan struct{}) - pub := r.publish(t, codec, done) - if unpublishAfter != 0 { - time.AfterFunc(unpublishAfter, func() { - select { - case <-done: - return - default: +func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAt, unpublishAt time.Duration) { + if codec != "" { + go func() { + time.Sleep(publishAt) + done := make(chan struct{}) + pub := r.publish(t, codec, done) + if unpublishAt != 0 { + time.AfterFunc(unpublishAt-publishAt, func() { + select { + case <-done: + return + default: + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + }) + } else { + t.Cleanup(func() { _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - } - }) - } else { - t.Cleanup(func() { - _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - }) - } - }() + }) + } + }() + } } func (r *Runner) publishSampleToRoom(t *testing.T, codec types.MimeType, withMuting bool) string { @@ -259,7 +260,7 @@ func (r *Runner) getUpdate(t *testing.T, egressID string) *livekit.EgressInfo { return info } - case <-time.After(time.Minute): + case <-time.After(time.Second * 30): t.Fatal("no update from results channel") return nil } diff --git a/test/participant.go b/test/participant.go new file mode 100644 index 00000000..4135527a --- /dev/null +++ b/test/participant.go @@ -0,0 +1,268 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package test + +import ( + "testing" + "time" + + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" +) + +func (r *Runner) testParticipant(t *testing.T) { + if !r.runParticipantTests() { + return + } + + r.sourceFramerate = 23.97 + r.testParticipantFile(t) + r.testParticipantStream(t) + r.testParticipantSegments(t) + r.testParticipantMulti(t) +} + +func (r *Runner) runParticipantTest( + t *testing.T, name string, test *testCase, + f func(t *testing.T, identity string), +) { + t.Run(name, func(t *testing.T) { + r.awaitIdle(t) + r.publishSampleOffset(t, test.audioCodec, test.audioDelay, test.audioUnpublish) + if test.audioRepublish != 0 { + r.publishSampleOffset(t, test.audioCodec, test.audioRepublish, 0) + } + r.publishSampleOffset(t, test.videoCodec, test.videoDelay, test.videoUnpublish) + if test.videoRepublish != 0 { + r.publishSampleOffset(t, test.videoCodec, test.videoRepublish, 0) + } + f(t, r.room.LocalParticipant.Identity()) + }) +} + +func (r *Runner) testParticipantFile(t *testing.T) { + if !r.runFileTests() { + return + } + + t.Run("Participant/File", func(t *testing.T) { + for _, test := range []*testCase{ + { + name: "VP8", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 8, + audioUnpublish: time.Second * 14, + audioRepublish: time.Second * 20, + videoCodec: types.MimeTypeVP8, + filename: "participant_{publisher_identity}_vp8_{time}.mp4", + }, + { + name: "H264", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + videoUnpublish: time.Second * 10, + videoRepublish: time.Second * 20, + filename: "participant_{room_name}_h264_{time}.mp4", + }, + { + name: "AudioOnly", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + audioUnpublish: time.Second * 10, + audioRepublish: time.Second * 15, + filename: "participant_{room_name}_{time}.mp4", + }, + } { + r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { + fileOutput := &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: r.getFilePath(test.filename), + } + if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { + fileOutput.Filepath = test.filename + fileOutput.Output = &livekit.EncodedFileOutput_Azure{ + Azure: r.AzureUpload, + } + } + + participantRequest := &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + } + if test.options != nil { + participantRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: participantRequest, + }, + } + + test.expectVideoTranscoding = true + r.runFileTest(t, req, test) + }) + if r.Short { + return + } + } + }) +} + +func (r *Runner) testParticipantStream(t *testing.T) { + if !r.runStreamTests() { + return + } + + test := &testCase{ + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 8, + videoCodec: types.MimeTypeVP8, + } + + r.runParticipantTest(t, "Participant/Stream", test, + func(t *testing.T, identity string) { + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + StreamOutputs: []*livekit.StreamOutput{{ + Urls: []string{streamUrl1, badStreamUrl1}, + }}, + }, + }, + } + + r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + }, + ) +} + +func (r *Runner) testParticipantSegments(t *testing.T) { + if !r.runSegmentTests() { + return + } + + t.Run("Participant/Segments", func(t *testing.T) { + for _, test := range []*testCase{ + { + name: "VP8", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + // videoDelay: time.Second * 10, + // videoUnpublish: time.Second * 20, + filename: "participant_{publisher_identity}_vp8_{time}", + playlist: "participant_{publisher_identity}_vp8_{time}.m3u8", + }, + { + name: "H264", + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 10, + audioUnpublish: time.Second * 20, + videoCodec: types.MimeTypeH264, + filename: "participant_{room_name}_h264_{time}", + playlist: "participant_{room_name}_h264_{time}.m3u8", + }, + } { + r.runParticipantTest(t, test.name, test, + func(t *testing.T, identity string) { + segmentOutput := &livekit.SegmentedFileOutput{ + FilenamePrefix: r.getFilePath(test.filename), + PlaylistName: test.playlist, + FilenameSuffix: test.filenameSuffix, + } + if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { + segmentOutput.FilenamePrefix = test.filename + segmentOutput.Output = &livekit.SegmentedFileOutput_S3{ + S3: r.S3Upload, + } + } + + trackRequest := &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: trackRequest, + }, + } + test.expectVideoTranscoding = true + + r.runSegmentsTest(t, req, test) + }, + ) + if r.Short { + return + } + } + }) +} + +func (r *Runner) testParticipantMulti(t *testing.T) { + if !r.runMultiTests() { + return + } + + test := &testCase{ + audioCodec: types.MimeTypeOpus, + audioUnpublish: time.Second * 20, + videoCodec: types.MimeTypeVP8, + videoDelay: time.Second * 10, + } + + r.runParticipantTest(t, "Participant/Multi", test, + func(t *testing.T, identity string) { + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + FileOutputs: []*livekit.EncodedFileOutput{{ + FileType: livekit.EncodedFileType_MP4, + Filepath: r.getFilePath("participant_multiple_{time}"), + }}, + StreamOutputs: []*livekit.StreamOutput{{ + Protocol: livekit.StreamProtocol_RTMP, + }}, + }, + }, + } + + r.runMultipleTest(t, req, true, true, false, livekit.SegmentedFileSuffix_INDEX) + }, + ) +} diff --git a/test/room_composite.go b/test/room_composite.go index 3e86435a..00e6ed2c 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -46,11 +46,6 @@ func (r *Runner) runRoomTest(t *testing.T, name string, audioCodec, videoCodec t r.awaitIdle(t) r.publishSamplesToRoom(t, audioCodec, videoCodec) f(t) - if t.Failed() { - r.svc.Stop(true) - r.svc.Reset() - go r.svc.Run() - } }) } diff --git a/test/runner.go b/test/runner.go index d0b0cd60..a86eee82 100644 --- a/test/runner.go +++ b/test/runner.go @@ -176,6 +176,7 @@ func (r *Runner) Run(t *testing.T, svc *service.Service, bus psrpc.MessageBus, t // run tests r.testRoomComposite(t) r.testWeb(t) + r.testParticipant(t) r.testTrackComposite(t) r.testTrack(t) } diff --git a/test/stream.go b/test/stream.go index 4b9be2b0..3476f8b1 100644 --- a/test/stream.go +++ b/test/stream.go @@ -57,6 +57,7 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * // verify and check updates time.Sleep(time.Second * 5) r.verifyStreams(t, p, streamUrl1, streamUrl2) + r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ redactedUrl1: livekit.StreamInfo_ACTIVE, redactedUrl2: livekit.StreamInfo_ACTIVE, diff --git a/test/track_composite.go b/test/track_composite.go index 49b0fa46..09e945f9 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -45,11 +45,6 @@ func (r *Runner) runTrackTest( r.awaitIdle(t) audioTrackID, videoTrackID := r.publishSamplesToRoom(t, audioCodec, videoCodec) f(t, audioTrackID, videoTrackID) - if t.Failed() { - r.svc.Stop(true) - r.svc.Reset() - go r.svc.Run() - } }) } diff --git a/test/web.go b/test/web.go index 488721a7..c91b6ede 100644 --- a/test/web.go +++ b/test/web.go @@ -40,11 +40,6 @@ func (r *Runner) runWebTest(t *testing.T, name string, f func(t *testing.T)) { t.Run(name, func(t *testing.T) { r.awaitIdle(t) f(t) - if t.Failed() { - r.svc.Stop(true) - r.svc.Reset() - go r.svc.Run() - } }) }