Skip to content

Commit

Permalink
Merge pull request #7 from tarosky/issue/6
Browse files Browse the repository at this point in the history
Upload image to S3 to remove dependency of imgconv on EFS
Close #7.
  • Loading branch information
harai authored Jan 13, 2021
2 parents a81173b + 42de453 commit 3e68a4b
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 55 deletions.
194 changes: 151 additions & 43 deletions imgserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ const (
lastModifiedHeader = "Last-Modified"

s3ErrCodeNotFound = "NotFound" // https://forums.aws.amazon.com/thread.jspa?threadID=145909

timestampMetadata = "Original-Timestamp"
pathMetadata = "Original-Path"
)

type config struct {
region string
accessKeyID string
secretAccessKey string
s3Bucket string
s3KeyBase string
s3SrcKeyBase string
s3DestKeyBase string
sqsQueueURL string
sqsBatchWaitTime uint
efsMountPath string
Expand Down Expand Up @@ -114,8 +118,13 @@ func main() {
Required: true,
},
&cli.StringFlag{
Name: "s3-key-base",
Aliases: []string{"k"},
Name: "s3-src-key-base",
Aliases: []string{"sk"},
Required: true,
},
&cli.StringFlag{
Name: "s3-dest-key-base",
Aliases: []string{"dk"},
Required: true,
},
&cli.StringFlag{
Expand Down Expand Up @@ -162,7 +171,8 @@ func main() {
cfg := &config{
region: c.String("region"),
s3Bucket: c.String("s3-bucket"),
s3KeyBase: c.String("s3-key-base"),
s3SrcKeyBase: c.String("s3-src-key-base"),
s3DestKeyBase: c.String("s3-dest-key-base"),
sqsQueueURL: c.String("sqs-queue-url"),
sqsBatchWaitTime: c.Uint("sqs-batch-wait-time"),
efsMountPath: c.String("efs-mount-path"),
Expand Down Expand Up @@ -468,6 +478,20 @@ type efsImageReader struct {
err error
}

func (r *efsImageReader) Close() error {
if r.reader != nil {
return r.reader.Close()
}
return nil
}

func (r *efsImageReader) Seek(offset int64, whence int) (int64, error) {
if r.reader != nil {
return r.reader.Seek(offset, whence)
}
return 0, nil
}

type offset int

const (
Expand All @@ -488,7 +512,10 @@ func (b *efsFileBody) Read(p []byte) (n int, err error) {
}

func (b *efsFileBody) Close() error {
return b.body.Close()
if b.body != nil {
return b.body.Close()
}
return nil
}

func (b *efsFileBody) Seek(offset int64, whence int) (int64, error) {
Expand Down Expand Up @@ -562,7 +589,10 @@ func (b *s3Body) Read(p []byte) (n int, err error) {
}

func (b *s3Body) Close() error {
return b.body.Close()
if b.body != nil {
return b.body.Close()
}
return nil
}

func (b *s3Body) Seek(offset int64, whence int) (int64, error) {
Expand Down Expand Up @@ -660,6 +690,13 @@ type s3ImageData struct {
reader *s3Body
}

func (d *s3ImageData) Close() error {
if d.reader != nil {
return d.reader.Close()
}
return nil
}

func quoteETag(eTag string) string {
return "\"" + eTag + "\""
}
Expand Down Expand Up @@ -824,47 +861,91 @@ func (e *environment) getWebPReader(ctx context.Context, s3Key string, webPStatu
}

type filePath struct {
efs string
s3 string
sqs string
name string
efs string
s3JPNG string
s3WebP string
sqs string
path string
name string
}

func (e *environment) ensureWebPUpdated(
jpng *efsImageStatus,
webP *s3ImageStatus,
ctx context.Context,
jpngStatus *efsImageStatus,
webPStatus *s3ImageStatus,
jpngReader *efsImageReader,
fpath *filePath,
taskCh chan<- *task,
) {
zapPathField := zap.String("path", fpath.efs)

//
// The principle is that never change file if error occurs.
//

if jpng.err != nil {
if jpngStatus.err != nil || webPStatus.err != nil {
return
}

if webP.err != nil {
// Do nothing since the both files don't exist.
if jpngStatus.time == nil && webPStatus.time == nil {
return
}

// Do nothing since both files don't exist.
if jpng.time == nil && webP.time == nil {
if jpngStatus.time != nil && webPStatus.time != nil && jpngStatus.time.Equal(*webPStatus.time) {
return
}

// Update if only one side exists.
if jpng.time == nil || webP.time == nil {
taskCh <- &task{Path: fpath.sqs}
return
if jpngReader == nil {
jpngReaderCh := make(chan *efsImageReader)
go e.getJPNGReader(ctx, fpath.efs, jpngReaderCh)
jpngReader = <-jpngReaderCh
defer func() {
if err := jpngReader.Close(); err != nil {
e.log.Error("failed to close EFS file", zapPathField, zap.Error(err))
}
}()
} else {
jpngReader.Seek(0, io.SeekStart)
}

if jpng.time.Equal(*webP.time) {
if jpngReader.err != nil {
return
}

taskCh <- &task{Path: fpath.sqs}
if jpngReader.reader != nil && jpngStatus.time != nil {
// PUT the latest image file for processing.
var contentType string
switch filepath.Ext(fpath.s3JPNG) {
case ".jpg":
contentType = jpegContentType
case ".png":
contentType = pngContentType
default:
e.log.Error("unknown extension for images", zapPathField)
return
}

tsStr := jpngStatus.time.Format(time.RFC3339Nano)
if _, err := e.s3Client.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: &e.s3Bucket,
Key: &fpath.s3JPNG,
Body: jpngReader.reader,
ContentType: &contentType,
Metadata: map[string]*string{
pathMetadata: &fpath.path,
timestampMetadata: &tsStr,
},
}); err != nil {
e.log.Error("failed to PUT S3 source image",
zapPathField, zap.Error(err), zap.String("s3key", fpath.s3JPNG))
return
}
}
// Do nothing and just add task when the file was deleted from EFS.
// The task will delete the corresponding WebP file when no source image exists.

taskCh <- &task{Path: fpath.sqs}
}

func (e *environment) respondWithEFSFile(
Expand All @@ -883,11 +964,6 @@ func (e *environment) respondWithEFSFile(
size: status.size,
log: e.log,
}
defer func() {
if err := body.Close(); err != nil {
e.log.Error("failed to close EFS file", zap.Error(err), zap.String("path", fpath.efs))
}
}()

c.Writer.Header().Set(cacheControlHeader, cache)
c.Writer.Header().Set(eTagHeader, status.eTag)
Expand All @@ -902,11 +978,17 @@ func (e *environment) handleJPNGRequest(c *gin.Context, fpath *filePath, taskCh

go e.checkJPNGStatus(fpath.efs, jpngStatusCh)
go e.getJPNGReader(c, fpath.efs, jpngReaderCh)
go e.checkWebPStatus(c, fpath.s3, webPStatusCh)
go e.checkWebPStatus(c, fpath.s3WebP, webPStatusCh)

jpngStatus := <-jpngStatusCh
jpngReader := <-jpngReaderCh

defer func() {
if err := jpngReader.Close(); err != nil {
e.log.Error("failed to close EFS file", zap.Error(err), zap.String("path", fpath.efs))
}
}()

if jpngReader.err != nil || jpngStatus.err != nil {
e.respondWithInternalServerErrorText(c)
} else if jpngReader.reader == nil || jpngStatus.time == nil {
Expand All @@ -918,7 +1000,7 @@ func (e *environment) handleJPNGRequest(c *gin.Context, fpath *filePath, taskCh
// Now ensure WebP file exists.
webPStatus := <-webPStatusCh

e.ensureWebPUpdated(jpngStatus, webPStatus, fpath, taskCh)
e.ensureWebPUpdated(c, jpngStatus, webPStatus, jpngReader, fpath, taskCh)
}

func (e *environment) respondWithInternalServerErrorText(c *gin.Context) {
Expand All @@ -935,8 +1017,8 @@ func (e *environment) respondWithNotFoundText(c *gin.Context) {

func (e *environment) respondWithS3Object(c *gin.Context, fpath *filePath, webPData *s3ImageData) {
defer func() {
if err := webPData.reader.Close(); err != nil {
e.log.Error("failed to close S3 body", zap.Error(err), zap.String("path", fpath.s3))
if err := webPData.Close(); err != nil {
e.log.Error("failed to close S3 body", zap.Error(err), zap.String("path", fpath.s3WebP))
}
}()

Expand All @@ -947,10 +1029,12 @@ func (e *environment) respondWithS3Object(c *gin.Context, fpath *filePath, webPD
c.Writer.Flush()
}

func (e *environment) respondTemporarily(c *gin.Context, fpath *filePath, jpngStatus *efsImageStatus) {
jpngReaderCh := make(chan *efsImageReader)
go e.getJPNGReader(c, fpath.efs, jpngReaderCh)
jpngReader := <-jpngReaderCh
func (e *environment) respondTemporarily(
c *gin.Context,
fpath *filePath,
jpngStatus *efsImageStatus,
jpngReader *efsImageReader,
) {

if jpngReader.err != nil || jpngStatus.err != nil {
e.respondWithInternalServerErrorText(c)
Expand All @@ -965,28 +1049,49 @@ func (e *environment) handleWebPRequest(c *gin.Context, fpath *filePath, taskCh
webPDataCh := make(chan *s3ImageData)
jpngStatusCh := make(chan *efsImageStatus)

go e.getWebPReader(c, fpath.s3, webPDataCh)
go e.getWebPReader(c, fpath.s3WebP, webPDataCh)
go e.checkJPNGStatus(fpath.efs, jpngStatusCh)

webPData := <-webPDataCh
var jpngStatus *efsImageStatus
var jpngReader *efsImageReader
if webPData.err != nil {
e.log.Info(
"WebP is requested but JPEG/PNG will be responded due to error on S3",
zap.String("path", fpath.efs),
zap.Error(webPData.err))
jpngReaderCh := make(chan *efsImageReader)
go e.getJPNGReader(c, fpath.efs, jpngReaderCh)
jpngStatus = <-jpngStatusCh
e.respondTemporarily(c, fpath, jpngStatus)

jpngReader = <-jpngReaderCh
defer func() {
if err := jpngReader.Close(); err != nil {
e.log.Error("failed to close EFS file", zap.Error(err), zap.String("path", fpath.efs))
}
}()

e.respondTemporarily(c, fpath, jpngStatus, jpngReader)
} else if webPData.time != nil {
e.respondWithS3Object(c, fpath, webPData)
jpngStatus = <-jpngStatusCh
} else {
jpngReaderCh := make(chan *efsImageReader)
go e.getJPNGReader(c, fpath.efs, jpngReaderCh)
// WebP not yet generated
jpngStatus = <-jpngStatusCh
e.respondTemporarily(c, fpath, jpngStatus)

jpngReader = <-jpngReaderCh
defer func() {
if err := jpngReader.Close(); err != nil {
e.log.Error("failed to close EFS file", zap.Error(err), zap.String("path", fpath.efs))
}
}()

e.respondTemporarily(c, fpath, jpngStatus, jpngReader)
}

e.ensureWebPUpdated(jpngStatus, &webPData.s3ImageStatus, fpath, taskCh)
e.ensureWebPUpdated(c, jpngStatus, &webPData.s3ImageStatus, jpngReader, fpath, taskCh)
}

func (e *environment) handleRequest(c *gin.Context, path string, acceptHeader string, taskCh chan<- *task) {
Expand All @@ -999,12 +1104,15 @@ func (e *environment) handleRequest(c *gin.Context, path string, acceptHeader st
return
}
_, name := filepath.Split(path)
sanitizedPath := strings.TrimPrefix(efsAbsPath, e.efsMountPath+"/")

fpath := &filePath{
efs: efsAbsPath,
s3: filepath.Clean(filepath.Join(e.s3KeyBase, path+".webp")),
sqs: strings.TrimPrefix(efsAbsPath, e.efsMountPath+"/"),
name: name,
efs: efsAbsPath,
s3JPNG: filepath.Clean(filepath.Join(e.s3SrcKeyBase, path)),
s3WebP: filepath.Clean(filepath.Join(e.s3DestKeyBase, path+".webp")),
sqs: sanitizedPath,
path: sanitizedPath,
name: name,
}

if supportsWebP(acceptHeader) {
Expand Down
Loading

0 comments on commit 3e68a4b

Please sign in to comment.