forked from shelmangroup/lfs-server-s3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tus.go
160 lines (146 loc) · 3.83 KB
/
tus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"bufio"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type TusServer struct {
serverMutex sync.Mutex
tusProcess *exec.Cmd
dataPath string
tusBaseUrl string
httpClient *http.Client
oidToTusUrl map[string]string
}
var (
tusServer *TusServer = &TusServer{}
)
// Start launches the tus server & stores uploads in the given contentPath
func (t *TusServer) Start() {
t.serverMutex.Lock()
defer t.serverMutex.Unlock()
if t.tusProcess != nil {
return
}
t.dataPath = filepath.Join(os.TempDir(), "lfs_tusserver")
hostparts := strings.Split(Config.TusHost, ":")
host := "localhost"
port := "1080"
if len(hostparts) > 0 {
host = hostparts[0]
}
if len(hostparts) > 1 {
port = hostparts[1]
}
t.tusProcess = exec.Command("tusd",
"-dir", t.dataPath,
"-host", host,
"-port", port)
// Make sure tus server is started before continuing
var procWait sync.WaitGroup
procWait.Add(1)
go func(p *exec.Cmd) {
stdout, err := p.StdoutPipe()
if err != nil {
panic(fmt.Sprintf("Error getting tus server stdout: %v", err))
}
stderr, err := p.StderrPipe()
if err != nil {
panic(fmt.Sprintf("Error getting tus server stderr: %v", err))
}
err = p.Start()
if err != nil {
panic(fmt.Sprintf("Error starting tus server: %v", err))
}
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.WithFields(log.Fields{"fn": "tusout", "msg": scanner.Text()})
}
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.WithFields(log.Fields{"fn": "tuserr", "msg": scanner.Text()})
}
}()
time.Sleep(2)
procWait.Done()
defer p.Wait()
}(t.tusProcess)
procWait.Wait()
log.WithFields(log.Fields{"fn": "Start", "msg": "Tus server started"})
t.tusBaseUrl = fmt.Sprintf("http://%s:%s/files/", host, port)
t.httpClient = &http.Client{}
t.oidToTusUrl = make(map[string]string)
}
func (t *TusServer) Stop() {
t.serverMutex.Lock()
defer t.serverMutex.Unlock()
if t.tusProcess != nil {
t.tusProcess.Process.Kill()
t.tusProcess = nil
}
log.WithFields(log.Fields{"fn": "Stop", "msg": "Tus server stopped"})
}
// Create a new upload URL for the given object
// Required to call CREATE on the tus API before uploading but not part of LFS API
func (t *TusServer) Create(oid string, size int64) (string, error) {
t.serverMutex.Lock()
defer t.serverMutex.Unlock()
req, err := http.NewRequest("POST", t.tusBaseUrl, nil)
if err != nil {
return "", err
}
req.Header.Set("Tus-Resumable", "1.0.0")
req.Header.Set("Upload-Length", fmt.Sprintf("%d", size))
req.Header.Set("Upload-Metadata", fmt.Sprintf("oid %s", oid))
res, err := t.httpClient.Do(req)
if err != nil {
return "", err
}
if res.StatusCode != 201 {
return "", fmt.Errorf("Expected tus status code 201, got %d", res.StatusCode)
}
loc := res.Header.Get("Location")
if len(loc) == 0 {
return "", fmt.Errorf("Missing Location header in tus response")
}
t.oidToTusUrl[oid] = loc
return loc, nil
}
// Move the finished uploaded data from TUS to the content store (called by verify)
func (t *TusServer) Finish(oid string, store *S3ContentStore) error {
t.serverMutex.Lock()
defer t.serverMutex.Unlock()
loc, ok := t.oidToTusUrl[oid]
if !ok {
return fmt.Errorf("Unable to find upload for %s", oid)
}
parts := strings.Split(loc, "/")
filename := filepath.Join(t.dataPath, fmt.Sprintf("%s.bin", parts[len(parts)-1]))
stat, err := os.Stat(filename)
if err != nil {
return err
}
meta := &MetaObject{Oid: oid, Size: stat.Size(), Existing: false}
f, err := os.Open(filename)
if err != nil {
return err
}
defer f.Close()
err = store.Put(meta, f)
if err == nil {
os.Remove(filename)
// tus also stores a .info file, remove that
os.Remove(filepath.Join(t.dataPath, fmt.Sprintf("%s.info", parts[len(parts)-1])))
}
return err
}