Skip to content

Commit

Permalink
change to single location field (#803)
Browse files Browse the repository at this point in the history
* change to single location field

* update imageSink

* fix debug
  • Loading branch information
frostbyte73 authored Nov 8, 2024
1 parent e4fd0e0 commit 24945da
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 114 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down Expand Up @@ -137,7 +137,7 @@ require (
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down Expand Up @@ -357,8 +357,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U=
go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
50 changes: 21 additions & 29 deletions pkg/config/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,25 @@ type Manifest struct {
}

type File struct {
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
}

type Playlist struct {
mu sync.Mutex
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Segments []*Segment `json:"segments,omitempty"`
mu sync.Mutex
Location string `json:"location,omitempty"`
Segments []*Segment `json:"segments,omitempty"`
}

type Segment struct {
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Location string `json:"location,omitempty"`
}

type Image struct {
Filename string `json:"filename,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Location string `json:"location,omitempty"`
PresignedUrl string `json:"presigned_url,omitempty"`
Filename string `json:"filename,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Location string `json:"location,omitempty"`
}

func (p *PipelineConfig) initManifest() {
Expand Down Expand Up @@ -103,12 +99,11 @@ func (p *PipelineConfig) shouldCreateManifest() bool {
return false
}

func (m *Manifest) AddFile(filename, location, presignedUrl string) {
func (m *Manifest) AddFile(filename, location string) {
m.mu.Lock()
m.Files = append(m.Files, &File{
Filename: filename,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Location: location,
})
m.mu.Unlock()
}
Expand All @@ -123,30 +118,27 @@ func (m *Manifest) AddPlaylist() *Playlist {
return p
}

func (p *Playlist) UpdateLocation(location, presignedUrl string) {
func (p *Playlist) UpdateLocation(location string) {
p.mu.Lock()
p.Location = location
p.PresignedUrl = presignedUrl
p.mu.Unlock()
}

func (p *Playlist) AddSegment(filename, location, presignedUrl string) {
func (p *Playlist) AddSegment(filename, location string) {
p.mu.Lock()
p.Segments = append(p.Segments, &Segment{
Filename: filename,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Location: location,
})
p.mu.Unlock()
}

func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUrl string) {
func (m *Manifest) AddImage(filename string, ts time.Time, location string) {
m.mu.Lock()
m.Images = append(m.Images, &Image{
Filename: filename,
Timestamp: ts,
Location: location,
PresignedUrl: presignedUrl,
Filename: filename,
Timestamp: ts,
Location: location,
})
m.mu.Unlock()
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,15 +653,14 @@ func (c *Controller) uploadManifest() {
infoUpdated := false
for _, si := range c.sinks {
for _, s := range si {
location, presignedUrl, uploaded, err := s.UploadManifest(manifestPath)
location, uploaded, err := s.UploadManifest(manifestPath)
if err != nil {
logger.Errorw("failed to upload manifest", err)
continue
}

if !infoUpdated && uploaded {
c.Info.ManifestLocation = location
c.Info.ManifestPresignedUrl = presignedUrl
infoUpdated = true
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *Controller) uploadTrackFiles(u *uploader.Uploader) {
if strings.HasSuffix(f.Name(), ".csv") {
local := path.Join(c.TmpDir, f.Name())
storage := path.Join(c.Info.EgressId, f.Name())
_, _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down Expand Up @@ -131,7 +131,7 @@ func (c *Controller) uploadDebugFile(u *uploader.Uploader, data []byte, fileExte
return
}

_, _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false)
_, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false)
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down
14 changes: 7 additions & 7 deletions pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *FileSink) Start() error {
}

func (s *FileSink) Close() error {
location, size, presignedUrl, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
if err != nil {
return err
}
Expand All @@ -51,22 +51,22 @@ func (s *FileSink) Close() error {
s.FileInfo.Size = size

if s.conf.Manifest != nil {
s.conf.Manifest.AddFile(s.StorageFilepath, location, presignedUrl)
s.conf.Manifest.AddFile(s.StorageFilepath, location)
}

return nil
}

func (s *FileSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *FileSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(path.Dir(s.StorageFilepath), path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
14 changes: 7 additions & 7 deletions pkg/pipeline/sink/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error {

imageStoragePath := path.Join(s.StorageDir, filename)

location, _, presignedUrl, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
location, _, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
if err != nil {
return err
}

if s.conf.Manifest != nil {
s.conf.Manifest.AddImage(imageStoragePath, ts, location, presignedUrl)
s.conf.Manifest.AddImage(imageStoragePath, ts, location)
}

return nil
Expand Down Expand Up @@ -150,16 +150,16 @@ func (s *ImageSink) Close() error {
return nil
}

func (s *ImageSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(s.StorageDir, path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
19 changes: 9 additions & 10 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
go func() {
defer close(update.uploadComplete)

location, size, presignedUrl, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
location, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
if err != nil {
s.callbacks.OnError(err)
return
Expand All @@ -165,7 +165,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
if s.manifestPlaylist != nil {
s.manifestPlaylist.AddSegment(segmentStoragePath, location, presignedUrl)
s.manifestPlaylist.AddSegment(segmentStoragePath, location)
}
s.infoLock.Unlock()
}()
Expand Down Expand Up @@ -222,12 +222,11 @@ func (s *SegmentSink) shouldUploadPlaylist() bool {
func (s *SegmentSink) uploadPlaylist() error {
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
playlistLocation, _, presignedUrl, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
playlistLocation, _, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
if err == nil {
s.SegmentsInfo.PlaylistLocation = playlistLocation
if s.manifestPlaylist != nil {
s.manifestPlaylist.Location = playlistLocation
s.manifestPlaylist.PresignedUrl = presignedUrl
}
}
return err
Expand All @@ -236,7 +235,7 @@ func (s *SegmentSink) uploadPlaylist() error {
func (s *SegmentSink) uploadLivePlaylist() error {
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
livePlaylistLocation, _, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
livePlaylistLocation, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
if err == nil {
s.SegmentsInfo.LivePlaylistLocation = livePlaylistLocation
}
Expand Down Expand Up @@ -322,16 +321,16 @@ func (s *SegmentSink) Close() error {
return nil
}

func (s *SegmentSink) UploadManifest(filepath string) (string, string, bool, error) {
func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
return "", "", false, nil
return "", false, nil
}

storagePath := path.Join(s.StorageDir, path.Base(filepath))
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
if err != nil {
return "", "", false, err
return "", false, err
}

return location, presignedUrl, true, nil
return location, true, nil
}
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type Sink interface {
Start() error
Close() error
UploadManifest(string) (string, string, bool, error)
UploadManifest(string) (string, bool, error)
}

func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,28 @@ func newAliOSSUploader(c *config.StorageConfig) (uploader, error) {
}, nil
}

func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) {
func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
storageFilepath = path.Join(u.prefix, storageFilepath)

stat, err := os.Stat(localFilepath)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

bucket, err := client.Bucket(u.conf.Bucket)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

err = bucket.PutObjectFromFile(storageFilepath, localFilepath)
if err != nil {
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
return "", 0, errors.ErrUploadFailed("AliOSS", err)
}

return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), "", nil
return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil
}
Loading

0 comments on commit 24945da

Please sign in to comment.