package s2randomaccess

import (
	"bytes"
	"errors"
	"io"
	"runtime"
	"sync"

	"github.com/klauspost/compress/s2"
)

// Seeker provides random access reads into a compressed S2 (or Snappy) stream.
type Seeker struct {
	data []byte
	idx  s2.Index
	*inner
	sentinel        *sentinel
	indexGiven      bool
	allowBuildIndex bool
}

// inner is the subset of Seeker fields that cached entries in the block cache need.
// It is separate from Seeker so the Seeker can be garbage collected even while some of its blocks are still in the cache.
type inner struct {
	allocator Allocator
	mtx       sync.Mutex
	active    map[int64]*decompressedBlock
	dying     bool
}

// sentinel has a finalizer that drops everything belonging to this Seeker from the cache.
// We could have set the finalizer on *Seeker itself, but that would have kept the reference to s.data alive one GC cycle longer.
type sentinel struct {
	inner *inner
}

// Option configures a Seeker during New.
type Option func(*Seeker) error

// New creates a new Seeker from a byte slice containing a compressed S2 (or Snappy) stream.
// The given slice is never written to.
// The index must be either:
// 1. At the end of the stream
// 2. Built during New (if WithAllowBuildIndex() is passed)
// 3. Given with WithIndex()
func New(data []byte, options ...Option) (*Seeker, error) {
	ret := &Seeker{
		data: data,
		inner: &inner{
			active: map[int64]*decompressedBlock{},
		},
	}
	for _, o := range options {
		if err := o(ret); err != nil {
			return nil, err
		}
	}
	if err := ret.loadIndex(); err != nil {
		return nil, err
	}
	ret.sentinel = &sentinel{ret.inner}
	runtime.SetFinalizer(ret.sentinel, func(s *sentinel) {
		s.inner.removeFromGlobalCache()
	})
	return ret, nil
}
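
// An illustrative sketch (not part of the original file) of how a caller might
// construct a Seeker; `compressed` is a placeholder for a byte slice holding an
// S2-compressed stream:
//
//	seeker, err := s2randomaccess.New(compressed, s2randomaccess.WithAllowBuildIndex())
//	if err != nil {
//		return err
//	}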

// WithIndex supplies the index directly rather than having it loaded from the stream.
func WithIndex(idx s2.Index) Option {
	return func(s *Seeker) error {
		s.idx = idx
		s.indexGiven = true
		return nil
	}
}

// WithAllowBuildIndex allows New to fall back to building the index itself if the stream doesn't contain one.
func WithAllowBuildIndex() Option {
	return func(s *Seeker) error {
		s.allowBuildIndex = true
		return nil
	}
}
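
// A hedged sketch of building the index up front and passing it in through
// WithIndex, mirroring what loadIndex does below when WithAllowBuildIndex is
// set (`compressed` is again placeholder data):
//
//	serialized, err := s2.IndexStream(bytes.NewReader(compressed))
//	if err != nil {
//		return err
//	}
//	var idx s2.Index
//	if _, err := idx.Load(serialized); err != nil {
//		return err
//	}
//	seeker, err := s2randomaccess.New(compressed, s2randomaccess.WithIndex(idx))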

// loadIndex resolves the index: it prefers one given through WithIndex, then one stored in the stream, and finally builds one itself if WithAllowBuildIndex was passed.
func (s *Seeker) loadIndex() error {
	if s.indexGiven {
		// Given through WithIndex.
		return nil
	}
	switch err := s.idx.LoadStream(bytes.NewReader(s.data)); err {
	case nil:
		return nil
	default:
		return err
	case s2.ErrUnsupported:
		// No index found in the stream; fall through and try to build one.
	}
	if !s.allowBuildIndex {
		return errors.New("s2randomaccess: didn't find index in data and WithAllowBuildIndex() is not enabled")
	}
	idx, err := s2.IndexStream(bytes.NewReader(s.data))
	if err != nil {
		return err
	}
	if _, err := s.idx.Load(idx); err != nil {
		return err
	}
	return nil
}

// Chunk framing constants of the Snappy/S2 stream format: every chunk starts
// with a 4-byte header, and data chunks carry a 4-byte CRC before the payload.
const (
	headerSize   = 4
	checksumSize = 4

	chunkTypeCompressedData   = 0x00
	chunkTypeUncompressedData = 0x01
)

// Get returns the uncompressed data at the given (uncompressed) offset.
// It returns a slice of the requested length and a deref function that must be called exactly once.
// The returned slice is valid until the call to deref and must not be modified.
// If an error is returned, no deref function is returned.
func (s *Seeker) Get(offset, length int64) ([]byte, func(), error) {
	defer runtime.KeepAlive(s)
	comprOff, uncomprOff, err := s.idx.Find(offset)
	if err != nil {
		return nil, nil, err
	}
	skipUncompr := offset - uncomprOff
	var partial []byte
	partialDeref := noop
	for comprOff < int64(len(s.data)) {
		// Each chunk starts with a one-byte type followed by a 3-byte little-endian length of the chunk body.
		chunkHeader := s.data[comprOff:][:headerSize]
		chunkType := chunkHeader[0]
		chunkLen := int(chunkHeader[1]) | int(chunkHeader[2])<<8 | int(chunkHeader[3])<<16
		chunk := s.data[comprOff+headerSize:][:chunkLen]
		var plain []byte
		var plainDeref func()
		switch chunkType {
		case chunkTypeCompressedData:
			dLen, err := s2.DecodedLen(chunk[checksumSize:])
			if err != nil {
				partialDeref()
				return nil, nil, err
			}
			if skipUncompr >= int64(dLen) {
				// The requested data starts past this chunk; skip it without decompressing.
				skipUncompr -= int64(dLen)
				break
			}
			block, deref, err := s.getDecompressedBlock(comprOff+headerSize+checksumSize, len(chunk)-checksumSize, dLen)
			if err != nil {
				partialDeref()
				return nil, nil, err
			}
			plain = block[skipUncompr:]
			plainDeref = deref
		case chunkTypeUncompressedData:
			dLen := len(chunk) - checksumSize
			if skipUncompr >= int64(dLen) {
				skipUncompr -= int64(dLen)
				break
			}
			plain = chunk[checksumSize+skipUncompr:]
			plainDeref = noop
		default:
			if chunkType <= 0x7f {
				// Unknown reserved unskippable chunk.
				partialDeref()
				return nil, nil, s2.ErrUnsupported
			}
			// All other chunk types (0x80 and above, including padding and the stream identifier) are simply skipped.
		}
		if plain != nil {
			if partial == nil {
				// Fast path: the first contributing chunk already covers the whole request.
				if length <= int64(len(plain)) {
					return plain[:length], plainDeref, nil
				}
				// Otherwise assemble the result across chunks in our own buffer.
				partial = s.allocator.Alloc(int(length))[:0]
				partialDeref = func() {
					s.allocator.Free(partial)
				}
			}
			if len(plain) > int(length)-len(partial) {
				plain = plain[:int(length)-len(partial)]
			}
			partial = append(partial, plain...)
			plainDeref()
			if len(partial) == int(length) {
				return partial, partialDeref, nil
			}
			skipUncompr = 0
		}
		comprOff += headerSize + int64(chunkLen)
	}
	partialDeref()
	return nil, nil, io.ErrUnexpectedEOF
}
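
// A minimal usage sketch for Get: the returned slice may point into a cached
// decompressed block, so deref must be called exactly once, after the caller is
// done with the data (the offset and length here are arbitrary placeholders):
//
//	buf, deref, err := seeker.Get(1<<20, 4096)
//	if err != nil {
//		return err
//	}
//	process(buf) // hypothetical consumer; must not retain or modify buf
//	deref()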

// ReadAt reads uncompressed data from the compressed stream at the given (uncompressed) offset.
// Reads where offset+len(dst) is past the end of the stream currently fail with (0, io.ErrUnexpectedEOF); this might change in the future to return a partial result together with io.EOF.
func (s *Seeker) ReadAt(dst []byte, offset int64) (int, error) {
	buf, deref, err := s.Get(offset, int64(len(dst)))
	if err != nil {
		return 0, err
	}
	defer deref()
	// TODO: We could optimize reads that span chunks by using dst as the buffer.
	return copy(dst, buf), nil
}
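
// Because Seeker implements io.ReaderAt, it composes with the standard library;
// e.g. a byte range can be wrapped in an io.SectionReader (a sketch; the offsets
// are placeholders):
//
//	sr := io.NewSectionReader(seeker, 4096, 512)
//	data, err := io.ReadAll(sr)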

var _ io.ReaderAt = &Seeker{}

func noop() {}