From 24945da70c4ed2c6f7b5889bf2d1d5ec1fa58f18 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 8 Nov 2024 13:14:31 -0800 Subject: [PATCH] change to single location field (#803) * change to single location field * update imageSink * fix debug --- go.mod | 4 +- go.sum | 8 ++-- pkg/config/manifest.go | 50 +++++++++------------ pkg/pipeline/controller.go | 3 +- pkg/pipeline/debug.go | 4 +- pkg/pipeline/sink/file.go | 14 +++--- pkg/pipeline/sink/image.go | 14 +++--- pkg/pipeline/sink/segments.go | 19 ++++---- pkg/pipeline/sink/sink.go | 2 +- pkg/pipeline/sink/uploader/alioss.go | 12 ++--- pkg/pipeline/sink/uploader/azure.go | 14 +++--- pkg/pipeline/sink/uploader/gcp.go | 12 ++--- pkg/pipeline/sink/uploader/local.go | 14 +++--- pkg/pipeline/sink/uploader/s3.go | 14 +++--- pkg/pipeline/sink/uploader/uploader.go | 16 +++---- pkg/pipeline/sink/uploader/uploader_test.go | 10 ++--- pkg/pipeline/sink/websocket.go | 4 +- 17 files changed, 100 insertions(+), 114 deletions(-) diff --git a/go.mod b/go.mod index 84ee2630..2f1867a8 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 + github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/pion/rtp v1.8.9 @@ -137,7 +137,7 @@ require ( go.opentelemetry.io/otel/metric v1.29.0 // indirect go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap/exp v0.2.0 // indirect + go.uber.org/zap/exp v0.3.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/go.sum b/go.sum index fed35c3b..f0a5d1ac 100644 --- a/go.sum +++ b/go.sum @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4= -github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw= +github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= @@ -357,8 +357,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= -go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= +go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U= +go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/config/manifest.go b/pkg/config/manifest.go index c3735547..8ba9e8e0 100644 --- a/pkg/config/manifest.go +++ b/pkg/config/manifest.go @@ -42,29 +42,25 @@ type Manifest struct { } type File struct { - Filename string `json:"filename,omitempty"` - Location string `json:"location,omitempty"` - PresignedUrl string `json:"presigned_url,omitempty"` + Filename string `json:"filename,omitempty"` + Location string `json:"location,omitempty"` } type Playlist struct { - mu sync.Mutex - Location string `json:"location,omitempty"` - PresignedUrl string `json:"presigned_url,omitempty"` - Segments []*Segment `json:"segments,omitempty"` + mu sync.Mutex + Location string `json:"location,omitempty"` + Segments []*Segment `json:"segments,omitempty"` } type Segment struct { - Filename string `json:"filename,omitempty"` - Location string `json:"location,omitempty"` - PresignedUrl string `json:"presigned_url,omitempty"` + Filename string `json:"filename,omitempty"` + Location string `json:"location,omitempty"` } type Image struct { - Filename string `json:"filename,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` - Location string `json:"location,omitempty"` - PresignedUrl string `json:"presigned_url,omitempty"` + Filename string `json:"filename,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` + Location string `json:"location,omitempty"` } func (p *PipelineConfig) initManifest() { @@ -103,12 +99,11 @@ func (p *PipelineConfig) shouldCreateManifest() bool { return false } -func (m *Manifest) AddFile(filename, location, presignedUrl string) { +func (m *Manifest) AddFile(filename, location string) { m.mu.Lock() m.Files = append(m.Files, &File{ - Filename: filename, - Location: location, - PresignedUrl: presignedUrl, + Filename: filename, + Location: location, }) m.mu.Unlock() } @@ -123,30 +118,27 @@ func (m *Manifest) AddPlaylist() *Playlist { return p } -func (p *Playlist) UpdateLocation(location, presignedUrl string) { +func (p *Playlist) UpdateLocation(location string) { p.mu.Lock() p.Location = location - p.PresignedUrl = presignedUrl p.mu.Unlock() } -func (p *Playlist) AddSegment(filename, location, presignedUrl string) { +func (p *Playlist) AddSegment(filename, location string) { p.mu.Lock() p.Segments = append(p.Segments, &Segment{ - Filename: filename, - Location: location, - PresignedUrl: presignedUrl, + Filename: filename, + Location: location, }) p.mu.Unlock() } -func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUrl string) { +func (m *Manifest) AddImage(filename string, ts time.Time, location string) { m.mu.Lock() m.Images = append(m.Images, &Image{ - Filename: filename, - Timestamp: ts, - Location: location, - PresignedUrl: presignedUrl, + Filename: filename, + Timestamp: ts, + Location: location, }) m.mu.Unlock() } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index bc820411..d6e42ee6 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -653,7 +653,7 @@ func (c *Controller) uploadManifest() { infoUpdated := false for _, si := range c.sinks { for _, s := range si { - location, presignedUrl, uploaded, err := s.UploadManifest(manifestPath) + location, uploaded, err := s.UploadManifest(manifestPath) if err != nil { logger.Errorw("failed to upload manifest", err) continue @@ -661,7 +661,6 @@ func (c *Controller) uploadManifest() { if !infoUpdated && uploaded { c.Info.ManifestLocation = location - c.Info.ManifestPresignedUrl = presignedUrl infoUpdated = true } } diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index a037fa8f..4ccb9d4a 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -90,7 +90,7 @@ func (c *Controller) uploadTrackFiles(u *uploader.Uploader) { if strings.HasSuffix(f.Name(), ".csv") { local := path.Join(c.TmpDir, f.Name()) storage := path.Join(c.Info.EgressId, f.Name()) - _, _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return @@ -131,7 +131,7 @@ func (c *Controller) uploadDebugFile(u *uploader.Uploader, data []byte, fileExte return } - _, _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false) + _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 77637340..a4f71ef0 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -42,7 +42,7 @@ func (s *FileSink) Start() error { } func (s *FileSink) Close() error { - location, size, presignedUrl, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err } @@ -51,22 +51,22 @@ func (s *FileSink) Close() error { s.FileInfo.Size = size if s.conf.Manifest != nil { - s.conf.Manifest.AddFile(s.StorageFilepath, location, presignedUrl) + s.conf.Manifest.AddFile(s.StorageFilepath, location) } return nil } -func (s *FileSink) UploadManifest(filepath string) (string, string, bool, error) { +func (s *FileSink) UploadManifest(filepath string) (string, bool, error) { if s.DisableManifest && !s.ManifestRequired() { - return "", "", false, nil + return "", false, nil } storagePath := path.Join(path.Dir(s.StorageFilepath), path.Base(filepath)) - location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) if err != nil { - return "", "", false, err + return "", false, err } - return location, presignedUrl, true, nil + return location, true, nil } diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index 2076a0a5..deeaefcf 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -106,13 +106,13 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error { imageStoragePath := path.Join(s.StorageDir, filename) - location, _, presignedUrl, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) + location, _, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) if err != nil { return err } if s.conf.Manifest != nil { - s.conf.Manifest.AddImage(imageStoragePath, ts, location, presignedUrl) + s.conf.Manifest.AddImage(imageStoragePath, ts, location) } return nil @@ -150,16 +150,16 @@ func (s *ImageSink) Close() error { return nil } -func (s *ImageSink) UploadManifest(filepath string) (string, string, bool, error) { +func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) { if s.DisableManifest && !s.ManifestRequired() { - return "", "", false, nil + return "", false, nil } storagePath := path.Join(s.StorageDir, path.Base(filepath)) - location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) if err != nil { - return "", "", false, err + return "", false, err } - return location, presignedUrl, true, nil + return location, true, nil } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 119ae5be..377418d9 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -154,7 +154,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { go func() { defer close(update.uploadComplete) - location, size, presignedUrl, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) + location, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) if err != nil { s.callbacks.OnError(err) return @@ -165,7 +165,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { s.SegmentsInfo.SegmentCount++ s.SegmentsInfo.Size += size if s.manifestPlaylist != nil { - s.manifestPlaylist.AddSegment(segmentStoragePath, location, presignedUrl) + s.manifestPlaylist.AddSegment(segmentStoragePath, location) } s.infoLock.Unlock() }() @@ -222,12 +222,11 @@ func (s *SegmentSink) shouldUploadPlaylist() bool { func (s *SegmentSink) uploadPlaylist() error { playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - playlistLocation, _, presignedUrl, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + playlistLocation, _, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) if err == nil { s.SegmentsInfo.PlaylistLocation = playlistLocation if s.manifestPlaylist != nil { s.manifestPlaylist.Location = playlistLocation - s.manifestPlaylist.PresignedUrl = presignedUrl } } return err @@ -236,7 +235,7 @@ func (s *SegmentSink) uploadPlaylist() error { func (s *SegmentSink) uploadLivePlaylist() error { liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - livePlaylistLocation, _, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + livePlaylistLocation, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) if err == nil { s.SegmentsInfo.LivePlaylistLocation = livePlaylistLocation } @@ -322,16 +321,16 @@ func (s *SegmentSink) Close() error { return nil } -func (s *SegmentSink) UploadManifest(filepath string) (string, string, bool, error) { +func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) { if s.DisableManifest && !s.ManifestRequired() { - return "", "", false, nil + return "", false, nil } storagePath := path.Join(s.StorageDir, path.Base(filepath)) - location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) if err != nil { - return "", "", false, err + return "", false, err } - return location, presignedUrl, true, nil + return location, true, nil } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index dcae7ed4..d33934c5 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -25,7 +25,7 @@ import ( type Sink interface { Start() error Close() error - UploadManifest(string) (string, string, bool, error) + UploadManifest(string) (string, bool, error) } func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) { diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 6e60fcad..3d117372 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -45,28 +45,28 @@ func newAliOSSUploader(c *config.StorageConfig) (uploader, error) { }, nil } -func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { +func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { storageFilepath = path.Join(u.prefix, storageFilepath) stat, err := os.Stat(localFilepath) if err != nil { - return "", 0, "", errors.ErrUploadFailed("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret) if err != nil { - return "", 0, "", errors.ErrUploadFailed("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } bucket, err := client.Bucket(u.conf.Bucket) if err != nil { - return "", 0, "", errors.ErrUploadFailed("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } err = bucket.PutObjectFromFile(storageFilepath, localFilepath) if err != nil { - return "", 0, "", errors.ErrUploadFailed("AliOSS", err) + return "", 0, errors.ErrUploadFailed("AliOSS", err) } - return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), "", nil + return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil } diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index d8d46c41..dc1db595 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -49,7 +49,7 @@ func newAzureUploader(c *config.StorageConfig) (uploader, error) { }, nil } -func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, string, error) { +func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { storageFilepath = path.Join(u.prefix, storageFilepath) credential, err := azblob.NewSharedKeyCredential( @@ -57,12 +57,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType u.conf.AccountKey, ) if err != nil { - return "", 0, "", errors.ErrUploadFailed("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } azUrl, err := url.Parse(u.container) if err != nil { - return "", 0, "", errors.ErrUploadFailed("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{ @@ -78,7 +78,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType file, err := os.Open(localFilepath) if err != nil { - return "", 0, "", errors.ErrUploadFailed("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } defer func() { _ = file.Close() @@ -86,7 +86,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType stat, err := file.Stat() if err != nil { - return "", 0, "", errors.ErrUploadFailed("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } // upload blocks in parallel for optimal performance @@ -97,8 +97,8 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType Parallelism: 16, }) if err != nil { - return "", 0, "", errors.ErrUploadFailed("Azure", err) + return "", 0, errors.ErrUploadFailed("Azure", err) } - return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), "", nil + return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil } diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index aa2d6b33..84783474 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -92,12 +92,12 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) { return u, nil } -func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { +func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { storageFilepath = path.Join(u.prefix, storageFilepath) file, err := os.Open(localFilepath) if err != nil { - return "", 0, "", errors.ErrUploadFailed("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } defer func() { _ = file.Close() @@ -105,7 +105,7 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp stat, err := file.Stat() if err != nil { - return "", 0, "", errors.ErrUploadFailed("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( @@ -120,12 +120,12 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp wc.ChunkRetryDeadline = 0 if _, err = io.Copy(wc, file); err != nil { - return "", 0, "", errors.ErrUploadFailed("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } if err = wc.Close(); err != nil { - return "", 0, "", errors.ErrUploadFailed("GCP", err) + return "", 0, errors.ErrUploadFailed("GCP", err) } - return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), "", nil + return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil } diff --git a/pkg/pipeline/sink/uploader/local.go b/pkg/pipeline/sink/uploader/local.go index d3b0aabc..170859a3 100644 --- a/pkg/pipeline/sink/uploader/local.go +++ b/pkg/pipeline/sink/uploader/local.go @@ -31,35 +31,35 @@ func newLocalUploader(c *config.StorageConfig) (*localUploader, error) { return &localUploader{prefix: c.PathPrefix}, nil } -func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { +func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { storageFilepath = path.Join(u.prefix, storageFilepath) stat, err := os.Stat(localFilepath) if err != nil { - return "", 0, "", err + return "", 0, err } dir, _ := path.Split(storageFilepath) if err = os.MkdirAll(dir, 0755); err != nil { - return "", 0, "", err + return "", 0, err } local, err := os.Open(localFilepath) if err != nil { - return "", 0, "", err + return "", 0, err } defer local.Close() storage, err := os.Create(storageFilepath) if err != nil { - return "", 0, "", err + return "", 0, err } defer storage.Close() _, err = io.Copy(storage, local) if err != nil { - return "", 0, "", err + return "", 0, err } - return storageFilepath, stat.Size(), "", nil + return storageFilepath, stat.Size(), nil } diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index a3e612b7..94860f95 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -138,13 +138,13 @@ func updateRegion(awsConf *aws.Config, bucket string) error { func (u *S3Uploader) upload( localFilepath, storageFilepath string, outputType types.OutputType, -) (string, int64, string, error) { +) (string, int64, error) { storageFilepath = path.Join(u.prefix, storageFilepath) file, err := os.Open(localFilepath) if err != nil { - return "", 0, "", errors.ErrUploadFailed("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } defer func() { _ = file.Close() @@ -152,7 +152,7 @@ func (u *S3Uploader) upload( stat, err := file.Stat() if err != nil { - return "", 0, "", errors.ErrUploadFailed("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } l := &s3Logger{ @@ -183,7 +183,7 @@ func (u *S3Uploader) upload( if _, err = manager.NewUploader(client).Upload(context.Background(), input); err != nil { l.log() - return "", 0, "", errors.ErrUploadFailed("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } endpoint := "s3.amazonaws.com" @@ -199,7 +199,7 @@ func (u *S3Uploader) upload( } if !u.generatePresignedUrl { - return location, stat.Size(), "", nil + return location, stat.Size(), nil } res, err := s3.NewPresignClient(client).PresignGetObject(context.Background(), &s3.GetObjectInput{ @@ -207,10 +207,10 @@ func (u *S3Uploader) upload( Key: aws.String(storageFilepath), }) if err != nil { - return "", 0, "", errors.ErrUploadFailed("S3", err) + return "", 0, errors.ErrUploadFailed("S3", err) } - return location, stat.Size(), res.URL, nil + return res.URL, stat.Size(), nil } // s3Logger only logs aws messages on upload failure diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 65c4afed..681af39b 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -32,7 +32,7 @@ const ( ) type uploader interface { - upload(string, string, types.OutputType) (string, int64, string, error) + upload(string, string, types.OutputType) (string, int64, error) } type Uploader struct { @@ -86,10 +86,10 @@ func (u *Uploader) Upload( localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, -) (string, int64, string, error) { +) (string, int64, error) { start := time.Now() - location, size, presignedUrl, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) + location, size, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) elapsed := time.Since(start) if primaryErr == nil { @@ -100,14 +100,14 @@ func (u *Uploader) Upload( if deleteAfterUpload { _ = os.Remove(localFilepath) } - return location, size, presignedUrl, nil + return location, size, nil } if u.monitor != nil { u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) } if u.backup != nil { - location, size, presignedUrl, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) + location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) if backupErr == nil { u.backupUsed = true if u.monitor != nil { @@ -116,14 +116,14 @@ func (u *Uploader) Upload( if deleteAfterUpload { _ = os.Remove(localFilepath) } - return location, size, presignedUrl, nil + return location, size, nil } - return "", 0, "", psrpc.NewErrorf(psrpc.InvalidArgument, + return "", 0, psrpc.NewErrorf(psrpc.InvalidArgument, "primary: %s\nbackup: %s", primaryErr.Error(), backupErr.Error()) } - return "", 0, "", primaryErr + return "", 0, primaryErr } func (u *Uploader) ManifestRequired() bool { diff --git a/pkg/pipeline/sink/uploader/uploader_test.go b/pkg/pipeline/sink/uploader/uploader_test.go index c1f5db52..07af5c9e 100644 --- a/pkg/pipeline/sink/uploader/uploader_test.go +++ b/pkg/pipeline/sink/uploader/uploader_test.go @@ -1,7 +1,6 @@ package uploader import ( - "fmt" "io" "net/http" "os" @@ -44,16 +43,13 @@ func TestUploader(t *testing.T) { filepath := "uploader_test.go" storagePath := "uploader_test.go" - location, size, presignedUrl, err := u.Upload(filepath, storagePath, "test/plain", false) + location, size, err := u.Upload(filepath, storagePath, "test/plain", false) require.NoError(t, err) - expectedLocation := fmt.Sprintf("https://%s.s3.amazonaws.com/testProject/uploader_test.go", bucket) - - require.Equal(t, expectedLocation, location) require.NotZero(t, size) - require.NotEmpty(t, presignedUrl) + require.NotEmpty(t, location) - response, err := http.Get(presignedUrl) + response, err := http.Get(location) require.NoError(t, err) defer response.Body.Close() diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 26febb35..981b714c 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -219,6 +219,6 @@ func (s *WebsocketSink) Close() error { return nil } -func (s *WebsocketSink) UploadManifest(_ string) (string, string, bool, error) { - return "", "", false, nil +func (s *WebsocketSink) UploadManifest(_ string) (string, bool, error) { + return "", false, nil }