From a0f24faf9e4985fd027fa06ece34ffd88f213131 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 6 Feb 2024 13:19:09 +0000 Subject: [PATCH] Started on dirtylist packet etc. Diagram. --- cmd/connect.go | 8 +- images/Silo.drawio | 224 ++++++++++++++++++++++++++ images/Silo.drawio.svg | 4 + pkg/storage/modules/from_protocol.go | 16 ++ pkg/storage/modules/read_only_gate.go | 60 +++++++ pkg/storage/modules/to_protocol.go | 6 + pkg/storage/modules/waiting_cache.go | 32 +++- pkg/storage/protocol/dirty_list.go | 28 ++++ pkg/storage/protocol/protocol.go | 1 + pkg/storage/protocol/protocol_test.go | 40 +++++ 10 files changed, 415 insertions(+), 4 deletions(-) create mode 100644 images/Silo.drawio create mode 100644 images/Silo.drawio.svg create mode 100644 pkg/storage/modules/read_only_gate.go create mode 100644 pkg/storage/protocol/dirty_list.go diff --git a/cmd/connect.go b/cmd/connect.go index 94482edd..9aa8eef1 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -73,7 +73,13 @@ func runConnect(ccmd *cobra.Command, args []string) { dest.NeedAt(offset, length) } - go pro.Handle() + go func() { + err := pro.Handle() + fmt.Printf("PROTOCOL ERROR %v\n", err) + + // If it's EOF then the migration is completed, and we can switch to r/w + + }() go dest.HandleSend(context.TODO()) go dest.HandleReadAt() diff --git a/images/Silo.drawio b/images/Silo.drawio new file mode 100644 index 00000000..68a503b6 --- /dev/null +++ b/images/Silo.drawio @@ -0,0 +1,224 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/images/Silo.drawio.svg b/images/Silo.drawio.svg new file mode 100644 index 00000000..b7ad825a --- /dev/null +++ b/images/Silo.drawio.svg @@ -0,0 +1,4 @@ + + + +
Shard 2
Shard 1
ShardedStorage
DirtyTracker
VolatilityMonitor
Lockable
WORKLOAD
PriorityBlockOrderer
ProtocolRW
ToProtocol
Migrator
Lock / Unlock
Pause / Resume
TRANSPORT
ProtocolRW
FromProtocol
WaitingCache
Shard 2
Shard 1
ShardedStorage
WORKLOAD
ReadOnlyGate
\ No newline at end of file diff --git a/pkg/storage/modules/from_protocol.go b/pkg/storage/modules/from_protocol.go index a6a02f29..3324a8f6 100644 --- a/pkg/storage/modules/from_protocol.go +++ b/pkg/storage/modules/from_protocol.go @@ -100,6 +100,22 @@ func (fp *FromProtocol) HandleWriteAt() error { } } +// Handle any DirtyList commands +func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint32)) error { + for { + _, data, err := fp.protocol.WaitForCommand(fp.dev, protocol.COMMAND_DIRTY_LIST) + if err != nil { + return err + } + blocks, err := protocol.DecodeDirtyList(data) + if err != nil { + return err + } + + cb(blocks) + } +} + func (i *FromProtocol) NeedAt(offset int64, length int32) error { b := protocol.EncodeNeedAt(offset, length) _, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b) diff --git a/pkg/storage/modules/read_only_gate.go b/pkg/storage/modules/read_only_gate.go new file mode 100644 index 00000000..94503b63 --- /dev/null +++ b/pkg/storage/modules/read_only_gate.go @@ -0,0 +1,60 @@ +package modules + +import ( + "sync" + + "github.com/loopholelabs/silo/pkg/storage" +) + +/** + * + */ + +type ReadOnlyGate struct { + prov storage.StorageProvider + lock *sync.Cond + locked bool +} + +func NewReadOnlyGate(prov storage.StorageProvider) *Lockable { + return &Lockable{ + prov: prov, + lock: sync.NewCond(&sync.Mutex{}), + locked: false, + } +} + +func (i *ReadOnlyGate) ReadAt(p []byte, off int64) (n int, err error) { + return i.prov.ReadAt(p, off) +} + +func (i *ReadOnlyGate) WriteAt(p []byte, off int64) (n int, err error) { + i.lock.L.Lock() + if i.locked { + i.lock.Wait() + } + i.lock.L.Unlock() + + return i.prov.WriteAt(p, off) +} + +func (i *ReadOnlyGate) Flush() error { + return i.prov.Flush() +} + +func (i *ReadOnlyGate) Size() uint64 { + return i.prov.Size() +} + +func (i *ReadOnlyGate) Unlock() { + i.lock.L.Lock() + i.locked = false + i.lock.Broadcast() + i.lock.L.Unlock() +} + +func (i *ReadOnlyGate) Lock() { + i.lock.L.Lock() + i.locked = true + i.lock.L.Unlock() +} diff --git a/pkg/storage/modules/to_protocol.go b/pkg/storage/modules/to_protocol.go index 090af8a7..da0dbdad 100644 --- a/pkg/storage/modules/to_protocol.go +++ b/pkg/storage/modules/to_protocol.go @@ -18,6 +18,12 @@ func NewToProtocol(size uint64, deviceID uint32, p protocol.Protocol) *ToProtoco } } +func (i *ToProtocol) DirtyList(blocks []uint32) error { + b := protocol.EncodeDirtyList(blocks) + _, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b) + return err +} + func (i *ToProtocol) ReadAt(buffer []byte, offset int64) (int, error) { b := protocol.EncodeReadAt(offset, int32(len(buffer))) id, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b) diff --git a/pkg/storage/modules/waiting_cache.go b/pkg/storage/modules/waiting_cache.go index 6b243a45..2244426e 100644 --- a/pkg/storage/modules/waiting_cache.go +++ b/pkg/storage/modules/waiting_cache.go @@ -10,6 +10,8 @@ import ( /** * Waiting cache StorageProvider * + * NB: This tracks COMPLETE blocks only. Not partial ones. + * */ type WaitingCache struct { prov storage.StorageProvider @@ -42,7 +44,7 @@ func (i *WaitingCache) ReadAt(buffer []byte, offset int64) (int, error) { b_start := uint(offset / int64(i.block_size)) b_end := uint((end-1)/uint64(i.block_size)) + 1 - // Check if we have all the data + // Check if we have all the data required i.lockers_lock.Lock() avail := i.available.BitsSet(uint(b_start), uint(b_end)) i.lockers_lock.Unlock() @@ -79,6 +81,15 @@ func (i *WaitingCache) haveBlock(b uint) { i.lockers_lock.Unlock() } +func (i *WaitingCache) haveNotBlock(b uint) { + i.lockers_lock.Lock() + avail := i.available.BitSet(int(b)) + if avail { + i.available.ClearBit(int(b)) + } + i.lockers_lock.Unlock() +} + func (i *WaitingCache) waitForBlock(b uint) { i.lockers_lock.Lock() avail := i.available.BitSet(int(b)) @@ -109,13 +120,28 @@ func (i *WaitingCache) WriteAt(buffer []byte, offset int64) (int, error) { n, err := i.prov.WriteAt(buffer, offset) - for b := b_start; b < b_end; b++ { - i.haveBlock(b) + // If the first block is incomplete, we won't mark it. + if offset > (int64(b_start) * int64(i.block_size)) { + b_start++ + } + // If the last block is incomplete, we won't mark it. + if (end % uint64(i.block_size)) > 0 { + b_end-- + } + + if b_end > b_start { + for b := b_start; b < b_end; b++ { + i.haveBlock(b) + } } return n, err } +func (i *WaitingCache) Unmark(block uint) { + i.haveNotBlock(block) +} + func (i *WaitingCache) Flush() error { return i.prov.Flush() } diff --git a/pkg/storage/protocol/dirty_list.go b/pkg/storage/protocol/dirty_list.go new file mode 100644 index 00000000..4f4e48cb --- /dev/null +++ b/pkg/storage/protocol/dirty_list.go @@ -0,0 +1,28 @@ +package protocol + +import ( + "encoding/binary" + "errors" +) + +func EncodeDirtyList(blocks []uint32) []byte { + buff := make([]byte, 1+4+4*len(blocks)) + buff[0] = COMMAND_DIRTY_LIST + binary.LittleEndian.PutUint32(buff[1:], uint32(len(blocks))) + for i, v := range blocks { + binary.LittleEndian.PutUint32(buff[(5+i*4):], v) + } + return buff +} + +func DecodeDirtyList(buff []byte) ([]uint32, error) { + if buff == nil || len(buff) < 9 || buff[0] != COMMAND_DIRTY_LIST { + return nil, errors.New("Invalid packet") + } + length := binary.LittleEndian.Uint32(buff[1:]) + blocks := make([]uint32, length) + for i := 0; i < int(length); i++ { + blocks[i] = binary.LittleEndian.Uint32(buff[(5 + i*4):]) + } + return blocks, nil +} diff --git a/pkg/storage/protocol/protocol.go b/pkg/storage/protocol/protocol.go index 2c687a45..a3accae3 100644 --- a/pkg/storage/protocol/protocol.go +++ b/pkg/storage/protocol/protocol.go @@ -6,6 +6,7 @@ const COMMAND_RESPONSE = byte(0x80) const COMMAND_READ_AT = COMMAND_REQUEST | byte(1) const COMMAND_WRITE_AT = COMMAND_REQUEST | byte(2) const COMMAND_NEED_AT = COMMAND_REQUEST | byte(3) +const COMMAND_DIRTY_LIST = COMMAND_REQUEST | byte(4) const COMMAND_READ_AT_RESPONSE = COMMAND_RESPONSE | byte(1) const COMMAND_READ_AT_RESPONSE_ERR = COMMAND_RESPONSE | byte(2) diff --git a/pkg/storage/protocol/protocol_test.go b/pkg/storage/protocol/protocol_test.go index a73327f7..c24e7e72 100644 --- a/pkg/storage/protocol/protocol_test.go +++ b/pkg/storage/protocol/protocol_test.go @@ -111,3 +111,43 @@ func TestWriteAtResponse(t *testing.T) { assert.Error(t, rare.Error) } + +func TestNeedAt(t *testing.T) { + + b := EncodeNeedAt(12345, 10) + + off, length, err := DecodeNeedAt(b) + assert.NoError(t, err) + assert.Equal(t, int64(12345), off) + assert.Equal(t, int32(10), length) + + // Make sure we can't decode silly things + _, _, err = DecodeNeedAt(nil) + assert.Error(t, err) + + _, _, err = DecodeNeedAt([]byte{ + 99, + }) + assert.Error(t, err) + +} + +func TestDirtyList(t *testing.T) { + + blocks := []uint32{1, 7, 100} + b := EncodeDirtyList(blocks) + + blocks2, err := DecodeDirtyList(b) + assert.NoError(t, err) + assert.Equal(t, blocks, blocks2) + + // Make sure we can't decode silly things + _, err = DecodeDirtyList(nil) + assert.Error(t, err) + + _, err = DecodeDirtyList([]byte{ + 99, + }) + assert.Error(t, err) + +}