Skip to content

Commit

Permalink
Feat[hash]: Chunker (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunginnanet committed May 28, 2024
1 parent 4e8403a commit b7cf4f6
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 192 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module git.tcp.direct/kayos/common

go 1.19
go 1.21

require (
github.com/davecgh/go-spew v1.1.1
Expand Down
234 changes: 127 additions & 107 deletions hash/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"hash"
"io"
"slices"

Check failure on line 8 in hash/chunker.go

View workflow job for this annotation

GitHub Actions / build

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.18.10/x64/src/slices)
"sync"

"github.com/davecgh/go-spew/spew"
)

// ChunkedHasher is a hash.Hash wrapper that will hash data in chunks, similar to how a torrent client hashes data.
Expand All @@ -18,12 +17,9 @@ type ChunkedHasher struct {
ht Type
h hash.Hash
inBuf []byte
outBuf []byte
outReadIdx int
outWriteIdx int
outBufs [][]byte
chunkSize int
hashedChunkSize int
flushPending bool
mu sync.Mutex
}

Expand All @@ -33,127 +29,148 @@ type ChunkedHasher struct {
// Use [IngressChunkSize] to get the resulting chunk size.
func NewChunkedHasher(ht Type, chunkSize int) *ChunkedHasher {
h := getHasher(ht)
buf := bufPool.Get().([]byte)
buf = buf[:0]
if chunkSize == -1 {
chunkSize = h.BlockSize()
}
if cap(buf) < chunkSize {
buf = append(buf, make([]byte, chunkSize-cap(buf))...)

newInBuf := bufPool.Get().([]byte)
clear(newInBuf)
if cap(newInBuf) < chunkSize {
newInBuf = slices.Grow(newInBuf, chunkSize)
}
newInBuf = newInBuf[:chunkSize]

ch := &ChunkedHasher{
ht: ht,
h: h,
inBuf: buf,
chunkSize: chunkSize,
outBuf: bufPool.Get().([]byte),
outReadIdx: 0,
ht: ht,
h: h,
inBuf: newInBuf,
chunkSize: chunkSize,
outBufs: make([][]byte, 0, 1),
}

dummyBuf := bufPool.Get().([]byte)
if cap(dummyBuf) < chunkSize {
dummyBuf = append(dummyBuf, make([]byte, chunkSize-cap(dummyBuf))...)
}
if len(dummyBuf) < chunkSize {
dummyBuf = dummyBuf[:chunkSize]
newOutBuf := bufPool.Get().([]byte)
clear(newOutBuf)
if cap(newOutBuf) < chunkSize {
newOutBuf = slices.Grow(newOutBuf, chunkSize)
}
newOutBuf = newOutBuf[:chunkSize]

for i := 0; i < chunkSize; i++ {
dummyBuf[i] = 5
newOutBuf[i] = 5
}
ch.h.Write(dummyBuf)

ch.h.Write(newOutBuf[:chunkSize])
_ = ch.h.Sum(nil)
ch.hashedChunkSize = ch.h.Size()
ch.h.Reset()
bufPool.Put(dummyBuf)
clear(newOutBuf)

ch.outBufs = append(ch.outBufs, newInBuf[:chunkSize])

return ch
}

// Close returns the hasher and all pooled buffers to their pools and marks
// the ChunkedHasher unusable; subsequent operations return ErrHasherClosed.
func (c *ChunkedHasher) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	putHasher(c.ht, c.h)
	bufPool.Put(c.inBuf)
	for _, b := range c.outBufs {
		clear(b)
		bufPool.Put(b)
	}
	// fix: nil out outBufs itself so the closed-state guards that check it
	// actually fire. (The old `b = nil` inside the loop only reassigned the
	// loop variable — a no-op.)
	c.outBufs = nil
	c.inBuf = nil
	c.h = nil
}

var (
	// ErrHasherClosed is returned by operations on a ChunkedHasher after Close.
	ErrHasherClosed = errors.New("hasher is closed")

	// ErrMrHopefulUnreachable is used as a panic value for internal states
	// that should be impossible to reach.
	ErrMrHopefulUnreachable = errors.New("unreachable, or so we hope")
)

// sum will hash the current chunk of data, caller MUST hold the lock.
func (c *ChunkedHasher) sum(flushing bool) error {
if c.inBuf == nil || c.h == nil || c.outBuf == nil {
if c.inBuf == nil || c.h == nil || c.outBufs == nil {
return ErrHasherClosed
}

if len(c.inBuf) == 0 {
return nil
}

inSize := len(c.inBuf)

switch {
case inSize < c.chunkSize && !flushing:
if inSize == 0 && !flushing {
return nil
case inSize < c.chunkSize && flushing:
c.h.Write(c.inBuf[:inSize])
c.spin(0, c.h.Size(), flushing)
c.outBuf = c.h.Sum(c.outBuf)
c.h.Reset()
c.inBuf = c.inBuf[:0]
return nil
case inSize > c.chunkSize:
for len(c.inBuf) >= c.chunkSize {
c.h.Write(c.inBuf[:c.chunkSize])
c.spin(0, c.h.Size(), flushing)
c.outBuf = c.h.Sum(c.outBuf)
}

latestBuf := len(c.outBufs) - 1

if flushing && len(c.inBuf) > 0 {
switch {
case len(c.inBuf) > c.chunkSize:
panic(ErrMrHopefulUnreachable)
case len(c.inBuf) < c.chunkSize:
c.h.Reset()
c.inBuf = c.inBuf[c.chunkSize:]
flushed := 0
for {
n, e := c.h.Write(c.inBuf[:c.chunkSize-len(c.inBuf)])
if e != nil {
return e
}
if n == 0 {
break
}
c.inBuf = c.inBuf[n:]
flushed += n
if flushed == c.chunkSize || len(c.inBuf) == 0 || c.h.Size() == c.hashedChunkSize {
break
}
}
}
if len(c.inBuf) > 0 && (flushing || len(c.inBuf) == c.chunkSize) {
c.h.Write(c.inBuf)
c.spin(0, c.h.Size(), flushing)
c.outBuf = c.h.Sum(c.outBuf)
c.h.Reset()
c.inBuf = c.inBuf[:0]
switch {
case len(c.outBufs[latestBuf]) == c.hashedChunkSize:
newBuf := bufPool.Get().([]byte)
clear(newBuf)
if cap(newBuf) < c.chunkSize {
newBuf = slices.Grow(newBuf, c.chunkSize-cap(newBuf))
}
newBuf = newBuf[:c.h.Size()]
newBuf = c.h.Sum(newBuf)
c.outBufs = append(c.outBufs, newBuf[:c.h.Size()])
latestBuf++
case len(c.outBufs[latestBuf]) == 0:
c.outBufs[latestBuf] = c.h.Sum(c.outBufs[latestBuf])
case len(c.outBufs[latestBuf]) < c.hashedChunkSize && len(c.outBufs[latestBuf]) > 0:

panic(ErrMrHopefulUnreachable)
}
return nil
case inSize == c.chunkSize:
c.h.Write(c.inBuf)
c.spin(0, c.h.Size(), flushing)
c.outBuf = c.h.Sum(c.outBuf)
}

if !flushing && len(c.inBuf) == c.chunkSize {
c.h.Reset()
c.inBuf = c.inBuf[:0]
return nil
default:
panic("unreachable, or so we hope")
n, e := c.h.Write(c.inBuf)
if e != nil {
return e
}
if n != c.chunkSize {
return fmt.Errorf("%w: expected to write %d bytes, but only wrote %d", io.ErrShortWrite, c.chunkSize, n)
}
newBuf := bufPool.Get().([]byte)
clear(newBuf)
if cap(newBuf) < c.chunkSize {
newBuf = slices.Grow(newBuf, c.chunkSize-cap(newBuf))
}
newBuf = newBuf[:c.h.Size()]
newBuf = c.h.Sum(newBuf)
c.outBufs = append(c.outBufs, newBuf[:c.h.Size()])
latestBuf++
c.h.Reset()
c.inBuf = c.inBuf[:n]
}

return nil
}

// Flush will ensure that any misaligned data at the tail is hashed.
Expand All @@ -180,12 +197,11 @@ func (c *ChunkedHasher) EgressChunkSize() int {
}

func (c *ChunkedHasher) Write(p []byte) (n int, err error) {
n = len(p)
if len(p) == 0 {
if n = len(p); n == 0 {
return
}
c.mu.Lock()
if c.inBuf == nil || c.h == nil || c.outBuf == nil {
if c.inBuf == nil || c.h == nil || c.outBufs == nil {
c.mu.Unlock()
return 0, ErrHasherClosed
}
Expand All @@ -211,35 +227,39 @@ func (c *ChunkedHasher) Write(p []byte) (n int, err error) {
// - If there is no data to hash, it will return an error.
func (c *ChunkedHasher) Next(dst []byte) ([]byte, int, error) {
c.mu.Lock()
if c.inBuf == nil || c.h == nil || c.outBuf == nil {
if c.inBuf == nil || c.h == nil || c.outBufs == nil {
c.mu.Unlock()
return dst, 0, ErrHasherClosed
}
if err := c.sum(false); err != nil {
c.mu.Unlock()
return dst, 0, err
}
if len(c.outBuf) == 0 {

c.outBufs = slices.Clip(c.outBufs)

if len(c.outBufs) == 0 || len(c.outBufs) == 1 && len(c.outBufs[0]) == 0 {
c.mu.Unlock()
return dst, 0, fmt.Errorf("%w: no data to hash", io.EOF)
println("EOF")
return dst, 0, io.EOF
}
if c.outReadIdx >= len(c.outBuf) {
panic("someone forgot to move the index pointer")

if dst == nil {
dst = bufPool.Get().([]byte)
clear(dst)
dst = dst[:c.hashedChunkSize]
}
if cap(dst) < len(dst)+c.hashedChunkSize {
dst = append(dst, make([]byte, c.hashedChunkSize)...)

if cap(dst) < c.hashedChunkSize {
dst = slices.Grow(dst, c.hashedChunkSize-cap(dst))
}
dst = dst[:c.hashedChunkSize]
n := copy(dst, c.outBuf[c.outReadIdx:c.outReadIdx+c.hashedChunkSize])
if n != c.hashedChunkSize {
n := copy(dst, c.outBufs[len(c.outBufs)-1])
if n == len(c.outBufs[len(c.outBufs)-1]) {
c.outBufs = slices.Delete(c.outBufs, len(c.outBufs)-1, len(c.outBufs))
c.outBufs = slices.Clip(c.outBufs)
} else {
c.outBufs[len(c.outBufs)-1] = c.outBufs[len(c.outBufs)-1][n:]
c.mu.Unlock()
spew.Dump(c)
spew.Dump(dst)
// panic justification: handling the index pointer after a failure like this is complex. FUBAR.
// caller just needs to get it together and not fuck this up; else take a panic to the face.
panic(fmt.Errorf("%w: expected to copy %d bytes, but only copied %d", io.ErrShortWrite, c.hashedChunkSize, n))
return dst, n, io.ErrShortBuffer
}
c.spin(n, 0, false)

c.mu.Unlock()
return dst, n, nil
}
Loading

0 comments on commit b7cf4f6

Please sign in to comment.