-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
143 lines (126 loc) · 4.14 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// name is the program name printed in the usage text.
const name = "PoCSequentiality"
// Package-level settings. The values below are the defaults; main registers a
// command-line flag for each of them, so they may be overridden at start-up.
var (
// bucket is the S3 bucket that holds the object to download.
bucket = aws.String("aev-autonomous-driving-dataset")
// key is the object key; main also uses it as the name of the local output file.
// Other keys left over from an earlier inline comment (presumably larger test
// objects in the same bucket — verify before use):
//   camera_lidar-20180810150607_bus_signals.tar
//   camera_lidar_semantic_bus.tar
//   camera_lidar-20190401121727_lidar_frontcenter.tar
key = aws.String("tutorial.html")
// region is the AWS region set on the S3 client options.
region = aws.String("eu-central-1")
// concurrency is the capacity of the semaphore that limits how many parts are
// processed at the same time.
concurrency = 16
// partSize is the number of bytes requested per ranged GetObject call.
partSize = int64(32 * 1024 * 1024) // 32 MiB
// logEnabled turns log output on; when false main discards everything logged.
logEnabled bool
// notAnon selects the default credential chain; when false main installs
// anonymous credentials on the S3 client.
notAnon bool
)
func main() {
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "%s is a simple CLI tool that aims to increase AWS S3 download throughput by taking advantage of sequential writes in addition to concurreny.\n", name)
fmt.Fprintf(flag.CommandLine.Output(), "%s is designed to be Proof of Concept and prototype not an end product in any way.\n", name)
fmt.Fprintln(flag.CommandLine.Output())
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", name)
flag.PrintDefaults()
}
flag.BoolVar(¬Anon, "notanon", false, "use default credentials instead of anonymous credentials")
flag.BoolVar(&logEnabled, "log", false, "whether logs will be printed")
flag.StringVar(bucket, "bucket", aws.ToString(bucket), "bucket name")
flag.StringVar(key, "key", aws.ToString(key), "object key")
flag.StringVar(region, "region", aws.ToString(region), "bucket region")
flag.IntVar(&concurrency, "concurrency", concurrency, "concurrency")
flag.Int64Var(&partSize, "partsize", partSize, "bucket region")
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if logEnabled {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
} else {
log.SetFlags(0)
log.SetOutput(io.Discard)
}
// Using the SDK's default configuration, loading additional config
// and credentials values from the environment variables, shared
// credentials, and shared configuration files
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
fnsl := make([]func(options *s3.Options), 0, 2)
fnsl = append(fnsl, func(options *s3.Options) {
options.Region = aws.ToString(region)
})
if !notAnon {
fnsl = append(fnsl, func(options *s3.Options) {
options.Credentials = aws.AnonymousCredentials{}
})
}
client := s3.NewFromConfig(cfg, fnsl...)
f, err := os.Create(*key)
if err != nil {
log.Fatalf("unable to create file, %v", err)
}
log.Printf("file: %q\n", f.Name())
obj, err := client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: bucket,
Key: key,
})
if err != nil {
log.Fatalf("unable to get object attr, %v", err)
}
log.Printf("Etag %s, size in bytes %d, size in KB ~%d, size in MB ~%d\n", *obj.ETag, obj.ContentLength, obj.ContentLength>>10, obj.ContentLength>>20)
var r int64 = 0
size := obj.ContentLength
sem := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
sem <- struct{}{}
}
var wg sync.WaitGroup
// there is no enforced limitation in number of workers but the number of
// concurrently working workers are limited with semaphore "sem"
for r < size {
log.Printf("r: %v, size: %v\n", r, size)
wg.Add(1)
rStart := r
r = Min(r+partSize, size)
go func(start, end int64) {
<-sem
defer func() {
sem <- struct{}{}
wg.Done()
}()
rng := strconv.FormatInt(start, 10) + "-" + strconv.FormatInt(end-1, 10)
log.Printf("Range: %20s, %20s\n", rng, "BEGIN")
out, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: bucket,
Key: key,
Range: aws.String("bytes=" + rng),
})
log.Printf("Range: %20s, %20s\n", rng, "DOWNLOADED")
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
log.Printf("Range: %20s, %20s\n", rng, "COPY BUFFER")
buf := bytes.NewBuffer(make([]byte, 0, end-start))
io.Copy(buf, out.Body)
log.Printf("Range: %20s, %20s\n", rng, "STARTED WRITING")
f.WriteAt(buf.Bytes(), start)
log.Printf("Range: %20s, %20s\n", rng, "WROTE")
}(rStart, r)
}
wg.Wait()
}
// Min returns the smaller of x and y. When both are equal that shared value
// is returned.
func Min(x, y int64) int64 {
	if y <= x {
		return y
	}
	return x
}