From 12023ece6adc65022215c8b74f11cbbdb328b1cc Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 13 Feb 2024 16:34:02 +0000 Subject: [PATCH] Started tidying up migrator --- cmd/connect.go | 2 +- cmd/serve.go | 11 ++++++++-- pkg/storage/migrator.go | 44 +++++++++++----------------------------- testing/migrator_test.go | 7 ++++++- 4 files changed, 28 insertions(+), 36 deletions(-) diff --git a/cmd/connect.go b/cmd/connect.go index df48b118..f9d01c8c 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -67,8 +67,8 @@ func runConnect(ccmd *cobra.Command, args []string) { pro := protocol.NewProtocolRW(context.TODO(), con, con) dest := modules.NewFromProtocol(777, destStorageMetrics, pro) + // Connect the waitingCache to the FromProtocol destWaiting.NeedAt = func(offset int64, length int32) { - fmt.Printf("Asking to prioritize range %d len %d\n", offset, length) dest.NeedAt(offset, length) } diff --git a/cmd/serve.go b/cmd/serve.go index fbea96dc..73a34a03 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -161,20 +161,24 @@ func runServe(ccmd *cobra.Command, args []string) { b_end := int((end-1)/uint64(block_size)) + 1 for b := b_start; b < b_end; b++ { // Ask the orderer to prioritize these blocks... - fmt.Printf("ORDER - Prioritizing block %d\n", b) orderer.PrioritiseBlock(b) } }) - mig := storage.NewMigrator(sourceDirty, + mig, err := storage.NewMigrator(sourceDirty, dest, block_size, locker, unlocker, orderer) + if err != nil { + panic(err) + } + // Now do the migration... err = mig.Migrate() + mig.ShowProgress() for { blocks := mig.GetLatestDirty() @@ -189,6 +193,8 @@ func runServe(ccmd *cobra.Command, args []string) { if err != nil { panic(err) } + fmt.Printf("DIRTY BLOCKS %d\n", len(blocks)) + mig.ShowProgress() } err = mig.WaitForCompletion() @@ -197,6 +203,7 @@ func runServe(ccmd *cobra.Command, args []string) { } fmt.Printf("MIGRATION DONE %v\n", err) + mig.ShowProgress() c.Close() } diff --git a/pkg/storage/migrator.go b/pkg/storage/migrator.go index 7d5e0b3d..c5aebbeb 100644 --- a/pkg/storage/migrator.go +++ b/pkg/storage/migrator.go @@ -1,6 +1,7 @@ package storage import ( + "errors" "fmt" "sync" "sync/atomic" @@ -13,7 +14,7 @@ type MigratorConfig struct { } type Migrator struct { - src_track TrackingStorageProvider + src_track TrackingStorageProvider // Tracks writes so we know which are dirty dest StorageProvider src_lock_fn func() src_unlock_fn func() @@ -25,13 +26,8 @@ type Migrator struct { clean_blocks *util.Bitfield block_order BlockOrder ctime time.Time - - // Our queue for dirty blocks (remigration) - blocks_to_move chan uint - blocks_by_n map[uint]bool - - concurrency map[int]chan bool - wg sync.WaitGroup + concurrency map[int]chan bool + wg sync.WaitGroup } func NewMigrator(source TrackingStorageProvider, @@ -39,7 +35,7 @@ func NewMigrator(source TrackingStorageProvider, block_size int, lock_fn func(), unlock_fn func(), - block_order BlockOrder) *Migrator { + block_order BlockOrder) (*Migrator, error) { // TODO: Pass in configuration... concurrency_by_block := map[int]int{ @@ -61,25 +57,22 @@ func NewMigrator(source TrackingStorageProvider, block_order: block_order, migrated_blocks: util.NewBitfield(num_blocks), clean_blocks: util.NewBitfield(num_blocks), - blocks_to_move: make(chan uint, num_blocks), - blocks_by_n: make(map[uint]bool), concurrency: make(map[int]chan bool), } if m.dest.Size() != m.src_track.Size() { - // TODO: Return as error - panic("Sizes must be equal for migration") + return nil, errors.New("source and destination sizes must be equal for migration.") } // Initialize concurrency channels for b, v := range concurrency_by_block { m.concurrency[b] = make(chan bool, v) } - return m + return m, nil } /** - * Migrate storage to dest. + * Migrate all storage to dest. */ func (m *Migrator) Migrate() error { m.ctime = time.Now() @@ -101,16 +94,13 @@ func (m *Migrator) Migrate() error { go func(block_no *BlockInfo) { err := m.migrateBlock(block_no.Block) if err != nil { - // TODO: Collect errors properly. Abort? Retry? fail? + // If there was an error, we'll simply add it back to the block_order to be retried later for now. + m.block_order.Add(block_no.Block) + // TODO: Allow other options for error handling fmt.Printf("ERROR moving block %v\n", err) } - m.showProgress() - - fmt.Printf("Block moved %d\n", block_no) - fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no) m.wg.Done() - cc, ok := m.concurrency[block_no.Type] if !ok { cc = m.concurrency[BlockTypeAny] @@ -164,12 +154,7 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { fmt.Printf("ERROR moving block %v\n", err) } - m.showProgress() - - fmt.Printf("Dirty block moved %d\n", block_no) - fmt.Printf("DATA %d,%d,,,,\n", time.Now().UnixMilli(), block_no) m.wg.Done() - cc, ok := m.concurrency[block_no.Type] if !ok { cc = m.concurrency[BlockTypeAny] @@ -178,17 +163,12 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { }(i) m.clean_blocks.ClearBit(int(pos)) - - m.showProgress() } - return nil } func (m *Migrator) WaitForCompletion() error { - fmt.Printf("Waiting for pending transfers...\n") m.wg.Wait() - m.showProgress() return nil } @@ -196,7 +176,7 @@ func (m *Migrator) WaitForCompletion() error { * Show progress... * */ -func (m *Migrator) showProgress() { +func (m *Migrator) ShowProgress() { migrated := m.migrated_blocks.Count(0, uint(m.num_blocks)) perc_mig := float64(migrated*100) / float64(m.num_blocks) diff --git a/testing/migrator_test.go b/testing/migrator_test.go index 06e0dc95..74157742 100644 --- a/testing/migrator_test.go +++ b/testing/migrator_test.go @@ -118,13 +118,14 @@ func TestMigrator(t *testing.T) { destWaiting := modules.NewWaitingCache(destStorage, blockSize) destStorageMetrics := modules.NewMetrics(destWaiting) - mig := storage.NewMigrator(sourceDirty, + mig, err := storage.NewMigrator(sourceDirty, destWaiting, blockSize, locker, unlocker, orderer) + assert.NoError(t, err) lat_avg := pkg.NewReadings() // Set something up to read dest... @@ -172,19 +173,23 @@ func TestMigrator(t *testing.T) { m_start := time.Now() mig.Migrate() + mig.ShowProgress() for { blocks := mig.GetLatestDirty() if blocks == nil { break } + fmt.Printf("Got %d dirty blocks to move...\n", len(blocks)) err := mig.MigrateDirty(blocks) assert.NoError(t, err) + mig.ShowProgress() } err = mig.WaitForCompletion() assert.NoError(t, err) fmt.Printf("Migration took %d ms\n", time.Since(m_start).Milliseconds()) + mig.ShowProgress() // This will end with migration completed, and consumer Locked. destStorageMetrics.ShowStats("dest")