Skip to content

Commit

Permalink
upgrade s3 go sdk (#7)
Browse files Browse the repository at this point in the history
Co-authored-by: venki <bot@cloudina.co.uk>
  • Loading branch information
venky999 and venki authored Oct 31, 2022
1 parent 356d45a commit a9697af
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 75 deletions.
272 changes: 201 additions & 71 deletions s3.go
Original file line number Diff line number Diff line change
@@ -1,97 +1,221 @@
package main

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"context"
"net/http"
"github.com/aws/smithy-go"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"net/url"
"time"
"fmt"
"os"
"strconv"
"errors"
)


func getPartSize() int64 {
var partSize int64

strSizeInMb, err := os.LookupEnv("DOWNLOAD_PART_SIZE")

if !err {
elog.Println(time.Now().Format(time.RFC3339) + "DOWNLOAD_PART_SIZE is not present..using DefaultDownloadPartSize ")
partSize = manager.DefaultDownloadPartSize
} else {
sizeInMb, err := strconv.Atoi(strSizeInMb)
if err != nil {
elog.Println(time.Now().Format(time.RFC3339) + "DOWNLOAD_PART_SIZE conversion issue..using DefaultDownloadPartSize ")
partSize = manager.DefaultDownloadPartSize
} else {
partSize = int64(sizeInMb) * 1024 * 1204
}
}
return partSize
}

func getRegion() string {
region, err := os.LookupEnv("AWS_REGION")
if !err {
elog.Println(time.Now().Format(time.RFC3339) + "AWS_REGION is not present..using us-east-1")
region = "us-east-1"
}
return region
}

// check if a bucket exists.
func bucketExists(bucket string) (bool, error) {
awsSession, _ := session.NewSession(&aws.Config{
Region: aws.String(getRegion())},
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)

svc := s3.New(awsSession)
input := &s3.HeadBucketInput{
Bucket: aws.String(bucket),
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: Filed to load config for bucket "+bucket + " error : " + err.Error())
return false, errors.New("Filed to load config")
}

_, err := svc.HeadBucket(input)
s3client := s3.NewFromConfig(cfg)

_, err = s3client.HeadBucket(context.TODO(),&s3.HeadBucketInput{Bucket: aws.String(bucket)})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchBucket:
return false,nil
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) {
switch httpResponseErr.HTTPStatusCode() {
case http.StatusMovedPermanently:
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error())
return false, errors.New("Bucket StatusMovedPermanently ")
case http.StatusForbidden:
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error())
return false, errors.New("Bucket StatusForbidden")
case http.StatusNotFound:
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + " error : " + err.Error())
return false, nil
default:
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists failed for bucket "+bucket + " error : " + err.Error())
return false, errors.New("Filed to find bucket")
elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: ResponseError failed for bucket "+bucket + "with error: "+err.Error())
return false, errors.New("Filed to find bucket")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: ApiError failed for bucket "+bucket + "with error: "+err.Error())
return false, errors.New("Filed to find bucket")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " bucketExists: failed for bucket "+bucket + "with error: "+err.Error())
return false, errors.New("Filed to find bucket")
}
elog.Println( time.Now().Format(time.RFC3339) + " bucketExists got unknown error for bucket "+bucket + " error : " + err.Error())
return false, errors.New("Filed to find bucket")
}

return true,nil
}

// check if a file exists.
func keyExists(bucket string, key string) (bool, error) {
awsSession, _ := session.NewSession(&aws.Config{
Region: aws.String(getRegion())},
func getHeadObject(bucket string, key string) (*s3.HeadObjectOutput, error) {

cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: Filed to load config for bucket "+bucket + " error : " + err.Error())
return nil, errors.New("Filed to load config")
}

svc := s3.New(awsSession)
s3client := s3.NewFromConfig(cfg)

_, err := svc.HeadObject(&s3.HeadObjectInput{
headObjectResponse, err := s3client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case "NotFound": // s3.ErrCodeNoSuchKey does not work, aws is missing this error code so we hardwire a string
elog.Println( time.Now().Format(time.RFC3339) + " keyExists got NotFound error for " +key+ " bucket "+bucket + " error : " + err.Error())
return false, nil
default:
elog.Println( time.Now().Format(time.RFC3339) + " keyExists failed for " +key+ " bucket "+bucket + " error : " + err.Error())
return false, errors.New("Filed to find file")
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) {
switch httpResponseErr.HTTPStatusCode() {
case http.StatusMovedPermanently:
elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error())
return nil, errors.New("Bucket StatusMovedPermanently ")
case http.StatusForbidden:
elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error())
return nil, errors.New("Bucket StatusForbidden")
case http.StatusNotFound:
elog.Println( time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" error : " + err.Error())
return nil, errors.New("Bucket StatusNotFound")
default:
elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: ResponseError failed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return nil, errors.New("Filed to find object")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: APIError failed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return nil, errors.New("Filed to find object")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " getHeadObject: failed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return nil, errors.New("Filed to find object")
}
elog.Println( time.Now().Format(time.RFC3339) + " keyExists got unknown error for " +key+ " bucket "+bucket + " error : " + err.Error())
return false, errors.New("Filed to find file")
}
return true, nil

return headObjectResponse, nil
}

// check if a file exists.
func keyExists(bucket string, key string) (bool, error) {

func getRegion() string {
region, err := os.LookupEnv("AWS_REGION")
if !err {
fmt.Println("AWS_REGION is not present..using us-east-1")
region = "us-east-1"
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + "keyExists: Filed to load config for bucket "+bucket +" key "+key+" error : " + err.Error())
return false, errors.New("Filed to load config")
}
return region

s3client := s3.NewFromConfig(cfg)

_, err = s3client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})

if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) {
switch httpResponseErr.HTTPStatusCode() {
case http.StatusNotFound:
elog.Println( time.Now().Format(time.RFC3339) + " keyExists: failed for bucket "+bucket +" key "+key+" error : " + err.Error())
return false, nil
default:
elog.Println(time.Now().Format(time.RFC3339) + " keyExists: ResponseError failed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return false, errors.New("Filed to find key")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " keyExists: APIErrorfailed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return false, errors.New("Filed to find key")
}
} else {
elog.Println(time.Now().Format(time.RFC3339) + " keyExists: failed for bucket "+bucket +" key "+key+" with error: "+err.Error())
return false, errors.New("Filed to find key")
}
}

return true, nil
}

func readFile(bucket string, item string) ([] byte, error) {

awsSession, _ := session.NewSession(&aws.Config{
Region: aws.String(getRegion())},
// Load AWS Config
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)

buff := &aws.WriteAtBuffer{}
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " readFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error())
return nil, errors.New("Filed to load config")
}

// Create an S3 client using the loaded configuration
s3client := s3.NewFromConfig(cfg)

s3dl := s3manager.NewDownloader(awsSession)
// Create a downloader with the client and custom downloader options
downloader := manager.NewDownloader(s3client, func(d *manager.Downloader) {
d.PartSize = getPartSize()
})

_, err := s3dl.Download(buff, &s3.GetObjectInput{
headObject, err := getHeadObject(bucket,item)
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " readFile: getHeadObject failed " +item+ " from bucket "+bucket + " error : " + err.Error())
return nil, errors.New("Filed to read file")
}
// pre-allocate in memory buffer, where headObject type is *s3.HeadObjectOutput
buff := make([]byte, int(headObject.ContentLength))
// wrap with aws.WriteAtBuffer
w := manager.NewWriteAtBuffer(buff)
// download file into the memory
_, err = downloader.Download(context.TODO(), w, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(item),
})
Expand All @@ -103,54 +227,60 @@ func readFile(bucket string, item string) ([] byte, error) {

info.Println(time.Now().Format(time.RFC3339) +" Downloaded file "+item+ " from bucket "+bucket)

return buff.Bytes(), nil
return buff, nil
}

func copyFile(bucket string, item string, other string) (error){

awsSession, _ := session.NewSession(&aws.Config{
Region: aws.String(getRegion())},
// Load AWS Config
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " copyFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error())
return errors.New("Filed to load config")
}

// Create S3 service client
svc := s3.New(awsSession)
// Create an S3 client using the loaded configuration
s3client := s3.NewFromConfig(cfg)

source := bucket + "/" + item

// Copy the file
_, err := svc.CopyObject(&s3.CopyObjectInput{Bucket: aws.String(other),
CopySource: aws.String(url.PathEscape(source)), Key: aws.String(item), ACL: aws.String("bucket-owner-full-control")})
_, err = s3client.CopyObject(context.TODO(), &s3.CopyObjectInput{
Bucket: aws.String(other),
CopySource: aws.String(url.PathEscape(source)),
Key: aws.String(item),
})

if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " Unable to read file " +item+ " from bucket "+bucket+ " to bucket "+other+" error : " + err.Error())
return errors.New("Unable to copy file")
}

// Wait to see if the file got copied
err = svc.WaitUntilObjectExists(&s3.HeadObjectInput{Bucket: aws.String(other), Key: aws.String(item)})
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " Error occurred while waiting for file " +item+ " to be copied to bucket "+other+ " error: "+ fmt.Sprint(err))
return errors.New("Error while waiting for file to copy")
}

info.Println( time.Now().Format(time.RFC3339) + " File "+ item+ " successfully copied from bucket "+bucket+ " to bucket "+other)

return nil
}

func deleteFile(bucket string, item string) (error) {
awsSession, _ := session.NewSession(&aws.Config{
Region: aws.String(getRegion())},

// Load AWS Config
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(getRegion()),
)
if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " deleteFile: Filed to load config to read file " +item+ " from bucket "+bucket + " error : " + err.Error())
return errors.New("Filed to load config")
}

// Create S3 service client
svc := s3.New(awsSession)
// Create an S3 client using the loaded configuration
s3client := s3.NewFromConfig(cfg)

params := &s3.DeleteObjectInput{
_, err = s3client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(item),
}
_, err := svc.DeleteObject(params)
})

if err != nil {
elog.Println( time.Now().Format(time.RFC3339) + " Error occurred while deleting file " +item+ " from bucket "+bucket+" err: "+ fmt.Sprint(err))
return errors.New("Error occurred while deleting file")
Expand Down
13 changes: 9 additions & 4 deletions scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,22 @@ func (self *Scanner) scanstream(data []byte) (*ScanResponse) {
scanResponse.data = yaraScannerResponse
scanResponse.err = yaraerr

yaraRespJson, _ := json.Marshal(yaraScannerResponse)
info.Println( time.Now().Format(time.RFC3339) + " yarascan scan result " + string(yaraRespJson))

if (yaraerr == nil) && len(yaraScannerResponse.Matches) > 0 {
resp, _ := json.Marshal(yaraScannerResponse)
info.Println( time.Now().Format(time.RFC3339) + " Found matches with yara " + string(resp))
info.Println( time.Now().Format(time.RFC3339) + " Found matches with yara " + string(yaraRespJson))
}

info.Println("Running clamscan on addr: "+ clamdaddr)

clamScannerResponse,clamerr := ScanStream(&self.clamscanner, data)

clamRespJson, _ := json.Marshal(clamScannerResponse)
info.Println( time.Now().Format(time.RFC3339) + " clamav scan result " + string(clamRespJson))

if (clamerr == nil) && len(clamScannerResponse.Matches) > 0 {
resp, _ := json.Marshal(yaraScannerResponse)
info.Println( time.Now().Format(time.RFC3339) + " Found matches with clamav" + string(resp))
info.Println( time.Now().Format(time.RFC3339) + " Found matches with clamav" + string(clamRespJson))
scanResponse.data = clamScannerResponse
}

Expand Down

0 comments on commit a9697af

Please sign in to comment.