Commit 3849e99

Lots of changes and updates. block_splitter. sharded_storage now operates concurrently.
jimmyaxod committed Feb 7, 2024
1 parent a0f24fa commit 3849e99
Showing 18 changed files with 555 additions and 205 deletions.
4 changes: 4 additions & 0 deletions cmd/connect.go
@@ -85,6 +85,10 @@ func runConnect(ccmd *cobra.Command, args []string) {
 	go dest.HandleReadAt()
 	go dest.HandleWriteAt()
 
+	go dest.HandleDirtyList(func(dirty []uint) {
+		fmt.Printf("GOT LIST OF DIRTY BLOCKS %v\n", dirty)
+	})
+
 	// Something to randomly read...
 	go func() {
 		for {
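HandleDirtyList registers a callback that fires whenever the source pushes a list of blocks written mid-migration; the sending half is the dest.DirtyList call in cmd/serve.go below. A minimal sketch of a handler that records the blocks instead of just printing them; the dest stand-in type is invented for illustration, since only the callback shape matters here:

package main

import (
	"fmt"
	"sync"
)

// dirtyDest mimics the destination's HandleDirtyList hook; the real
// type lives in silo's protocol code and runs a receive loop instead.
type dirtyDest struct{ onDirty func([]uint) }

func (d *dirtyDest) HandleDirtyList(cb func(dirty []uint)) { d.onDirty = cb }

func main() {
	d := &dirtyDest{}

	var mu sync.Mutex
	pending := map[uint]bool{}
	d.HandleDirtyList(func(dirty []uint) {
		mu.Lock()
		defer mu.Unlock()
		for _, b := range dirty {
			pending[b] = true // must be re-received before it's clean
		}
	})

	d.onDirty([]uint{3, 7}) // simulate the source pushing a dirty list
	fmt.Println(pending)    // map[3:true 7:true]
}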
21 changes: 21 additions & 0 deletions cmd/serve.go
@@ -175,6 +175,27 @@ func runServe(ccmd *cobra.Command, args []string) {
 
 	// Now do the migration...
 	err = mig.Migrate()
+
+	for {
+		blocks := mig.GetLatestDirty()
+		if blocks == nil {
+			break
+		}
+
+		// Optional: Send the list of dirty blocks over...
+		dest.DirtyList(blocks)
+
+		err := mig.MigrateDirty(blocks)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	err = mig.WaitForCompletion()
+	if err != nil {
+		panic(err)
+	}
+
 	fmt.Printf("MIGRATION DONE %v\n", err)
 
 	c.Close()
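The new loop in runServe implements iterative convergence: after the bulk Migrate pass, it repeatedly asks the tracker for blocks that were written in the meantime and re-migrates just those, and when a pass returns nothing the source is left locked, so the final state is consistent. A self-contained sketch of the same loop shape, using a mock migrator invented for illustration (the real methods live on pkg/storage's Migrator):

package main

import "fmt"

// mockMigrator stands in for storage.Migrator; it pretends that two
// passes find dirty blocks and the third finds none.
type mockMigrator struct{ rounds int }

// GetLatestDirty returns blocks written since the last sync, or nil
// once the source is quiescent (at which point it stays locked).
func (m *mockMigrator) GetLatestDirty() []uint {
	if m.rounds == 0 {
		return nil
	}
	m.rounds--
	return []uint{3, 7} // pretend blocks 3 and 7 changed again
}

func (m *mockMigrator) MigrateDirty(blocks []uint) error {
	fmt.Printf("re-sending blocks %v\n", blocks)
	return nil
}

func (m *mockMigrator) WaitForCompletion() error { return nil }

func main() {
	mig := &mockMigrator{rounds: 2}

	// Same shape as the serve.go change above: converge, then wait.
	for {
		blocks := mig.GetLatestDirty()
		if blocks == nil {
			break
		}
		if err := mig.MigrateDirty(blocks); err != nil {
			panic(err)
		}
	}
	if err := mig.WaitForCompletion(); err != nil {
		panic(err)
	}
	fmt.Println("migration converged")
}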
4 changes: 2 additions & 2 deletions internal/expose/ndb_dev_test.go
@@ -26,7 +26,7 @@ func BenchmarkDevRead(mb *testing.B) {
 	// Setup...
 	// Lets simulate a little latency
 	store := sources.NewMemoryStorage(int(diskSize))
-	store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 100*time.Millisecond)
+	store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0)
 	driver := modules.NewMetrics(store_latency)
 
 	var d NBDDispatcher
@@ -131,7 +131,7 @@ func BenchmarkDevWrite(mb *testing.B) {
 	// Setup...
 	// Lets simulate a little latency
 	store := sources.NewMemoryStorage(int(diskSize))
-	store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 100*time.Millisecond)
+	store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0)
 	driver := modules.NewMetrics(store_latency)
 
 	var d NBDDispatcher
193 changes: 107 additions & 86 deletions pkg/storage/migrator.go
@@ -1,7 +1,6 @@
 package storage
 
 import (
-	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -10,6 +9,9 @@
 	"github.com/loopholelabs/silo/pkg/storage/util"
 )
 
+type MigratorConfig struct {
+}
+
 type Migrator struct {
 	src_track           TrackingStorageProvider
 	dest                StorageProvider
@@ -18,15 +20,18 @@
 	block_size          int
 	num_blocks          int
 	metric_moved_blocks int64
 	moving_blocks       *util.Bitfield
 	migrated_blocks     *util.Bitfield
 	clean_blocks        *util.Bitfield
 	block_order         BlockOrder
 	ctime               time.Time
 
 	// Our queue for dirty blocks (remigration)
-	blocks_to_move   chan uint
-	blocks_by_n      map[uint]bool
-	blocks_by_n_lock sync.Mutex
+	blocks_to_move chan uint
+	blocks_by_n    map[uint]bool
+
+	concurrency map[int]chan bool
+	wg          sync.WaitGroup
 }
 
 func NewMigrator(source TrackingStorageProvider,
@@ -35,8 +40,17 @@ func NewMigrator(source TrackingStorageProvider,
 	lock_fn func(),
 	unlock_fn func(),
 	block_order BlockOrder) *Migrator {
+
+	// TODO: Pass in configuration...
+	concurrency_by_block := map[int]int{
+		BlockTypeAny:      32,
+		BlockTypeStandard: 32,
+		BlockTypeDirty:    100,
+		BlockTypePriority: 16,
+	}
+
 	num_blocks := (int(source.Size()) + block_size - 1) / block_size
-	return &Migrator{
+	m := &Migrator{
 		dest:            dest,
 		src_track:       source,
 		src_lock_fn:     lock_fn,
@@ -49,47 +63,40 @@
 		clean_blocks:    util.NewBitfield(num_blocks),
 		blocks_to_move:  make(chan uint, num_blocks),
 		blocks_by_n:     make(map[uint]bool),
+		concurrency:     make(map[int]chan bool),
 	}
-}
 
-/**
- * Migrate storage to dest.
- */
-func (m *Migrator) Migrate() error {
 	if m.dest.Size() != m.src_track.Size() {
-		return errors.New("Sizes must be equal for migration")
+		// TODO: Return as error
+		panic("Sizes must be equal for migration")
 	}
 
-	m.ctime = time.Now()
-
-	concurrency_by_block := map[int]int{
-		BlockTypeAny:      32,
-		BlockTypeStandard: 32,
-		BlockTypeDirty:    100,
-		BlockTypePriority: 16,
-	}
-
-	concurrency := make(map[int]chan bool) // max concurrency by block type
+	// Initialize concurrency channels
 	for b, v := range concurrency_by_block {
-		concurrency[b] = make(chan bool, v)
+		m.concurrency[b] = make(chan bool, v)
 	}
+	return m
+}
 
-	var wg sync.WaitGroup
+/**
+ * Migrate storage to dest.
+ */
+func (m *Migrator) Migrate() error {
+	m.ctime = time.Now()
 
 	for {
-		// This will get the next queued migration, OR from the priority list
-		i, done := m.getNextBlock()
-		if done {
+		i := m.block_order.GetNext()
+		if i == BlockInfoFinish {
 			break
 		}
 
-		cc, ok := concurrency[i.Type]
+		cc, ok := m.concurrency[i.Type]
 		if !ok {
-			cc = concurrency[BlockTypeAny]
+			cc = m.concurrency[BlockTypeAny]
 		}
 		cc <- true
 
-		wg.Add(1)
+		m.wg.Add(1)
 
 		go func(block_no *BlockInfo) {
 			err := m.migrateBlock(block_no.Block)
@@ -102,20 +109,86 @@
 
 			fmt.Printf("Block moved %d\n", block_no)
 			fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no)
-			wg.Done()
+			m.wg.Done()
 
-			cc, ok := concurrency[block_no.Type]
+			cc, ok := m.concurrency[block_no.Type]
 			if !ok {
-				cc = concurrency[BlockTypeAny]
+				cc = m.concurrency[BlockTypeAny]
 			}
 			<-cc
 		}(i)
 	}
+	return nil
+}
+
+/**
+ * Get the latest dirty blocks.
+ * If there are no more dirty blocks, we leave the src locked.
+ */
+func (m *Migrator) GetLatestDirty() []uint {
+	// Queue up some dirty blocks
+	m.src_lock_fn()
+
+	// Check for any dirty blocks to be added on
+	blocks := m.src_track.Sync()
+	changed := blocks.Count(0, blocks.Length())
+	if changed != 0 {
+		m.src_unlock_fn()
+
+		block_nos := blocks.Collect(0, blocks.Length())
+		return block_nos
+	}
+	return nil
+}
+
+/**
+ * MigrateDirty migrates a list of dirty blocks.
+ */
+func (m *Migrator) MigrateDirty(blocks []uint) error {
+	m.ctime = time.Now()
+
+	for _, pos := range blocks {
+		i := &BlockInfo{Block: int(pos), Type: BlockTypeDirty}
+
+		cc, ok := m.concurrency[i.Type]
+		if !ok {
+			cc = m.concurrency[BlockTypeAny]
+		}
+		cc <- true
+
+		m.wg.Add(1)
+
+		go func(block_no *BlockInfo) {
+			err := m.migrateBlock(block_no.Block)
+			if err != nil {
+				// TODO: Collect errors properly. Abort? Retry? fail?
+				fmt.Printf("ERROR moving block %v\n", err)
+			}
+
+			m.showProgress()
+
+			fmt.Printf("Dirty block moved %d\n", block_no)
+			fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no)
+			m.wg.Done()
+
+			cc, ok := m.concurrency[block_no.Type]
+			if !ok {
+				cc = m.concurrency[BlockTypeAny]
+			}
+			<-cc
+		}(i)
+
+		m.clean_blocks.ClearBit(int(pos))
+
+		m.showProgress()
+	}
 
-	fmt.Printf("Waiting for pending transfers...\n")
-	wg.Wait()
 	return nil
 }
+
+func (m *Migrator) WaitForCompletion() error {
+	fmt.Printf("Waiting for pending transfers...\n")
+	m.wg.Wait()
+	m.showProgress()
+	return nil
+}
@@ -172,55 +245,3 @@ func (m *Migrator) migrateBlock(block int) error {
 	m.clean_blocks.SetBit(block)
 	return nil
 }
-
-/**
- * Check for any dirty blocks, and add them to the queue
- *
- */
-func (m *Migrator) queueDirtyBlocks() bool {
-	m.src_lock_fn()
-
-	// Check for any dirty blocks to be added on
-	blocks := m.src_track.Sync()
-	changed := blocks.Count(0, blocks.Length())
-	if changed != 0 {
-		m.src_unlock_fn()
-
-		fmt.Printf("Got %d more dirty blocks...\n", changed)
-
-		blocks.Exec(0, blocks.Length(), func(pos uint) bool {
-			m.blocks_by_n_lock.Lock()
-			_, ok := m.blocks_by_n[pos] // Dedup pending by block
-			if !ok {
-				m.blocks_to_move <- pos
-				m.blocks_by_n[pos] = true
-				m.clean_blocks.ClearBit(int(pos))
-			}
-			m.blocks_by_n_lock.Unlock()
-			return true
-		})
-		m.showProgress()
-		return true
-	}
-	return false
-}
-
-func (m *Migrator) getNextBlock() (*BlockInfo, bool) {
-	bl := m.block_order.GetNext()
-	if bl != BlockInfoFinish {
-		return bl, false
-	}
-
-	// Migration complete, now do dirty blocks...
-	m.queueDirtyBlocks()
-
-	if len(m.blocks_to_move) == 0 {
-		return nil, true
-	}
-
-	m.blocks_by_n_lock.Lock()
-	i := <-m.blocks_to_move
-	delete(m.blocks_by_n, i)
-	m.blocks_by_n_lock.Unlock()
-	return &BlockInfo{Block: int(i), Type: BlockTypeDirty}, false
-}
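Two details of this refactor are easy to miss. First, GetLatestDirty intentionally returns with the source still locked when nothing is dirty, so the caller's final WaitForCompletion runs against a quiesced source. Second, the per-block-type concurrency limits are counting semaphores built from buffered channels: a send claims one of the N slots (blocking when all are busy) and a receive frees it. A stripped-down sketch of that semaphore idiom, with limits and a block loop invented for illustration:

package main

import (
	"fmt"
	"sync"
)

const (
	BlockTypeAny = iota
	BlockTypeDirty
)

func main() {
	// One buffered channel per block type; the capacity is the maximum
	// number of in-flight transfers of that type.
	concurrency := map[int]chan bool{
		BlockTypeAny:   make(chan bool, 4),
		BlockTypeDirty: make(chan bool, 8),
	}

	var wg sync.WaitGroup
	for block := 0; block < 32; block++ {
		cc := concurrency[BlockTypeAny]
		cc <- true // acquire a slot; blocks while 4 transfers are running

		wg.Add(1)
		go func(b int) {
			defer wg.Done()
			fmt.Printf("moving block %d\n", b) // stand-in for migrateBlock
			<-cc                               // release the slot
		}(block)
	}
	wg.Wait() // same role as WaitForCompletion above
}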
26 changes: 18 additions & 8 deletions pkg/storage/modules/artificial_latency.go
@@ -13,17 +13,21 @@
  *
  */
 type ArtificialLatency struct {
-	lock         sync.RWMutex
-	prov         storage.StorageProvider
-	latencyRead  time.Duration
-	latencyWrite time.Duration
+	lock                sync.RWMutex
+	prov                storage.StorageProvider
+	latencyRead         time.Duration
+	latencyWrite        time.Duration
+	latencyReadPerByte  time.Duration
+	latencyWritePerByte time.Duration
 }
 
-func NewArtificialLatency(prov storage.StorageProvider, latencyRead time.Duration, latencyWrite time.Duration) *ArtificialLatency {
+func NewArtificialLatency(prov storage.StorageProvider, latencyRead time.Duration, latencyReadPerByte time.Duration, latencyWrite time.Duration, latencyWritePerByte time.Duration) *ArtificialLatency {
 	return &ArtificialLatency{
-		prov:         prov,
-		latencyRead:  latencyRead,
-		latencyWrite: latencyWrite,
+		prov:                prov,
+		latencyRead:         latencyRead,
+		latencyWrite:        latencyWrite,
+		latencyReadPerByte:  latencyReadPerByte,
+		latencyWritePerByte: latencyWritePerByte,
 	}
 }

@@ -33,6 +37,9 @@ func (i *ArtificialLatency) ReadAt(buffer []byte, offset int64) (int, error) {
 	if i.latencyRead != 0 {
 		time.Sleep(i.latencyRead)
 	}
+	if i.latencyReadPerByte != 0 {
+		time.Sleep(i.latencyReadPerByte * time.Duration(len(buffer)))
+	}
 	return i.prov.ReadAt(buffer, offset)
 }

@@ -42,6 +49,9 @@ func (i *ArtificialLatency) WriteAt(buffer []byte, offset int64) (int, error) {
 	if i.latencyWrite != 0 {
 		time.Sleep(i.latencyWrite)
 	}
+	if i.latencyWritePerByte != 0 {
+		time.Sleep(i.latencyWritePerByte * time.Duration(len(buffer)))
+	}
 	return i.prov.WriteAt(buffer, offset)
 }

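With the per-byte terms, ArtificialLatency now sleeps for the fixed latency plus perByte * len(buffer), so it can model throughput as well as round-trip delay: at 1 microsecond per byte, a 4 KiB read picks up roughly 4.1 ms on top of the fixed cost. A small usage sketch with the updated five-argument constructor (the latency values here are arbitrary):

package main

import (
	"fmt"
	"time"

	"github.com/loopholelabs/silo/pkg/storage/modules"
	"github.com/loopholelabs/silo/pkg/storage/sources"
)

func main() {
	store := sources.NewMemoryStorage(1024 * 1024)

	// Argument order per the new signature: fixed read, per-byte read,
	// fixed write, per-byte write.
	lat := modules.NewArtificialLatency(store,
		5*time.Millisecond, time.Microsecond,
		10*time.Millisecond, 2*time.Microsecond)

	buf := make([]byte, 4096)
	start := time.Now()
	if _, err := lat.ReadAt(buf, 0); err != nil {
		panic(err)
	}
	// Expect roughly 5ms + 4096*1µs, i.e. about 9.1ms of injected delay.
	fmt.Println("read took", time.Since(start))
}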