-
Notifications
You must be signed in to change notification settings - Fork 5
/
stream.go
83 lines (74 loc) · 2.28 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package simpleblob
import (
"bytes"
"context"
"io"
"io/fs"
)
// A StreamReader is an Interface providing an optimized way to create an io.ReadCloser.
type StreamReader interface {
Interface
// NewReader returns an io.ReadCloser, allowing stream reading
// of named blob from the underlying backend.
NewReader(ctx context.Context, name string) (io.ReadCloser, error)
}
// A StreamWriter is an Interface providing an optimized way to create an io.WriteCloser.
type StreamWriter interface {
Interface
// NewWriter returns an io.WriteCloser, allowing stream writing
// to named blob in the underlying backend.
NewWriter(ctx context.Context, name string) (io.WriteCloser, error)
}
// NewReader allows reading a named blob from st.
// It returns an optimized io.ReadCloser if available, else a basic buffered implementation.
func NewReader(ctx context.Context, st Interface, name string) (io.ReadCloser, error) {
if sst, ok := st.(StreamReader); ok {
return sst.NewReader(ctx, name)
}
b, err := st.Load(ctx, name)
if err != nil {
return nil, err
}
return io.NopCloser(bytes.NewReader(b)), nil
}
// A fallbackWriter wraps a backend to satisfy io.WriteCloser.
// The bytes written to it are buffered, then sent to backend when closed.
type fallbackWriter struct {
st Interface
ctx context.Context
name string
closed bool
buf bytes.Buffer
}
// ErrClosed implies that the Close function has already been called.
var ErrClosed = fs.ErrClosed
// Write appends p to the data ready to be stored.
//
// Content will only be sent to backend when w.Close is called.
func (w *fallbackWriter) Write(p []byte) (int, error) {
if w.closed {
return 0, ErrClosed
}
return w.buf.Write(p)
}
// Close signifies operations on writer are over.
// The file is sent to backend when called.
func (w *fallbackWriter) Close() error {
if w.closed {
return ErrClosed
}
w.closed = true
return w.st.Store(w.ctx, w.name, w.buf.Bytes())
}
// NewWriter allows writing a named blob to st.
// It returns an optimized io.WriteCloser if available, else a basic buffered implementation.
func NewWriter(ctx context.Context, st Interface, name string) (io.WriteCloser, error) {
if sst, ok := st.(StreamWriter); ok {
return sst.NewWriter(ctx, name)
}
return &fallbackWriter{
st: st,
ctx: ctx,
name: name,
}, nil
}