Skip to content

Commit

Permalink
Added writeAt block splitter and test
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 12, 2024
1 parent 1468f97 commit 8182755
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func runServe(ccmd *cobra.Command, args []string) {
sourceMonitor := modules.NewVolatilityMonitor(sourceDirty, block_size, 10*time.Second)
sourceStorage := modules.NewLockable(sourceMonitor)

// Write some random stuff to the device...
go writeRandom(sourceStorage)

// Start monitoring blocks.
Expand Down
62 changes: 58 additions & 4 deletions pkg/storage/modules/block_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,66 @@ func (i *BlockSplitter) ReadAt(buffer []byte, offset int64) (n int, err error) {
return len(buffer), nil
}

func (i *BlockSplitter) WriteAt(p []byte, off int64) (n int, err error) {
// Split the write up into blocks, and concurrenty perform the writes...
func (i *BlockSplitter) WriteAt(buffer []byte, offset int64) (n int, err error) {
// Split the read up into blocks, and concurrenty perform the reads...
end := uint64(offset + int64(len(buffer)))
if end > i.size {
end = i.size
}

b_start := uint(offset / int64(i.block_size))
b_end := uint((end-1)/uint64(i.block_size)) + 1

blocks := b_end - b_start
errs := make(chan error, blocks)

for b := b_start; b < b_end; b++ {
go func(block_no uint) {
block_offset := int64(block_no) * int64(i.block_size)
var err error
if block_offset > offset {
// Partial write at the end
if len(buffer[block_offset-offset:]) < i.block_size {
block_buffer := make([]byte, i.block_size)
// Read existing data
_, err = i.prov.ReadAt(block_buffer, block_offset)
if err == nil {
// Update the data
copy(block_buffer, buffer[block_offset-offset:])
// Write back
_, err = i.prov.WriteAt(block_buffer, block_offset)
}
} else {
// Complete write in the middle
s := block_offset - offset
e := s + int64(i.block_size)
if e > int64(len(buffer)) {
e = int64(len(buffer))
}
_, err = i.prov.WriteAt(buffer[s:e], block_offset)
}
} else {
// Partial write at the start
block_buffer := make([]byte, i.block_size)
_, err = i.prov.ReadAt(block_buffer, block_offset)
if err == nil {
copy(block_buffer[offset-block_offset:], buffer)
_, err = i.prov.WriteAt(block_buffer, block_offset)
}
}
errs <- err
}(b)
}

// TODO
// Wait for completion, Check for errors and return...
for b := b_start; b < b_end; b++ {
e := <-errs
if e != nil {
return 0, e
}
}

return i.prov.WriteAt(p, off)
return len(buffer), nil
}

func (i *BlockSplitter) Flush() error {
Expand Down
38 changes: 37 additions & 1 deletion pkg/storage/modules/block_splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestBlockSplitter(t *testing.T) {
func TestBlockSplitterRead(t *testing.T) {

// Create a new block storage, backed by memory storage
size := 1024 * 1024 * 32
Expand Down Expand Up @@ -47,3 +47,39 @@ func TestBlockSplitter(t *testing.T) {
fmt.Printf("Read took %dms. Read split took %dms.\n", read_duration.Milliseconds(), read_duration_split.Milliseconds())
metrics.ShowStats("Source")
}

func TestBlockSplitterWrite(t *testing.T) {

// Create a new block storage, backed by memory storage
size := 1024 * 1024 * 32
mem := sources.NewMemoryStorage(size)
// metrics := NewMetrics(mem)
split := NewBlockSplitter(mem, 100) // Block size of 100 bytes

write_len := 1234567

// Write some stuff to the mem...
b := make([]byte, size)
rand.Read(b)
_, err := mem.WriteAt(b, 0)
assert.NoError(t, err)

offset := int64(10)
// Write using a splitter (splits the read up into concurrent block writes)
buffer := make([]byte, write_len)
rand.Read(buffer)
_, err = split.WriteAt(buffer, offset)
assert.NoError(t, err)

// Check that the write was performed properly and didn't mess up anything

copy(b[offset:], buffer) // Perform the write in the b buffer.

check := make([]byte, size)
_, err = mem.ReadAt(check, 0)
assert.NoError(t, err)

assert.Equal(t, b, check)

// metrics.ShowStats("source")
}

0 comments on commit 8182755

Please sign in to comment.