Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change to single location field #803

Merged
merged 3 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down Expand Up @@ -137,7 +137,7 @@ require (
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down Expand Up @@ -357,8 +357,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U=
go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
50 changes: 21 additions & 29 deletions pkg/config/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,25 @@ type Manifest struct {
}

type File struct {
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
}

type Playlist struct {
mu sync.Mutex
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Segments []*Segment `json:"segments,omitempty"`
mu sync.Mutex
Location string `json:"location,omitempty"`
Segments []*Segment `json:"segments,omitempty"`
}

type Segment struct {
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
}

type Image struct {
Filename string `json:"filename,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Location string `json:"location,omitempty"`
}

func (p *PipelineConfig) initManifest() {
Expand Down Expand Up @@ -103,12 +99,11 @@ func (p *PipelineConfig) shouldCreateManifest() bool {
return false
}

func (m *Manifest) AddFile(filename, location, presignedUrl string) {
func (m *Manifest) AddFile(filename, location string) {
m.mu.Lock()
m.Files = append(m.Files, &File{
Filename: filename,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Location: location,
})
m.mu.Unlock()
}
Expand All @@ -123,30 +118,27 @@ func (m *Manifest) AddPlaylist() *Playlist {
return p
}

func (p *Playlist) UpdateLocation(location, presignedUrl string) {
func (p *Playlist) UpdateLocation(location string) {
p.mu.Lock()
p.Location = location
p.PresignedUrl = presignedUrl
p.mu.Unlock()
}

func (p *Playlist) AddSegment(filename, location, presignedUrl string) {
func (p *Playlist) AddSegment(filename, location string) {
p.mu.Lock()
p.Segments = append(p.Segments, &Segment{
Filename: filename,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Location: location,
})
p.mu.Unlock()
}

func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUrl string) {
func (m *Manifest) AddImage(filename string, ts time.Time, location string) {
m.mu.Lock()
m.Images = append(m.Images, &Image{
Filename: filename,
Timestamp: ts,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Timestamp: ts,
Location: location,
})
m.mu.Unlock()
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,15 +653,14 @@ func (c *Controller) uploadManifest() {
infoUpdated := false
for _, si := range c.sinks {
for _, s := range si {
location, presignedUrl, uploaded, err := s.UploadManifest(manifestPath)
location, uploaded, err := s.UploadManifest(manifestPath)
if err != nil {
logger.Errorw("failed to upload manifest", err)
continue
}

if !infoUpdated && uploaded {
c.Info.ManifestLocation = location
c.Info.ManifestPresignedUrl = presignedUrl
infoUpdated = true
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *Controller) uploadTrackFiles(u *uploader.Uploader) {
if strings.HasSuffix(f.Name(), ".csv") {
local := path.Join(c.TmpDir, f.Name())
storage := path.Join(c.Info.EgressId, f.Name())
_, _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down Expand Up @@ -131,7 +131,7 @@ func (c *Controller) uploadDebugFile(u *uploader.Uploader, data []byte, fileExte
return
}

_, _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false)
_, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false)
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down
14 changes: 7 additions & 7 deletions pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *FileSink) Start() error {
}

func (s *FileSink) Close() error {
location, size, presignedUrl, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
if err != nil {
return err
}
Expand All @@ -51,22 +51,22 @@ func (s *FileSink) Close() error {
s.FileInfo.Size = size

if s.conf.Manifest != nil {
s.conf.Manifest.AddFile(s.StorageFilepath, location, presignedUrl)
s.conf.Manifest.AddFile(s.StorageFilepath, location)
}

return nil
}

func (s *FileSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *FileSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(path.Dir(s.StorageFilepath), path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
14 changes: 7 additions & 7 deletions pkg/pipeline/sink/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error {

imageStoragePath := path.Join(s.StorageDir, filename)

location, _, presignedUrl, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
location, _, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
if err != nil {
return err
}

if s.conf.Manifest != nil {
s.conf.Manifest.AddImage(imageStoragePath, ts, location, presignedUrl)
s.conf.Manifest.AddImage(imageStoragePath, ts, location)
}

return nil
Expand Down Expand Up @@ -150,16 +150,16 @@ func (s *ImageSink) Close() error {
return nil
}

func (s *ImageSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(s.StorageDir, path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
19 changes: 9 additions & 10 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
go func() {
defer close(update.uploadComplete)

location, size, presignedUrl, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
location, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
if err != nil {
s.callbacks.OnError(err)
return
Expand All @@ -165,7 +165,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
if s.manifestPlaylist != nil {
s.manifestPlaylist.AddSegment(segmentStoragePath, location, presignedUrl)
s.manifestPlaylist.AddSegment(segmentStoragePath, location)
}
s.infoLock.Unlock()
}()
Expand Down Expand Up @@ -222,12 +222,11 @@ func (s *SegmentSink) shouldUploadPlaylist() bool {
func (s *SegmentSink) uploadPlaylist() error {
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
playlistLocation, _, presignedUrl, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
playlistLocation, _, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
if err == nil {
s.SegmentsInfo.PlaylistLocation = playlistLocation
if s.manifestPlaylist != nil {
s.manifestPlaylist.Location = playlistLocation
s.manifestPlaylist.PresignedUrl = presignedUrl
}
}
return err
Expand All @@ -236,7 +235,7 @@ func (s *SegmentSink) uploadPlaylist() error {
func (s *SegmentSink) uploadLivePlaylist() error {
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
livePlaylistLocation, _, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
livePlaylistLocation, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
if err == nil {
s.SegmentsInfo.LivePlaylistLocation = livePlaylistLocation
}
Expand Down Expand Up @@ -322,16 +321,16 @@ func (s *SegmentSink) Close() error {
return nil
}

func (s *SegmentSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(s.StorageDir, path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type Sink interface {
Start() error
Close() error
UploadManifest(string) (string, string, bool, error)
UploadManifest(string) (string, bool, error)
}

func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,28 @@ func newAliOSSUploader(c *config.StorageConfig) (uploader, error) {
}, nil
}

func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) {
func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
storageFilepath = path.Join(u.prefix, storageFilepath)

stat, err := os.Stat(localFilepath)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

bucket, err := client.Bucket(u.conf.Bucket)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

err = bucket.PutObjectFromFile(storageFilepath, localFilepath)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), "", nil
return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil
}
Loading