Skip to content

Commit

Permalink
Started tidying up migrator
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 13, 2024
1 parent c686c2d commit 12023ec
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func runConnect(ccmd *cobra.Command, args []string) {
pro := protocol.NewProtocolRW(context.TODO(), con, con)
dest := modules.NewFromProtocol(777, destStorageMetrics, pro)

// Connect the waitingCache to the FromProtocol
destWaiting.NeedAt = func(offset int64, length int32) {
fmt.Printf("Asking to prioritize range %d len %d\n", offset, length)
dest.NeedAt(offset, length)
}

Expand Down
11 changes: 9 additions & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,24 @@ func runServe(ccmd *cobra.Command, args []string) {
b_end := int((end-1)/uint64(block_size)) + 1
for b := b_start; b < b_end; b++ {
// Ask the orderer to prioritize these blocks...
fmt.Printf("ORDER - Prioritizing block %d\n", b)
orderer.PrioritiseBlock(b)
}
})

mig := storage.NewMigrator(sourceDirty,
mig, err := storage.NewMigrator(sourceDirty,
dest,
block_size,
locker,
unlocker,
orderer)

if err != nil {
panic(err)
}

// Now do the migration...
err = mig.Migrate()
mig.ShowProgress()

for {
blocks := mig.GetLatestDirty()
Expand All @@ -189,6 +193,8 @@ func runServe(ccmd *cobra.Command, args []string) {
if err != nil {
panic(err)
}
fmt.Printf("DIRTY BLOCKS %d\n", len(blocks))
mig.ShowProgress()
}

err = mig.WaitForCompletion()
Expand All @@ -197,6 +203,7 @@ func runServe(ccmd *cobra.Command, args []string) {
}

fmt.Printf("MIGRATION DONE %v\n", err)
mig.ShowProgress()

c.Close()
}
Expand Down
44 changes: 12 additions & 32 deletions pkg/storage/migrator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -13,7 +14,7 @@ type MigratorConfig struct {
}

type Migrator struct {
src_track TrackingStorageProvider
src_track TrackingStorageProvider // Tracks writes so we know which are dirty
dest StorageProvider
src_lock_fn func()
src_unlock_fn func()
Expand All @@ -25,21 +26,16 @@ type Migrator struct {
clean_blocks *util.Bitfield
block_order BlockOrder
ctime time.Time

// Our queue for dirty blocks (remigration)
blocks_to_move chan uint
blocks_by_n map[uint]bool

concurrency map[int]chan bool
wg sync.WaitGroup
concurrency map[int]chan bool
wg sync.WaitGroup
}

func NewMigrator(source TrackingStorageProvider,
dest StorageProvider,
block_size int,
lock_fn func(),
unlock_fn func(),
block_order BlockOrder) *Migrator {
block_order BlockOrder) (*Migrator, error) {

// TODO: Pass in configuration...
concurrency_by_block := map[int]int{
Expand All @@ -61,25 +57,22 @@ func NewMigrator(source TrackingStorageProvider,
block_order: block_order,
migrated_blocks: util.NewBitfield(num_blocks),
clean_blocks: util.NewBitfield(num_blocks),
blocks_to_move: make(chan uint, num_blocks),
blocks_by_n: make(map[uint]bool),
concurrency: make(map[int]chan bool),
}

if m.dest.Size() != m.src_track.Size() {
// TODO: Return as error
panic("Sizes must be equal for migration")
return nil, errors.New("source and destination sizes must be equal for migration.")
}

// Initialize concurrency channels
for b, v := range concurrency_by_block {
m.concurrency[b] = make(chan bool, v)
}
return m
return m, nil
}

/**
* Migrate storage to dest.
* Migrate all storage to dest.
*/
func (m *Migrator) Migrate() error {
m.ctime = time.Now()
Expand All @@ -101,16 +94,13 @@ func (m *Migrator) Migrate() error {
go func(block_no *BlockInfo) {
err := m.migrateBlock(block_no.Block)
if err != nil {
// TODO: Collect errors properly. Abort? Retry? fail?
// If there was an error, we'll simply add it back to the block_order to be retried later for now.
m.block_order.Add(block_no.Block)
// TODO: Allow other options for error handling
fmt.Printf("ERROR moving block %v\n", err)
}

m.showProgress()

fmt.Printf("Block moved %d\n", block_no)
fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no)
m.wg.Done()

cc, ok := m.concurrency[block_no.Type]
if !ok {
cc = m.concurrency[BlockTypeAny]
Expand Down Expand Up @@ -164,12 +154,7 @@ func (m *Migrator) MigrateDirty(blocks []uint) error {
fmt.Printf("ERROR moving block %v\n", err)
}

m.showProgress()

fmt.Printf("Dirty block moved %d\n", block_no)
fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no)
m.wg.Done()

cc, ok := m.concurrency[block_no.Type]
if !ok {
cc = m.concurrency[BlockTypeAny]
Expand All @@ -178,25 +163,20 @@ func (m *Migrator) MigrateDirty(blocks []uint) error {
}(i)

m.clean_blocks.ClearBit(int(pos))

m.showProgress()
}

return nil
}

func (m *Migrator) WaitForCompletion() error {
fmt.Printf("Waiting for pending transfers...\n")
m.wg.Wait()
m.showProgress()
return nil
}

/**
* Show progress...
*
*/
func (m *Migrator) showProgress() {
func (m *Migrator) ShowProgress() {
migrated := m.migrated_blocks.Count(0, uint(m.num_blocks))
perc_mig := float64(migrated*100) / float64(m.num_blocks)

Expand Down
7 changes: 6 additions & 1 deletion testing/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ func TestMigrator(t *testing.T) {
destWaiting := modules.NewWaitingCache(destStorage, blockSize)
destStorageMetrics := modules.NewMetrics(destWaiting)

mig := storage.NewMigrator(sourceDirty,
mig, err := storage.NewMigrator(sourceDirty,
destWaiting,
blockSize,
locker,
unlocker,
orderer)

assert.NoError(t, err)
lat_avg := pkg.NewReadings()

// Set something up to read dest...
Expand Down Expand Up @@ -172,19 +173,23 @@ func TestMigrator(t *testing.T) {

m_start := time.Now()
mig.Migrate()
mig.ShowProgress()

for {
blocks := mig.GetLatestDirty()
if blocks == nil {
break
}
fmt.Printf("Got %d dirty blocks to move...\n", len(blocks))
err := mig.MigrateDirty(blocks)
assert.NoError(t, err)
mig.ShowProgress()
}

err = mig.WaitForCompletion()
assert.NoError(t, err)
fmt.Printf("Migration took %d ms\n", time.Since(m_start).Milliseconds())
mig.ShowProgress()

// This will end with migration completed, and consumer Locked.
destStorageMetrics.ShowStats("dest")
Expand Down

0 comments on commit 12023ec

Please sign in to comment.