From a9697af316a3f3fbdfa32fa391557d7f4561fa27 Mon Sep 17 00:00:00 2001 From: V Date: Mon, 31 Oct 2022 09:29:42 +0000 Subject: [PATCH] upgrade s3 go sdk (#7) Co-authored-by: venki --- s3.go | 272 +++++++++++++++++++++++++++++++++++++++-------------- scanner.go | 13 ++- 2 files changed, 210 insertions(+), 75 deletions(-) diff --git a/s3.go b/s3.go index 542db73..4e8c317 100644 --- a/s3.go +++ b/s3.go @@ -1,97 +1,221 @@ package main import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "context" + "net/http" + "github.com/aws/smithy-go" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" "net/url" "time" "fmt" "os" + "strconv" "errors" ) + +func getPartSize() int64 { + var partSize int64 + + strSizeInMb, err := os.LookupEnv("DOWNLOAD_PART_SIZE") + + if !err { + elog.Println(time.Now().Format(time.RFC3339) + "DOWNLOAD_PART_SIZE is not present..using DefaultDownloadPartSize ") + partSize = manager.DefaultDownloadPartSize + } else { + sizeInMb, err := strconv.Atoi(strSizeInMb) + if err != nil { + elog.Println(time.Now().Format(time.RFC3339) + "DOWNLOAD_PART_SIZE conversion issue..using DefaultDownloadPartSize ") + partSize = manager.DefaultDownloadPartSize + } else { + partSize = int64(sizeInMb) * 1024 * 1204 + } + } + return partSize +} + +func getRegion() string { + region, err := os.LookupEnv("AWS_REGION") + if !err { + elog.Println(time.Now().Format(time.RFC3339) + "AWS_REGION is not present..using us-east-1") + region = "us-east-1" + } + return region +} + // check if a bucket exists. func bucketExists(bucket string) (bool, error) { - awsSession, _ := session.NewSession(&aws.Config{ - Region: aws.String(getRegion())}, + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), ) - - svc := s3.New(awsSession) - input := &s3.HeadBucketInput{ - Bucket: aws.String(bucket), + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: Filed to load config for bucket "+bucket + " error : " + err.Error()) + return false, errors.New("Filed to load config") } - _, err := svc.HeadBucket(input) + s3client := s3.NewFromConfig(cfg) + + _, err = s3client.HeadBucket(context.TODO(),&s3.HeadBucketInput{Bucket: aws.String(bucket)}) + if err != nil { - if aerr, ok := err.(awserr.Error); ok { - switch aerr.Code() { - case s3.ErrCodeNoSuchBucket: - return false,nil + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + var httpResponseErr *awshttp.ResponseError + if errors.As(err, &httpResponseErr) { + switch httpResponseErr.HTTPStatusCode() { + case http.StatusMovedPermanently: + elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error()) + return false, errors.New("Bucket StatusMovedPermanently ") + case http.StatusForbidden: + elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error()) + return false, errors.New("Bucket StatusForbidden") + case http.StatusNotFound: + elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error()) + return false, nil default: - elog.Println( time.Now().Format(time.RFC3339) + " bucketExists failed for bucket "+bucket + " error : " + err.Error()) - return false, errors.New("Filed to find bucket") + elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: ResponseError failed for bucket "+bucket + "with error: "+err.Error()) + return false, errors.New("Filed to find bucket") + } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: ApiError failed for bucket "+bucket + "with error: "+err.Error()) + return false, errors.New("Filed to find bucket") } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + "with error: "+err.Error()) + return false, errors.New("Filed to find bucket") } - elog.Println( time.Now().Format(time.RFC3339) + " bucketExists got unknown error for bucket "+bucket + " error : " + err.Error()) - return false, errors.New("Filed to find bucket") } return true,nil } -// check if a file exists. -func keyExists(bucket string, key string) (bool, error) { - awsSession, _ := session.NewSession(&aws.Config{ - Region: aws.String(getRegion())}, +func getHeadObject(bucket string, key string) (*s3.HeadObjectOutput, error) { + + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), ) + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: Filed to load config for bucket "+bucket + " error : " + err.Error()) + return nil, errors.New("Filed to load config") + } - svc := s3.New(awsSession) + s3client := s3.NewFromConfig(cfg) - _, err := svc.HeadObject(&s3.HeadObjectInput{ + headObjectResponse, err := s3client.HeadObject(context.TODO(), &s3.HeadObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), }) + if err != nil { - if aerr, ok := err.(awserr.Error); ok { - switch aerr.Code() { - case "NotFound": // s3.ErrCodeNoSuchKey does not work, aws is missing this error code so we hardwire a string - elog.Println( time.Now().Format(time.RFC3339) + " keyExists got NotFound error for " +key+ " bucket "+bucket + " error : " + err.Error()) - return false, nil - default: - elog.Println( time.Now().Format(time.RFC3339) + " keyExists failed for " +key+ " bucket "+bucket + " error : " + err.Error()) - return false, errors.New("Filed to find file") + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + var httpResponseErr *awshttp.ResponseError + if errors.As(err, &httpResponseErr) { + switch httpResponseErr.HTTPStatusCode() { + case http.StatusMovedPermanently: + elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error()) + return nil, errors.New("Bucket StatusMovedPermanently ") + case http.StatusForbidden: + elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error()) + return nil, errors.New("Bucket StatusForbidden") + case http.StatusNotFound: + elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error()) + return nil, errors.New("Bucket StatusNotFound") + default: + elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: ResponseError failed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return nil, errors.New("Filed to find object") + } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: APIError failed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return nil, errors.New("Filed to find object") } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return nil, errors.New("Filed to find object") } - elog.Println( time.Now().Format(time.RFC3339) + " keyExists got unknown error for " +key+ " bucket "+bucket + " error : " + err.Error()) - return false, errors.New("Filed to find file") } - return true, nil + + return headObjectResponse, nil } +// check if a file exists. +func keyExists(bucket string, key string) (bool, error) { -func getRegion() string { - region, err := os.LookupEnv("AWS_REGION") - if !err { - fmt.Println("AWS_REGION is not present..using us-east-1") - region = "us-east-1" + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), + ) + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + "keyExists: Filed to load config for bucket "+bucket +" key "+key+" error : " + err.Error()) + return false, errors.New("Filed to load config") } - return region + + s3client := s3.NewFromConfig(cfg) + + _, err = s3client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + var httpResponseErr *awshttp.ResponseError + if errors.As(err, &httpResponseErr) { + switch httpResponseErr.HTTPStatusCode() { + case http.StatusNotFound: + elog.Println( time.Now().Format(time.RFC3339) + " keyExists: failed for bucket "+bucket +" key "+key+" error : " + err.Error()) + return false, nil + default: + elog.Println(time.Now().Format(time.RFC3339) + " keyExists: ResponseError failed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return false, errors.New("Filed to find key") + } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " keyExists: APIErrorfailed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return false, errors.New("Filed to find key") + } + } else { + elog.Println(time.Now().Format(time.RFC3339) + " keyExists: failed for bucket "+bucket +" key "+key+" with error: "+err.Error()) + return false, errors.New("Filed to find key") + } + } + + return true, nil } func readFile(bucket string, item string) ([] byte, error) { - awsSession, _ := session.NewSession(&aws.Config{ - Region: aws.String(getRegion())}, + // Load AWS Config + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), ) - - buff := &aws.WriteAtBuffer{} + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " readFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error()) + return nil, errors.New("Filed to load config") + } + + // Create an S3 client using the loaded configuration + s3client := s3.NewFromConfig(cfg) - s3dl := s3manager.NewDownloader(awsSession) + // Create a downloader with the client and custom downloader options + downloader := manager.NewDownloader(s3client, func(d *manager.Downloader) { + d.PartSize = getPartSize() + }) - _, err := s3dl.Download(buff, &s3.GetObjectInput{ + headObject, err := getHeadObject(bucket,item) + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " readFile: getHeadObject failed " +item+ " from bucket "+bucket + " error : " + err.Error()) + return nil, errors.New("Filed to read file") + } + // pre-allocate in memory buffer, where headObject type is *s3.HeadObjectOutput + buff := make([]byte, int(headObject.ContentLength)) + // wrap with aws.WriteAtBuffer + w := manager.NewWriteAtBuffer(buff) + // download file into the memory + _, err = downloader.Download(context.TODO(), w, &s3.GetObjectInput{ Bucket: aws.String(bucket), Key: aws.String(item), }) @@ -103,54 +227,60 @@ func readFile(bucket string, item string) ([] byte, error) { info.Println(time.Now().Format(time.RFC3339) +" Downloaded file "+item+ " from bucket "+bucket) - return buff.Bytes(), nil + return buff, nil } func copyFile(bucket string, item string, other string) (error){ - awsSession, _ := session.NewSession(&aws.Config{ - Region: aws.String(getRegion())}, + // Load AWS Config + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), ) + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " copyFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error()) + return errors.New("Filed to load config") + } - // Create S3 service client - svc := s3.New(awsSession) + // Create an S3 client using the loaded configuration + s3client := s3.NewFromConfig(cfg) source := bucket + "/" + item - // Copy the file - _, err := svc.CopyObject(&s3.CopyObjectInput{Bucket: aws.String(other), - CopySource: aws.String(url.PathEscape(source)), Key: aws.String(item), ACL: aws.String("bucket-owner-full-control")}) + _, err = s3client.CopyObject(context.TODO(), &s3.CopyObjectInput{ + Bucket: aws.String(other), + CopySource: aws.String(url.PathEscape(source)), + Key: aws.String(item), + }) if err != nil { elog.Println( time.Now().Format(time.RFC3339) + " Unable to read file " +item+ " from bucket "+bucket+ " to bucket "+other+" error : " + err.Error()) return errors.New("Unable to copy file") } - // Wait to see if the file got copied - err = svc.WaitUntilObjectExists(&s3.HeadObjectInput{Bucket: aws.String(other), Key: aws.String(item)}) - if err != nil { - elog.Println( time.Now().Format(time.RFC3339) + " Error occurred while waiting for file " +item+ " to be copied to bucket "+other+ " error: "+ fmt.Sprint(err)) - return errors.New("Error while waiting for file to copy") - } - info.Println( time.Now().Format(time.RFC3339) + " File "+ item+ " successfully copied from bucket "+bucket+ " to bucket "+other) return nil } func deleteFile(bucket string, item string) (error) { - awsSession, _ := session.NewSession(&aws.Config{ - Region: aws.String(getRegion())}, + + // Load AWS Config + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(getRegion()), ) + if err != nil { + elog.Println( time.Now().Format(time.RFC3339) + " deleteFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error()) + return errors.New("Filed to load config") + } - // Create S3 service client - svc := s3.New(awsSession) + // Create an S3 client using the loaded configuration + s3client := s3.NewFromConfig(cfg) - params := &s3.DeleteObjectInput{ + _, err = s3client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ Bucket: aws.String(bucket), Key: aws.String(item), - } - _, err := svc.DeleteObject(params) + }) + if err != nil { elog.Println( time.Now().Format(time.RFC3339) + " Error occurred while deleting file " +item+ " from bucket "+bucket+" err: "+ fmt.Sprint(err)) return errors.New("Error occurred while deleting file") diff --git a/scanner.go b/scanner.go index 55c1a0f..4e36532 100644 --- a/scanner.go +++ b/scanner.go @@ -42,17 +42,22 @@ func (self *Scanner) scanstream(data []byte) (*ScanResponse) { scanResponse.data = yaraScannerResponse scanResponse.err = yaraerr + yaraRespJson, _ := json.Marshal(yaraScannerResponse) + info.Println( time.Now().Format(time.RFC3339) + " yarascan scan result " + string(yaraRespJson)) + if (yaraerr == nil) && len(yaraScannerResponse.Matches) > 0 { - resp, _ := json.Marshal(yaraScannerResponse) - info.Println( time.Now().Format(time.RFC3339) + " Found matches with yara " + string(resp)) + info.Println( time.Now().Format(time.RFC3339) + " Found matches with yara " + string(yaraRespJson)) } info.Println("Running clamscan on addr: "+ clamdaddr) clamScannerResponse,clamerr := ScanStream(&self.clamscanner, data) + + clamRespJson, _ := json.Marshal(clamScannerResponse) + info.Println( time.Now().Format(time.RFC3339) + " clamav scan result " + string(clamRespJson)) + if (clamerr == nil) && len(clamScannerResponse.Matches) > 0 { - resp, _ := json.Marshal(yaraScannerResponse) - info.Println( time.Now().Format(time.RFC3339) + " Found matches with clamav" + string(resp)) + info.Println( time.Now().Format(time.RFC3339) + " Found matches with clamav" + string(clamRespJson)) scanResponse.data = clamScannerResponse }