Initial impl of WriteCombinator (#52)
Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
jimmyaxod authored Nov 19, 2024
1 parent dbd2d24 commit a54a0e2
Showing 2 changed files with 180 additions and 0 deletions.
137 changes: 137 additions & 0 deletions pkg/storage/modules/write_combinator.go
@@ -0,0 +1,137 @@
package modules

import (
	"sync"

	"github.com/loopholelabs/silo/pkg/storage"
	"github.com/loopholelabs/silo/pkg/storage/util"
)

// Restrictions
//   - All WriteAts should have a block-aligned offset.
//   - The WriteAt buffer size should be a multiple of the block size, unless the write
//     includes the END of the storage, in which case it can be shorter.
//   - priority should be unique amongst sources.
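//
// Worked example (illustrative numbers, not from this commit): with a provider of
// size 10,000 bytes and a blockSize of 4,096, there are 3 blocks. A 4,096-byte
// WriteAt at offset 4,096 is valid, and a 1,808-byte WriteAt at offset 8,192 is
// also valid because it reaches the end of the storage.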

type WriteCombinator struct {
	prov      storage.Provider
	blockSize int
	numBlocks int
	size      uint64
	writeLock sync.Mutex
	sources   map[int]*writeSource
}

// Create a new combinator
func NewWriteCombinator(prov storage.Provider, blockSize int) *WriteCombinator {
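	// numBlocks is the ceiling of Size()/blockSize, so a trailing partial block counts as a full block.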
	numBlocks := (prov.Size() + uint64(blockSize) - 1) / uint64(blockSize)
	return &WriteCombinator{
		prov:      prov,
		blockSize: blockSize,
		size:      prov.Size(),
		numBlocks: int(numBlocks),
		sources:   make(map[int]*writeSource),
	}
}

// Add a new source to write into the combinator, with specified priority. Priority must be unique.
func (i *WriteCombinator) AddSource(priority int) storage.Provider {
	i.writeLock.Lock()
	defer i.writeLock.Unlock()
	ws := &writeSource{
		priority:   priority,
		combinator: i,
		available:  util.NewBitfield(i.numBlocks),
	}
	i.sources[priority] = ws
	return ws
}

// Remove a source from this combinator.
func (i *WriteCombinator) RemoveSource(priority int) {
	i.writeLock.Lock()
	defer i.writeLock.Unlock()
	delete(i.sources, priority)
}

// Find the highest priority write for a block, or -1 if no writes
func (i *WriteCombinator) getHighestPriorityForBlock(b uint) int {
	highestPriority := -1
	for _, ws := range i.sources {
		if ws.priority > highestPriority && ws.available.BitSet(int(b)) {
			highestPriority = ws.priority
		}
	}
	return highestPriority
}

type writeSource struct {
	storage.ProviderWithEvents
	priority   int
	combinator *WriteCombinator
	available  *util.Bitfield
}

// Relay events to the embedded ProviderWithEvents, and pass them on to the combinator's provider as well.
func (ws *writeSource) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData {
	data := ws.ProviderWithEvents.SendSiloEvent(eventType, eventData)
	return append(data, storage.SendSiloEvent(ws.combinator.prov, eventType, eventData)...)
}

// Writes are only allowed through if they beat any existing write for that block.
func (ws *writeSource) WriteAt(buffer []byte, offset int64) (int, error) {
	ws.combinator.writeLock.Lock()
	defer ws.combinator.writeLock.Unlock()

	end := uint64(offset + int64(len(buffer)))
	if end > ws.combinator.size {
		end = ws.combinator.size
	}

	bStart := uint(offset / int64(ws.combinator.blockSize))
	bEnd := uint((end-1)/uint64(ws.combinator.blockSize)) + 1
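	// bEnd is exclusive: (end-1)/blockSize is the last block the write touches, then +1.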

	blockOffset := int64(0)

	// Check block by block if we should let it through...
	for b := bStart; b < bEnd; b++ {
		existingPriority := ws.combinator.getHighestPriorityForBlock(b)
		if ws.priority > existingPriority {
			// Allow the write through, and update our availability
			blockEnd := blockOffset + int64(ws.combinator.blockSize)
			if blockEnd > int64(len(buffer)) {
				// Clamp to the buffer so a shorter final write (at the end of the storage) doesn't over-read.
				blockEnd = int64(len(buffer))
			}
			blockData := buffer[blockOffset:blockEnd]
			_, err := ws.combinator.prov.WriteAt(blockData, offset+blockOffset)
			if err != nil {
				return 0, err
			}
			ws.available.SetBit(int(b))
		}
		blockOffset += int64(ws.combinator.blockSize)
	}

	// Report no error.
	return len(buffer), nil
}

// Route everything else through to prov
func (ws *writeSource) ReadAt(buffer []byte, offset int64) (int, error) {
	return ws.combinator.prov.ReadAt(buffer, offset)
}

func (ws *writeSource) Flush() error {
	return ws.combinator.prov.Flush()
}

func (ws *writeSource) Size() uint64 {
	return ws.combinator.prov.Size()
}

func (ws *writeSource) Close() error {
	return ws.combinator.prov.Close()
}

func (ws *writeSource) CancelWrites(offset int64, length int64) {
	ws.combinator.prov.CancelWrites(offset, length)
}
43 changes: 43 additions & 0 deletions pkg/storage/modules/write_combinator_test.go
@@ -0,0 +1,43 @@
package modules

import (
	"testing"

	"github.com/loopholelabs/silo/pkg/storage/sources"
	"github.com/stretchr/testify/assert"
)

func TestWriteCombinatorBasic(t *testing.T) {
	storage := sources.NewMemoryStorage(64 * 1024)

	blockSize := 4 * 1024

	combinator := NewWriteCombinator(storage, blockSize)

	// Try doing a couple of writes and make sure they're dealt with correctly...
	source1 := combinator.AddSource(1)
	source2 := combinator.AddSource(2)

	// Make a couple of buffers with 1s and 2s in them so we can tell the data apart...
	buffer1 := make([]byte, blockSize)
	buffer2 := make([]byte, blockSize)
	for i := 0; i < blockSize; i++ {
		buffer1[i] = 1
		buffer2[i] = 2
	}

	// Do a couple of writes with different priority
	source1.WriteAt(buffer1, 0)
	source2.WriteAt(buffer2, 0)

	// Swap the order of the writes this time...
	source2.WriteAt(buffer2, int64(blockSize))
	source1.WriteAt(buffer1, int64(blockSize))

	// Now check... source2 should have won in both cases as it has higher priority...
	checkBuffer := make([]byte, blockSize*2)
	storage.ReadAt(checkBuffer, 0)
	for i := 0; i < len(checkBuffer); i++ {
		assert.Equal(t, uint8(2), checkBuffer[i])
	}
}
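
A further usage sketch (not part of this commit; illustrative only): a lower-priority source still writes straight through to a block that no higher-priority source has touched, since getHighestPriorityForBlock returns -1 for untouched blocks.

func TestWriteCombinatorUntouchedBlock(t *testing.T) {
	storage := sources.NewMemoryStorage(64 * 1024)
	blockSize := 4 * 1024
	combinator := NewWriteCombinator(storage, blockSize)

	source1 := combinator.AddSource(1)
	combinator.AddSource(2) // Higher priority, but it never writes to block 2.

	buffer1 := make([]byte, blockSize)
	for i := 0; i < blockSize; i++ {
		buffer1[i] = 1
	}

	// Block 2 has no higher-priority write, so the lower-priority write goes through.
	source1.WriteAt(buffer1, int64(2*blockSize))

	checkBuffer := make([]byte, blockSize)
	storage.ReadAt(checkBuffer, int64(2*blockSize))
	for i := 0; i < len(checkBuffer); i++ {
		assert.Equal(t, uint8(1), checkBuffer[i])
	}
}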
