Skip to content

Commit

Permalink
Started on dirtylist packet etc. Diagram.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 6, 2024
1 parent 5b99da4 commit a0f24fa
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 4 deletions.
8 changes: 7 additions & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func runConnect(ccmd *cobra.Command, args []string) {
dest.NeedAt(offset, length)
}

go pro.Handle()
go func() {
err := pro.Handle()
fmt.Printf("PROTOCOL ERROR %v\n", err)

// If it's EOF then the migration is completed, and we can switch to r/w

}()

go dest.HandleSend(context.TODO())
go dest.HandleReadAt()
Expand Down
224 changes: 224 additions & 0 deletions images/Silo.drawio

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions images/Silo.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 16 additions & 0 deletions pkg/storage/modules/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ func (fp *FromProtocol) HandleWriteAt() error {
}
}

// Handle any DirtyList commands
func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint32)) error {
for {
_, data, err := fp.protocol.WaitForCommand(fp.dev, protocol.COMMAND_DIRTY_LIST)
if err != nil {
return err
}
blocks, err := protocol.DecodeDirtyList(data)
if err != nil {
return err
}

cb(blocks)
}
}

func (i *FromProtocol) NeedAt(offset int64, length int32) error {
b := protocol.EncodeNeedAt(offset, length)
_, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b)
Expand Down
60 changes: 60 additions & 0 deletions pkg/storage/modules/read_only_gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package modules

import (
"sync"

"github.com/loopholelabs/silo/pkg/storage"
)

/**
*
*/

type ReadOnlyGate struct {
prov storage.StorageProvider
lock *sync.Cond
locked bool
}

func NewReadOnlyGate(prov storage.StorageProvider) *Lockable {
return &Lockable{
prov: prov,
lock: sync.NewCond(&sync.Mutex{}),
locked: false,
}
}

func (i *ReadOnlyGate) ReadAt(p []byte, off int64) (n int, err error) {
return i.prov.ReadAt(p, off)
}

func (i *ReadOnlyGate) WriteAt(p []byte, off int64) (n int, err error) {
i.lock.L.Lock()
if i.locked {
i.lock.Wait()
}
i.lock.L.Unlock()

return i.prov.WriteAt(p, off)
}

func (i *ReadOnlyGate) Flush() error {
return i.prov.Flush()
}

func (i *ReadOnlyGate) Size() uint64 {
return i.prov.Size()
}

func (i *ReadOnlyGate) Unlock() {
i.lock.L.Lock()
i.locked = false
i.lock.Broadcast()
i.lock.L.Unlock()
}

func (i *ReadOnlyGate) Lock() {
i.lock.L.Lock()
i.locked = true
i.lock.L.Unlock()
}
6 changes: 6 additions & 0 deletions pkg/storage/modules/to_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ func NewToProtocol(size uint64, deviceID uint32, p protocol.Protocol) *ToProtoco
}
}

func (i *ToProtocol) DirtyList(blocks []uint32) error {
b := protocol.EncodeDirtyList(blocks)
_, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b)
return err
}

func (i *ToProtocol) ReadAt(buffer []byte, offset int64) (int, error) {
b := protocol.EncodeReadAt(offset, int32(len(buffer)))
id, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b)
Expand Down
32 changes: 29 additions & 3 deletions pkg/storage/modules/waiting_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
/**
* Waiting cache StorageProvider
*
* NB: This tracks COMPLETE blocks only. Not partial ones.
*
*/
type WaitingCache struct {
prov storage.StorageProvider
Expand Down Expand Up @@ -42,7 +44,7 @@ func (i *WaitingCache) ReadAt(buffer []byte, offset int64) (int, error) {
b_start := uint(offset / int64(i.block_size))
b_end := uint((end-1)/uint64(i.block_size)) + 1

// Check if we have all the data
// Check if we have all the data required
i.lockers_lock.Lock()
avail := i.available.BitsSet(uint(b_start), uint(b_end))
i.lockers_lock.Unlock()
Expand Down Expand Up @@ -79,6 +81,15 @@ func (i *WaitingCache) haveBlock(b uint) {
i.lockers_lock.Unlock()
}

func (i *WaitingCache) haveNotBlock(b uint) {
i.lockers_lock.Lock()
avail := i.available.BitSet(int(b))
if avail {
i.available.ClearBit(int(b))
}
i.lockers_lock.Unlock()
}

func (i *WaitingCache) waitForBlock(b uint) {
i.lockers_lock.Lock()
avail := i.available.BitSet(int(b))
Expand Down Expand Up @@ -109,13 +120,28 @@ func (i *WaitingCache) WriteAt(buffer []byte, offset int64) (int, error) {

n, err := i.prov.WriteAt(buffer, offset)

for b := b_start; b < b_end; b++ {
i.haveBlock(b)
// If the first block is incomplete, we won't mark it.
if offset > (int64(b_start) * int64(i.block_size)) {
b_start++
}
// If the last block is incomplete, we won't mark it.
if (end % uint64(i.block_size)) > 0 {
b_end--
}

if b_end > b_start {
for b := b_start; b < b_end; b++ {
i.haveBlock(b)
}
}

return n, err
}

func (i *WaitingCache) Unmark(block uint) {
i.haveNotBlock(block)
}

func (i *WaitingCache) Flush() error {
return i.prov.Flush()
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/storage/protocol/dirty_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package protocol

import (
"encoding/binary"
"errors"
)

func EncodeDirtyList(blocks []uint32) []byte {
buff := make([]byte, 1+4+4*len(blocks))
buff[0] = COMMAND_DIRTY_LIST
binary.LittleEndian.PutUint32(buff[1:], uint32(len(blocks)))
for i, v := range blocks {
binary.LittleEndian.PutUint32(buff[(5+i*4):], v)
}
return buff
}

func DecodeDirtyList(buff []byte) ([]uint32, error) {
if buff == nil || len(buff) < 9 || buff[0] != COMMAND_DIRTY_LIST {
return nil, errors.New("Invalid packet")
}
length := binary.LittleEndian.Uint32(buff[1:])
blocks := make([]uint32, length)
for i := 0; i < int(length); i++ {
blocks[i] = binary.LittleEndian.Uint32(buff[(5 + i*4):])
}
return blocks, nil
}
1 change: 1 addition & 0 deletions pkg/storage/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const COMMAND_RESPONSE = byte(0x80)
const COMMAND_READ_AT = COMMAND_REQUEST | byte(1)
const COMMAND_WRITE_AT = COMMAND_REQUEST | byte(2)
const COMMAND_NEED_AT = COMMAND_REQUEST | byte(3)
const COMMAND_DIRTY_LIST = COMMAND_REQUEST | byte(4)

const COMMAND_READ_AT_RESPONSE = COMMAND_RESPONSE | byte(1)
const COMMAND_READ_AT_RESPONSE_ERR = COMMAND_RESPONSE | byte(2)
Expand Down
40 changes: 40 additions & 0 deletions pkg/storage/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,43 @@ func TestWriteAtResponse(t *testing.T) {
assert.Error(t, rare.Error)

}

func TestNeedAt(t *testing.T) {

b := EncodeNeedAt(12345, 10)

off, length, err := DecodeNeedAt(b)
assert.NoError(t, err)
assert.Equal(t, int64(12345), off)
assert.Equal(t, int32(10), length)

// Make sure we can't decode silly things
_, _, err = DecodeNeedAt(nil)
assert.Error(t, err)

_, _, err = DecodeNeedAt([]byte{
99,
})
assert.Error(t, err)

}

func TestDirtyList(t *testing.T) {

blocks := []uint32{1, 7, 100}
b := EncodeDirtyList(blocks)

blocks2, err := DecodeDirtyList(b)
assert.NoError(t, err)
assert.Equal(t, blocks, blocks2)

// Make sure we can't decode silly things
_, err = DecodeDirtyList(nil)
assert.Error(t, err)

_, err = DecodeDirtyList([]byte{
99,
})
assert.Error(t, err)

}

0 comments on commit a0f24fa

Please sign in to comment.