-
Notifications
You must be signed in to change notification settings - Fork 5
/
upload.go
99 lines (88 loc) · 2.55 KB
/
upload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package dxfuse
import (
"context"
"math"
"runtime"
"sync"
"github.com/dnanexus/dxda"
)
// maxUploadRoutines is the number of upload worker goroutines, i.e. how many
// parts may be uploaded concurrently.
const maxUploadRoutines = 4
// AllocateWriteBuffer returns an empty byte slice (length 0) sized for the
// given upload part.
//
// The capacity is InitialUploadPartSize * 1.1^partId, capped at
// MaxUploadPartSize and rounded to the nearest integer. A partId below 1 is
// treated as 1. When block is true, a token is first sent on writeBufferChan,
// so the call blocks while that channel is full.
//
// TODO replace this with a more reasonable buffer pool for managing memory use
func (uploader *FileUploader) AllocateWriteBuffer(partId int, block bool) []byte {
	// Wait for an available buffer slot.
	if block {
		uploader.writeBufferChan <- struct{}{}
	}

	exponent := partId
	if exponent < 1 {
		exponent = 1
	}

	grown := InitialUploadPartSize * math.Pow(1.1, float64(exponent))
	capacity := math.Round(math.Min(grown, MaxUploadPartSize))
	return make([]byte, 0, int64(capacity))
}
// UploadRequest describes one file part queued for upload. uploadWorker
// receives these from FileUploader.uploadQueue.
type UploadRequest struct {
// File handle the part belongs to; the worker stores any upload error in
// fh.writeError and calls fh.wg.Done() once the part has been attempted.
fh *FileHandle
// Bytes passed to DxFileUploadPart as the contents of the part.
writeBuffer []byte
// File id passed to DxFileUploadPart.
fileId string
// Part number passed to DxFileUploadPart.
partId int
}
// FileUploader owns the upload queue and the worker goroutines that drain it.
// Instances are built by NewFileUploader and stopped with Shutdown.
type FileUploader struct {
// Set by NewFileUploader to verboseLevel >= 1.
verbose bool
// Unbuffered queue of pending part uploads. uploadWorker goroutines receive
// from it and return once Shutdown has closed it.
uploadQueue chan UploadRequest
// Tracks the worker goroutines: each worker calls Done when it returns and
// Shutdown waits on it.
wg sync.WaitGroup
// Set to maxUploadRoutines by NewFileUploader.
numUploadRoutines int
// Max concurrent write buffers to reduce memory consumption.
// AllocateWriteBuffer sends one token into this buffered channel when called
// with block=true. NOTE(review): the matching receive that frees a slot is
// not visible here — confirm where it happens.
writeBufferChan chan struct{}
// API to dx; created with NewDxOps and used by uploadWorker to call
// DxFileUploadPart.
ops *DxOps
}
// log passes a message format and its arguments to LogMsg under the
// "uploader" header.
func (uploader *FileUploader) log(format string, args ...interface{}) {
	const header = "uploader"
	LogMsg(header, format, args...)
}
// NewFileUploader builds a FileUploader and launches its upload workers.
//
// verboseLevel >= 1 sets the verbose flag; dxEnv and options are handed to
// NewDxOps. writeBufferChan gets a capacity of three slots per CPU, but never
// fewer than 15. maxUploadRoutines uploadWorker goroutines are started before
// the function returns; Shutdown stops them.
func NewFileUploader(verboseLevel int, options Options, dxEnv dxda.DXEnvironment) *FileUploader {
	const minWriteBuffers = 15

	// Three write buffers per CPU, with a floor of minWriteBuffers.
	bufferSlots := 3 * runtime.NumCPU()
	if bufferSlots < minWriteBuffers {
		bufferSlots = minWriteBuffers
	}

	uploader := new(FileUploader)
	uploader.verbose = verboseLevel >= 1
	uploader.uploadQueue = make(chan UploadRequest)
	uploader.writeBufferChan = make(chan struct{}, bufferSlots)
	uploader.numUploadRoutines = maxUploadRoutines
	uploader.ops = NewDxOps(dxEnv, options)

	// Register each worker with the WaitGroup before starting it so that
	// Shutdown can wait for all of them.
	for started := 0; started < maxUploadRoutines; started++ {
		uploader.wg.Add(1)
		go uploader.uploadWorker()
	}
	return uploader
}
// Shutdown stops the uploader: it closes uploadQueue so that every
// uploadWorker returns after finishing the request it is handling, closes
// writeBufferChan, and then blocks until all workers have called wg.Done.
//
// NOTE(review): both channels are closed unconditionally, so calling Shutdown
// twice, or sending on either channel afterwards (e.g. AllocateWriteBuffer
// with block=true), would panic — confirm callers never do this.
func (uploader *FileUploader) Shutdown() {
// Close channel and wait for goroutines to complete
close(uploader.uploadQueue)
close(uploader.writeBufferChan)
uploader.wg.Wait()
}
// uploadWorker is the body of one upload goroutine. It takes requests off
// uploadQueue until the channel is closed, uploading each part through
// DxFileUploadPart with a single HTTP client that is reused for every request.
//
// A failed upload is logged and stored in the request's FileHandle
// (fh.writeError). Whether or not the upload succeeded, fh.wg.Done() is called
// once the part has been attempted. The uploader's own WaitGroup is released
// when the worker returns.
func (uploader *FileUploader) uploadWorker() {
	// Deferred so the WaitGroup is released on every exit path.
	defer uploader.wg.Done()

	// reuse this http client
	httpClient := dxda.NewHttpClient()

	// Ranging over the channel ends the loop once it has been closed.
	for uploadReq := range uploader.uploadQueue {
		err := uploader.ops.DxFileUploadPart(context.TODO(), httpClient, uploadReq.fileId, uploadReq.partId, uploadReq.writeBuffer)
		if err != nil {
			// Record upload error in FileHandle.
			// NOTE(review): writeError is assigned without synchronization
			// here; if several workers handle parts of the same FileHandle
			// this may race — confirm how FileHandle guards this field.
			uploader.log("Error uploading %s, part %d, %s", uploadReq.fileId, uploadReq.partId, err.Error())
			uploadReq.fh.writeError = err
		}
		uploadReq.fh.wg.Done()
	}
}