From 53a05e9386d5a8c9f8f0a14fc68305b9bcf14ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 15 Oct 2024 17:32:21 +0200 Subject: [PATCH 1/8] feat(restore): make batches retryable Now, if batch restoration failed on one node, it can still be retried by other nodes. Failed node is no longer used for the restore. Fixes #4065 --- pkg/service/restore/batch.go | 145 ++++++++++++++++++++++----- pkg/service/restore/batch_test.go | 3 +- pkg/service/restore/model.go | 4 + pkg/service/restore/tables_worker.go | 6 +- 4 files changed, 132 insertions(+), 26 deletions(-) diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index 3af2469d4..98dc953ea 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -11,7 +11,10 @@ import ( ) type batchDispatcher struct { - mu sync.Mutex + mu sync.Mutex + wait chan struct{} + + remainingBytes int64 workload []LocationWorkload batchSize int expectedShardWorkload int64 @@ -34,6 +37,8 @@ func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt } return &batchDispatcher{ mu: sync.Mutex{}, + wait: make(chan struct{}), + remainingBytes: size, workload: workload, batchSize: batchSize, expectedShardWorkload: size / int64(shards), @@ -90,8 +95,11 @@ func (b batch) IDs() []string { } // ValidateAllDispatched returns error if not all sstables were dispatched. -func (b *batchDispatcher) ValidateAllDispatched() error { - for _, lw := range b.workload { +func (bd *batchDispatcher) ValidateAllDispatched() error { + bd.mu.Lock() + defer bd.mu.Unlock() + + for _, lw := range bd.workload { if lw.Size != 0 { for _, tw := range lw.Tables { if tw.Size != 0 { @@ -109,44 +117,71 @@ func (b *batchDispatcher) ValidateAllDispatched() error { lw.Location, lw.Size) } } + if !bd.done() { + return errors.Errorf("expected all data to be restored, internal progress calculation error") + } return nil } -// DispatchBatch batch to be restored or false when there is no more work to do. -func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) { - b.mu.Lock() - defer b.mu.Unlock() +// DispatchBatch returns batch restored or false when there is no more work to do. +// This method might hang and wait for sstables that might come from batches that +// failed to be restored. Because of that, it's important to call ReportSuccess +// or ReportFailure after each dispatched batch was attempted to be restored. +func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { + for { + bd.mu.Lock() + + if bd.done() { + bd.mu.Unlock() + return batch{}, false + } + b, ok := bd.dispatchBatch(host) + wait := bd.wait - l := b.chooseLocation(host) + bd.mu.Unlock() + + if ok { + return b, true + } + <-wait + } +} + +func (bd *batchDispatcher) done() bool { + return bd.remainingBytes == 0 +} + +func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { + l := bd.chooseLocation(host) if l == nil { return batch{}, false } - t := b.chooseTable(l) + t := bd.chooseTable(l) if t == nil { return batch{}, false } - dir := b.chooseRemoteDir(t) + dir := bd.chooseRemoteDir(t) if dir == nil { return batch{}, false } - return b.createBatch(l, t, dir, host) + return bd.createBatch(l, t, dir, host) } // Returns location for which batch should be created. -func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload { - for i := range b.workload { - if b.workload[i].Size == 0 { +func (bd *batchDispatcher) chooseLocation(host string) *LocationWorkload { + for i := range bd.workload { + if bd.workload[i].Size == 0 { continue } - if slices.Contains(b.locationHosts[b.workload[i].Location], host) { - return &b.workload[i] + if slices.Contains(bd.locationHosts[bd.workload[i].Location], host) { + return &bd.workload[i] } } return nil } // Returns table for which batch should be created. -func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { +func (bd *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { for i := range location.Tables { if location.Tables[i].Size == 0 { continue @@ -157,7 +192,7 @@ func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload } // Return remote dir for which batch should be created. -func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { +func (bd *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { for i := range table.RemoteDirs { if table.RemoteDirs[i].Size == 0 { continue @@ -168,18 +203,18 @@ func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorklo } // Returns batch and updates RemoteDirWorkload and its parents. -func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) { - shardCnt := b.hostShardCnt[host] +func (bd *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) { + shardCnt := bd.hostShardCnt[host] if shardCnt == 0 { shardCnt = 1 } var i int var size int64 - if b.batchSize == maxBatchSize { + if bd.batchSize == maxBatchSize { // Create batch containing multiple of node shard count sstables // and size up to 5% of expected node workload. - expectedNodeWorkload := b.expectedShardWorkload * int64(shardCnt) + expectedNodeWorkload := bd.expectedShardWorkload * int64(shardCnt) sizeLimit := expectedNodeWorkload / 20 for { for j := 0; j < int(shardCnt); j++ { @@ -198,7 +233,7 @@ func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir } } else { // Create batch containing node_shard_count*batch_size sstables. - i = min(b.batchSize*int(shardCnt), len(dir.SSTables)) + i = min(bd.batchSize*int(shardCnt), len(dir.SSTables)) for j := 0; j < i; j++ { size += dir.SSTables[j].Size } @@ -230,6 +265,70 @@ func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir }, true } +// ReportSuccess notifies batchDispatcher that given batch was restored successfully. +func (bd *batchDispatcher) ReportSuccess(b batch) { + bd.mu.Lock() + defer bd.mu.Unlock() + + bd.remainingBytes -= b.Size + if bd.done() { + bd.wakeUpWaiting() + } +} + +// ReportFailure notifies batchDispatcher that given batch failed to be restored. +func (bd *batchDispatcher) ReportFailure(b batch) error { + bd.mu.Lock() + defer bd.mu.Unlock() + + var ( + lw *LocationWorkload + tw *TableWorkload + rw *RemoteDirWorkload + ) + for i := range bd.workload { + if bd.workload[i].Location == b.Location { + lw = &bd.workload[i] + } + } + if lw == nil { + return errors.Errorf("unknown location %s", b.Location) + } + for i := range lw.Tables { + if lw.Tables[i].TableName == b.TableName { + tw = &lw.Tables[i] + } + } + if tw == nil { + return errors.Errorf("unknown table %s", b.TableName) + } + for i := range tw.RemoteDirs { + if tw.RemoteDirs[i].RemoteSSTableDir == b.RemoteSSTableDir { + rw = &tw.RemoteDirs[i] + } + } + if rw == nil { + return errors.Errorf("unknown remote sstable dir %s", b.RemoteSSTableDir) + } + + var newSST []RemoteSSTable + newSST = append(newSST, b.SSTables...) + newSST = append(newSST, rw.SSTables...) + + rw.SSTables = newSST + rw.Size += b.Size + tw.Size += b.Size + lw.Size += b.Size + + bd.wakeUpWaiting() + return nil +} + +func (bd *batchDispatcher) wakeUpWaiting() { + close(bd.wait) + bd.wait = make(chan struct{}) +} + func sortWorkloadBySizeDesc(workload []LocationWorkload) { slices.SortFunc(workload, func(a, b LocationWorkload) int { return int(b.Size - a.Size) diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index cf87a8a51..f8040b0d8 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -118,7 +118,7 @@ func TestBatchDispatcher(t *testing.T) { } for _, step := range scenario { - b, ok := bd.DispatchBatch(step.host) + b, ok := bd.dispatchBatch(step.host) if ok != step.ok { t.Fatalf("Step: %+v, expected ok=%v, got ok=%v", step, step.ok, ok) } @@ -134,6 +134,7 @@ func TestBatchDispatcher(t *testing.T) { if len(b.SSTables) != step.count { t.Fatalf("Step: %+v, expected count=%v, got count=%v", step, step.count, len(b.SSTables)) } + bd.ReportSuccess(b) } if err := bd.ValidateAllDispatched(); err != nil { diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index da616b515..b8d4bf19d 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -296,6 +296,10 @@ type TableName struct { Table string } +func (t TableName) String() string { + return t.Keyspace + "." + t.Table +} + // HostInfo represents host with rclone download config. type HostInfo struct { Host string diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index e35409f70..8001d3293 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -16,6 +16,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" + "go.uber.org/multierr" ) type tablesWorker struct { @@ -227,11 +228,12 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { pr, err := w.newRunProgress(ctx, hi, b) if err != nil { - return errors.Wrap(err, "create new run progress") + return multierr.Append(errors.Wrap(err, "create new run progress"), bd.ReportFailure(b)) } if err := w.restoreBatch(ctx, b, pr); err != nil { - return errors.Wrap(err, "restore batch") + return multierr.Append(errors.Wrap(err, "restore batch"), bd.ReportFailure(b)) } + bd.ReportSuccess(b) w.decreaseRemainingBytesMetric(b) } } From 1e0a40d2383f1322b54eaa0a7b1995c14228f560 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 15 Oct 2024 18:14:53 +0200 Subject: [PATCH 2/8] feat(restore_test): test batch retry This commit adds TestRestoreTablesBatchRetryIntegration, which injects errors during download and LAS step and validates that restore finished successfully despite them (thanks to batch retries). --- .../restore/restore_integration_test.go | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 318377cd5..43bd33da6 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -16,6 +16,8 @@ import ( "time" "github.com/pkg/errors" + "github.com/scylladb/go-log" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/backup" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" . "github.com/scylladb/scylla-manager/v3/pkg/testutils" @@ -25,6 +27,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/util/maputil" "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" + "go.uber.org/zap/zapcore" ) func TestRestoreTablesUserIntegration(t *testing.T) { @@ -691,3 +694,88 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}}) } + +func TestRestoreTablesBatchRetryIntegration(t *testing.T) { + h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) + // Ensure no built-in retries + clientCfg := scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()) + clientCfg.Backoff.MaxRetries = 0 + h.dstCluster.Client = newTestClient(t, h.dstCluster.Hrt, log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("client"), &clientCfg) + + Print("Keyspace setup") + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}" + ks := randomizedName("batch_retry_1_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1)) + + Print("Table setup") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + tab1 := randomizedName("tab_1_") + tab2 := randomizedName("tab_2_") + tab3 := randomizedName("tab_3_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab1)) + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab2)) + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab3)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab1)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab2)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab3)) + + Print("Fill setup") + fillTable(t, h.srcCluster.rootSession, 100, ks, tab1, tab2, tab3) + + Print("Run backup") + loc := []Location{testLocation("batch-retry", "")} + S3InitBucket(t, loc[0].Path) + ksFilter := []string{ks} + tag := h.runBackup(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "batch_size": 100, + }) + + Print("Inject errors") + downloadCnt := atomic.Int64{} + lasCnt := atomic.Int64{} + h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + // For this setup, we have 6 remote sstable dirs and 6 workers. + // We inject 2 errors during download and 3 errors during LAS. + // This means that only a single node will be restoring at the end. + // Huge batch size and 3 LAS errors guarantee total 9 calls to LAS. + // The last failed call to LAS (cnt=8) waits a bit so that we test + // that batch dispatcher correctly reuses and releases nodes waiting + // for failed sstables to come back to the batch dispatcher. + if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { + if cnt := downloadCnt.Add(1); cnt == 1 || cnt == 3 { + t.Log("Fake download error ", cnt) + return nil, errors.New("fake download error") + } + } + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + cnt := lasCnt.Add(1) + if cnt == 8 { + time.Sleep(15 * time.Second) + } + if cnt == 1 || cnt == 5 || cnt == 8 { + t.Log("Fake LAS error ", cnt) + return nil, errors.New("fake las error") + } + } + return nil, nil + })) + + Print("Run restore tables") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + h.runRestore(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "snapshot_tag": tag, + "restore_tables": true, + }) + + if cnt := lasCnt.Add(0); cnt < 9 { + t.Fatalf("Expected at least 9 calls to LAS, got %d", cnt) + } + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab1, "id", "data") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab2, "id", "data") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab3, "id", "data") +} From 3a38f28a6be4ddfb2b98c5662c20e1cd0c95c71c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 16 Oct 2024 12:34:47 +0200 Subject: [PATCH 3/8] refactor(restore): flatten workload structure After giving it some more thought, I decided to flatten workload structure. Instead of having location/table/dir layers, now everything operates on the dir layer. It makes the implementation easier, especially for the upcoming changes related to node retries. --- pkg/service/restore/batch.go | 187 ++++++++++-------------------- pkg/service/restore/batch_test.go | 152 ++++++++++++------------ pkg/service/restore/index.go | 177 +++++++++++----------------- 3 files changed, 205 insertions(+), 311 deletions(-) diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index 98dc953ea..d69aab41c 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -15,19 +15,15 @@ type batchDispatcher struct { wait chan struct{} remainingBytes int64 - workload []LocationWorkload + workload Workload batchSize int expectedShardWorkload int64 hostShardCnt map[string]uint locationHosts map[Location][]string } -func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { - sortWorkloadBySizeDesc(workload) - var size int64 - for _, t := range workload { - size += t.Size - } +func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { + sortWorkload(workload) var shards uint for _, sh := range hostShardCnt { shards += sh @@ -38,10 +34,10 @@ func newBatchDispatcher(workload []LocationWorkload, batchSize int, hostShardCnt return &batchDispatcher{ mu: sync.Mutex{}, wait: make(chan struct{}), - remainingBytes: size, + remainingBytes: workload.TotalSize, workload: workload, batchSize: batchSize, - expectedShardWorkload: size / int64(shards), + expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, locationHosts: locationHosts, } @@ -99,22 +95,10 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { bd.mu.Lock() defer bd.mu.Unlock() - for _, lw := range bd.workload { - if lw.Size != 0 { - for _, tw := range lw.Tables { - if tw.Size != 0 { - for _, dw := range tw.RemoteDirs { - if dw.Size != 0 || len(dw.SSTables) != 0 { - return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)", - dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size) - } - } - return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)", - tw.Location, tw.Keyspace, tw.Table, tw.Size) - } - } - return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)", - lw.Location, lw.Size) + for _, rdw := range bd.workload.RemoteDir { + if rdw.Size != 0 || len(rdw.SSTables) != 0 { + return errors.Errorf("expected all data to be restored, missing sstables from location %s table %s.%s: %v (%d bytes)", + rdw.Location, rdw.Keyspace, rdw.Table, rdw.SSTables, rdw.Size) } } if !bd.done() { @@ -152,58 +136,27 @@ func (bd *batchDispatcher) done() bool { } func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { - l := bd.chooseLocation(host) - if l == nil { - return batch{}, false - } - t := bd.chooseTable(l) - if t == nil { - return batch{}, false - } - dir := bd.chooseRemoteDir(t) - if dir == nil { - return batch{}, false - } - return bd.createBatch(l, t, dir, host) -} - -// Returns location for which batch should be created. -func (bd *batchDispatcher) chooseLocation(host string) *LocationWorkload { - for i := range bd.workload { - if bd.workload[i].Size == 0 { + var rdw *RemoteDirWorkload + for i, w := range bd.workload.RemoteDir { + // Skip empty dir + if w.Size == 0 { continue } - if slices.Contains(bd.locationHosts[bd.workload[i].Location], host) { - return &bd.workload[i] - } - } - return nil -} - -// Returns table for which batch should be created. -func (bd *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { - for i := range location.Tables { - if location.Tables[i].Size == 0 { + // Sip dir from location without access + if !slices.Contains(bd.locationHosts[w.Location], host) { continue } - return &location.Tables[i] + rdw = &bd.workload.RemoteDir[i] + break } - return nil -} - -// Return remote dir for which batch should be created. -func (bd *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { - for i := range table.RemoteDirs { - if table.RemoteDirs[i].Size == 0 { - continue - } - return &table.RemoteDirs[i] + if rdw == nil { + return batch{}, false } - return nil + return bd.createBatch(rdw, host) } // Returns batch and updates RemoteDirWorkload and its parents. -func (bd *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload, host string) (batch, bool) { +func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (batch, bool) { shardCnt := bd.hostShardCnt[host] if shardCnt == 0 { shardCnt = 1 @@ -218,13 +171,13 @@ func (bd *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, di sizeLimit := expectedNodeWorkload / 20 for { for j := 0; j < int(shardCnt); j++ { - if i >= len(dir.SSTables) { + if i >= len(rdw.SSTables) { break } - size += dir.SSTables[i].Size + size += rdw.SSTables[i].Size i++ } - if i >= len(dir.SSTables) { + if i >= len(rdw.SSTables) { break } if size > sizeLimit { @@ -233,9 +186,9 @@ func (bd *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, di } } else { // Create batch containing node_shard_count*batch_size sstables. - i = min(bd.batchSize*int(shardCnt), len(dir.SSTables)) + i = min(bd.batchSize*int(shardCnt), len(rdw.SSTables)) for j := 0; j < i; j++ { - size += dir.SSTables[j].Size + size += rdw.SSTables[j].Size } } @@ -244,22 +197,23 @@ func (bd *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, di } // Extend batch if it was to leave less than // 1 sstable per shard for the next one. - if len(dir.SSTables)-i < int(shardCnt) { - for ; i < len(dir.SSTables); i++ { - size += dir.SSTables[i].Size + if len(rdw.SSTables)-i < int(shardCnt) { + for ; i < len(rdw.SSTables); i++ { + size += rdw.SSTables[i].Size } } - sstables := dir.SSTables[:i] - dir.SSTables = dir.SSTables[i:] + sstables := rdw.SSTables[:i] + rdw.SSTables = rdw.SSTables[i:] - dir.Size -= size - t.Size -= size - l.Size -= size + rdw.Size -= size + bd.workload.TableSize[rdw.TableName] -= size + bd.workload.LocationSize[rdw.Location] -= size + bd.workload.TotalSize -= size return batch{ - TableName: dir.TableName, - ManifestInfo: dir.ManifestInfo, - RemoteSSTableDir: dir.RemoteSSTableDir, + TableName: rdw.TableName, + ManifestInfo: rdw.ManifestInfo, + RemoteSSTableDir: rdw.RemoteSSTableDir, Size: size, SSTables: sstables, }, true @@ -281,44 +235,24 @@ func (bd *batchDispatcher) ReportFailure(b batch) error { bd.mu.Lock() defer bd.mu.Unlock() - var ( - lw *LocationWorkload - tw *TableWorkload - rw *RemoteDirWorkload - ) - for i := range bd.workload { - if bd.workload[i].Location == b.Location { - lw = &bd.workload[i] - } - } - if lw == nil { - return errors.Errorf("unknown location %s", b.Location) - } - for i := range lw.Tables { - if lw.Tables[i].TableName == b.TableName { - tw = &lw.Tables[i] - } - } - if tw == nil { - return errors.Errorf("unknown table %s", b.TableName) - } - for i := range tw.RemoteDirs { - if tw.RemoteDirs[i].RemoteSSTableDir == b.RemoteSSTableDir { - rw = &tw.RemoteDirs[i] + var rdw *RemoteDirWorkload + for i := range bd.workload.RemoteDir { + if bd.workload.RemoteDir[i].RemoteSSTableDir == b.RemoteSSTableDir { + rdw = &bd.workload.RemoteDir[i] } } - if rw == nil { + if rdw == nil { return errors.Errorf("unknown remote sstable dir %s", b.RemoteSSTableDir) } var newSST []RemoteSSTable newSST = append(newSST, b.SSTables...) - newSST = append(newSST, rw.SSTables...) + newSST = append(newSST, rdw.SSTables...) - rw.SSTables = newSST - rw.Size += b.Size - tw.Size += b.Size - lw.Size += b.Size + rdw.SSTables = newSST + rdw.Size += b.Size + bd.workload.TableSize[b.TableName] += b.Size + bd.workload.LocationSize[b.Location] += b.Size bd.wakeUpWaiting() return nil @@ -329,23 +263,20 @@ func (bd *batchDispatcher) wakeUpWaiting() { bd.wait = make(chan struct{}) } -func sortWorkloadBySizeDesc(workload []LocationWorkload) { - slices.SortFunc(workload, func(a, b LocationWorkload) int { +func sortWorkload(workload Workload) { + // Order remote sstable dirs by table size, then by their size (decreasing). + slices.SortFunc(workload.RemoteDir, func(a, b RemoteDirWorkload) int { + ats := workload.TableSize[a.TableName] + bts := workload.TableSize[b.TableName] + if ats != bts { + return int(bts - ats) + } return int(b.Size - a.Size) }) - for _, loc := range workload { - slices.SortFunc(loc.Tables, func(a, b TableWorkload) int { + // Order sstables by their size (decreasing) + for _, rdw := range workload.RemoteDir { + slices.SortFunc(rdw.SSTables, func(a, b RemoteSSTable) int { return int(b.Size - a.Size) }) - for _, tab := range loc.Tables { - slices.SortFunc(tab.RemoteDirs, func(a, b RemoteDirWorkload) int { - return int(b.Size - a.Size) - }) - for _, dir := range tab.RemoteDirs { - slices.SortFunc(dir.SSTables, func(a, b RemoteSSTable) int { - return int(b.Size - a.Size) - }) - } - } } } diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index f8040b0d8..6fc4c118b 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -17,76 +17,88 @@ func TestBatchDispatcher(t *testing.T) { Provider: "s3", Path: "l2", } - workload := []LocationWorkload{ + + rawWorkload := []RemoteDirWorkload{ + { + ManifestInfo: &backupspec.ManifestInfo{ + Location: l1, + }, + TableName: TableName{ + Keyspace: "ks1", + Table: "t1", + }, + RemoteSSTableDir: "a", + Size: 20, + SSTables: []RemoteSSTable{ + {Size: 5}, + {Size: 15}, + }, + }, + { + ManifestInfo: &backupspec.ManifestInfo{ + Location: l1, + }, + TableName: TableName{ + Keyspace: "ks1", + Table: "t1", + }, + RemoteSSTableDir: "e", + Size: 10, + SSTables: []RemoteSSTable{ + {Size: 2}, + {Size: 4}, + {Size: 4}, + }, + }, + { + ManifestInfo: &backupspec.ManifestInfo{ + Location: l1, + }, + TableName: TableName{ + Keyspace: "ks1", + Table: "t1", + }, + RemoteSSTableDir: "b", + Size: 30, + SSTables: []RemoteSSTable{ + {Size: 10}, + {Size: 20}, + }, + }, { - Location: l1, - Size: 170, - Tables: []TableWorkload{ - { - Size: 60, - RemoteDirs: []RemoteDirWorkload{ - { - RemoteSSTableDir: "a", - Size: 20, - SSTables: []RemoteSSTable{ - {Size: 5}, - {Size: 15}, - }, - }, - { - RemoteSSTableDir: "e", - Size: 10, - SSTables: []RemoteSSTable{ - {Size: 2}, - {Size: 4}, - {Size: 4}, - }, - }, - { - RemoteSSTableDir: "b", - Size: 30, - SSTables: []RemoteSSTable{ - {Size: 10}, - {Size: 20}, - }, - }, - }, - }, - { - Size: 110, - RemoteDirs: []RemoteDirWorkload{ - { - RemoteSSTableDir: "c", - Size: 110, - SSTables: []RemoteSSTable{ - {Size: 50}, - {Size: 60}, - }, - }, - }, - }, + ManifestInfo: &backupspec.ManifestInfo{ + Location: l1, + }, + TableName: TableName{ + Keyspace: "ks1", + Table: "t2", + }, + RemoteSSTableDir: "c", + Size: 110, + SSTables: []RemoteSSTable{ + {Size: 50}, + {Size: 60}, }, }, { - Location: l2, - Size: 200, - Tables: []TableWorkload{ - { - Size: 200, - RemoteDirs: []RemoteDirWorkload{ - { - RemoteSSTableDir: "d", - Size: 200, - SSTables: []RemoteSSTable{ - {Size: 110}, - {Size: 90}, - }, - }, - }, - }, + ManifestInfo: &backupspec.ManifestInfo{ + Location: l2, + }, + TableName: TableName{ + Keyspace: "ks1", + Table: "t2", + }, + RemoteSSTableDir: "d", + Size: 200, + SSTables: []RemoteSSTable{ + {Size: 110}, + {Size: 90}, }, }, } + + workload := aggregateWorkload(rawWorkload) + locationHosts := map[backupspec.Location][]string{ l1: {"h1", "h2"}, l2: {"h3"}, @@ -120,19 +132,13 @@ func TestBatchDispatcher(t *testing.T) { for _, step := range scenario { b, ok := bd.dispatchBatch(step.host) if ok != step.ok { - t.Fatalf("Step: %+v, expected ok=%v, got ok=%v", step, step.ok, ok) + t.Errorf("Expected %v, got %#v", step, b) } if ok == false { - continue - } - if b.RemoteSSTableDir != step.dir { - t.Fatalf("Step: %+v, expected dir=%v, got dir=%v", step, step.dir, b.RemoteSSTableDir) - } - if b.Size != step.size { - t.Fatalf("Step: %+v, expected size=%v, got size=%v", step, step.size, b.Size) + return } - if len(b.SSTables) != step.count { - t.Fatalf("Step: %+v, expected count=%v, got count=%v", step, step.count, len(b.SSTables)) + if b.RemoteSSTableDir != step.dir || b.Size != step.size || len(b.SSTables) != step.count { + t.Errorf("Expected %v, got %#v", step, b) } bd.ReportSuccess(b) } diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index dd7b7b72c..32f91dee9 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -12,23 +12,12 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/sstable" ) -// LocationWorkload represents aggregated restore workload -// in given backup location. -type LocationWorkload struct { - Location - - Size int64 - Tables []TableWorkload -} - -// TableWorkload represents restore workload -// from many manifests for given table in given backup location. -type TableWorkload struct { - Location - TableName - - Size int64 - RemoteDirs []RemoteDirWorkload +// Workload represents total restore workload. +type Workload struct { + TotalSize int64 + LocationSize map[Location]int64 + TableSize map[TableName]int64 + RemoteDir []RemoteDirWorkload } // RemoteDirWorkload represents restore workload @@ -56,32 +45,32 @@ type SSTable struct { } // IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. -func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) { - var workload []LocationWorkload +func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) (Workload, error) { + var rawWorkload []RemoteDirWorkload for _, l := range locations { lw, err := w.indexLocationWorkload(ctx, l) if err != nil { - return nil, errors.Wrapf(err, "index workload in %s", l) + return Workload{}, errors.Wrapf(err, "index workload in %s", l) } - workload = append(workload, lw) + rawWorkload = append(rawWorkload, lw...) } + workload := aggregateWorkload(rawWorkload) + w.logWorkloadInfo(ctx, workload) return workload, nil } -func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) { +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) if err != nil { - return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") + return nil, errors.Wrap(err, "create remote dir workloads") } if w.target.Continue { rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload) if err != nil { - return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") + return nil, errors.Wrap(err, "filter already restored sstables") } } - workload := aggregateLocationWorkload(rawWorkload) - w.logWorkloadInfo(ctx, workload) - return workload, nil + return rawWorkload, nil } func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { @@ -179,26 +168,22 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, raw return filtered, nil } -func (w *tablesWorker) initMetrics(workload []LocationWorkload) { +func (w *tablesWorker) initMetrics(workload Workload) { // For now, the only persistent across task runs metrics are progress and remaining_bytes. // The rest: state, view_build_status, batch_size are calculated from scratch. w.metrics.ResetClusterMetrics(w.run.ClusterID) // Init remaining bytes - for _, wl := range workload { - for _, twl := range wl.Tables { - for _, rdwl := range twl.RemoteDirs { - w.metrics.SetRemainingBytes(metrics.RestoreBytesLabels{ - ClusterID: rdwl.ClusterID.String(), - SnapshotTag: rdwl.SnapshotTag, - Location: rdwl.Location.String(), - DC: rdwl.DC, - Node: rdwl.NodeID, - Keyspace: rdwl.Keyspace, - Table: rdwl.Table, - }, rdwl.Size) - } - } + for _, rdw := range workload.RemoteDir { + w.metrics.SetRemainingBytes(metrics.RestoreBytesLabels{ + ClusterID: rdw.ClusterID.String(), + SnapshotTag: rdw.SnapshotTag, + Location: rdw.Location.String(), + DC: rdw.DC, + Node: rdw.NodeID, + Keyspace: rdw.Keyspace, + Table: rdw.Table, + }, rdw.Size) } // Init progress @@ -206,87 +191,59 @@ func (w *tablesWorker) initMetrics(workload []LocationWorkload) { for _, u := range w.run.Units { totalSize += u.Size } - var workloadSize int64 - for _, wl := range workload { - workloadSize += wl.Size - } w.metrics.SetProgress(metrics.RestoreProgressLabels{ ClusterID: w.run.ClusterID.String(), SnapshotTag: w.run.SnapshotTag, - }, float64(totalSize-workloadSize)/float64(totalSize)*100) + }, float64(totalSize-workload.TotalSize)/float64(totalSize)*100) } -func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) { - if workload.Size == 0 { - return +func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload Workload) { + for loc, size := range workload.LocationSize { + w.logger.Info(ctx, "Location workload", + "location", loc, + "size", size) + } + for tab, size := range workload.TableSize { + w.logger.Info(ctx, "Table workload", + "table", tab, + "size", size) } - var locMax, locCnt int64 - for _, twl := range workload.Tables { - if twl.Size == 0 { + for _, rdw := range workload.RemoteDir { + cnt := int64(len(rdw.SSTables)) + if cnt == 0 { + w.logger.Info(ctx, "Empty remote dir workload", "path", rdw.RemoteSSTableDir) continue } - var tabMax, tabCnt int64 - for _, rdwl := range twl.RemoteDirs { - if rdwl.Size == 0 { - continue - } - var dirMax int64 - for _, sst := range rdwl.SSTables { - dirMax = max(dirMax, sst.Size) - } - dirCnt := int64(len(rdwl.SSTables)) - w.logger.Info(ctx, "Remote sstable dir workload info", - "path", rdwl.RemoteSSTableDir, - "max size", dirMax, - "average size", rdwl.Size/dirCnt, - "count", dirCnt) - tabCnt += dirCnt - tabMax = max(tabMax, dirMax) - } - w.logger.Info(ctx, "Table workload info", - "keyspace", twl.Keyspace, - "table", twl.Table, - "max size", tabMax, - "average size", twl.Size/tabCnt, - "count", tabCnt) - locCnt += tabCnt - locMax = max(locMax, tabMax) - } - w.logger.Info(ctx, "Location workload info", - "location", workload.Location.String(), - "max size", locMax, - "average size", workload.Size/locCnt, - "count", locCnt) -} -func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { - remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) - for _, rw := range rawWorkload { - remoteDirWorkloads[rw.TableName] = append(remoteDirWorkloads[rw.TableName], rw) - } - - var tableWorkloads []TableWorkload - for _, tw := range remoteDirWorkloads { - var size int64 - for _, rdw := range tw { - size += rdw.Size + var maxSST int64 + for _, sst := range rdw.SSTables { + maxSST = max(maxSST, sst.Size) } - tableWorkloads = append(tableWorkloads, TableWorkload{ - Location: tw[0].Location, - TableName: tw[0].TableName, - Size: size, - RemoteDirs: tw, - }) + w.logger.Info(ctx, "Remote sstable dir workload info", + "path", rdw.RemoteSSTableDir, + "total size", rdw.Size, + "max size", maxSST, + "average size", rdw.Size/cnt, + "count", cnt) } +} - var size int64 - for _, tw := range tableWorkloads { - size += tw.Size +func aggregateWorkload(rawWorkload []RemoteDirWorkload) Workload { + var ( + totalSize int64 + locationSize = make(map[Location]int64) + tableSize = make(map[TableName]int64) + ) + for _, rdw := range rawWorkload { + totalSize += rdw.Size + locationSize[rdw.Location] += rdw.Size + tableSize[rdw.TableName] += rdw.Size } - return LocationWorkload{ - Location: tableWorkloads[0].Location, - Size: size, - Tables: tableWorkloads, + return Workload{ + TotalSize: totalSize, + LocationSize: locationSize, + TableSize: tableSize, + RemoteDir: rawWorkload, } } From 88f0547066dcad58581466272bf505be5db945c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 16 Oct 2024 12:35:26 +0200 Subject: [PATCH 4/8] feat(restore): add host retries Now, even if host failed to restore given batch it can still try to restore batches originating from different dcs. This improves retries in general, but also should help with #3871. --- pkg/service/restore/batch.go | 11 ++++++++++- pkg/service/restore/tables_worker.go | 20 +++++++++++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index d69aab41c..e5c57c0d7 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -20,6 +20,7 @@ type batchDispatcher struct { expectedShardWorkload int64 hostShardCnt map[string]uint locationHosts map[Location][]string + hostToFailedDC map[string][]string } func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { @@ -40,6 +41,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, locationHosts: locationHosts, + hostToFailedDC: make(map[string][]string), } } @@ -142,6 +144,10 @@ func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { if w.Size == 0 { continue } + // Skip dir from already failed dc + if slices.Contains(bd.hostToFailedDC[host], w.DC) { + continue + } // Sip dir from location without access if !slices.Contains(bd.locationHosts[w.Location], host) { continue @@ -231,10 +237,13 @@ func (bd *batchDispatcher) ReportSuccess(b batch) { } // ReportFailure notifies batchDispatcher that given batch failed to be restored. -func (bd *batchDispatcher) ReportFailure(b batch) error { +func (bd *batchDispatcher) ReportFailure(host string, b batch) error { bd.mu.Lock() defer bd.mu.Unlock() + // Mark failed DC for host + bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC) + var rdw *RemoteDirWorkload for i := range bd.workload.RemoteDir { if bd.workload.RemoteDir[i].RemoteSSTableDir == b.RemoteSSTableDir { diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 8001d3293..86316c7bb 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -202,7 +202,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts) - f := func(n int) (err error) { + f := func(n int) error { host := hosts[n] dc, err := w.client.HostDatacenter(ctx, host) if err != nil { @@ -228,10 +228,24 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { pr, err := w.newRunProgress(ctx, hi, b) if err != nil { - return multierr.Append(errors.Wrap(err, "create new run progress"), bd.ReportFailure(b)) + err = multierr.Append(errors.Wrap(err, "create new run progress"), bd.ReportFailure(hi.Host, b)) + w.logger.Error(ctx, "Failed to create new run progress", + "host", hi.Host, + "error", err) + if ctx.Err() != nil { + return err + } + continue } if err := w.restoreBatch(ctx, b, pr); err != nil { - return multierr.Append(errors.Wrap(err, "restore batch"), bd.ReportFailure(b)) + err = multierr.Append(errors.Wrap(err, "restore batch"), bd.ReportFailure(hi.Host, b)) + w.logger.Error(ctx, "Failed to restore batch", + "host", hi.Host, + "error", err) + if ctx.Err() != nil { + return err + } + continue } bd.ReportSuccess(b) w.decreaseRemainingBytesMetric(b) From f3baac6b0daa5dc698d9693113be653694f61f85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 16 Oct 2024 11:58:35 +0200 Subject: [PATCH 5/8] feat(restore): extend batch test with host retry This commit extends TestBatchDispatcher to include failures in its scenario and to validate host retry in a different datacenter. --- pkg/service/restore/batch_test.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index 6fc4c118b..41e6066dc 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -22,6 +22,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l1, + DC: "dc1", }, TableName: TableName{ Keyspace: "ks1", @@ -37,6 +38,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l1, + DC: "dc1", }, TableName: TableName{ Keyspace: "ks1", @@ -53,6 +55,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l1, + DC: "dc2", }, TableName: TableName{ Keyspace: "ks1", @@ -68,6 +71,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l1, + DC: "dc1", }, TableName: TableName{ Keyspace: "ks1", @@ -83,6 +87,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l2, + DC: "dc1", }, TableName: TableName{ Keyspace: "ks1", @@ -117,19 +122,27 @@ func TestBatchDispatcher(t *testing.T) { dir string size int64 count int + err bool }{ {host: "h1", ok: true, dir: "c", size: 60, count: 1}, - {host: "h1", ok: true, dir: "c", size: 50, count: 1}, + {host: "h1", ok: true, dir: "c", size: 50, count: 1, err: true}, + {host: "h1", ok: true, dir: "b", size: 20, count: 1}, // host retry in different dc + {host: "h2", ok: true, dir: "c", size: 50, count: 1}, // batch retry + {host: "h1", ok: true, dir: "b", size: 10, count: 1, err: true}, + {host: "h1"}, // already failed in all dcs + {host: "h2", ok: true, dir: "b", size: 10, count: 1}, // batch retry {host: "h2", ok: true, dir: "b", size: 30, count: 2}, {host: "h3", ok: true, dir: "d", size: 200, count: 2}, - {host: "h3", ok: false}, + {host: "h3"}, {host: "h2", ok: true, dir: "a", size: 20, count: 2}, {host: "h2", ok: true, dir: "e", size: 10, count: 3}, // batch extended with leftovers < shard_cnt - {host: "h1", ok: false}, - {host: "h2", ok: false}, + {host: "h1"}, + {host: "h2"}, } for _, step := range scenario { + // use dispatchBatch instead of DispatchBatch because + // we don't want to hang here. b, ok := bd.dispatchBatch(step.host) if ok != step.ok { t.Errorf("Expected %v, got %#v", step, b) @@ -140,7 +153,13 @@ func TestBatchDispatcher(t *testing.T) { if b.RemoteSSTableDir != step.dir || b.Size != step.size || len(b.SSTables) != step.count { t.Errorf("Expected %v, got %#v", step, b) } - bd.ReportSuccess(b) + if step.err { + if err := bd.ReportFailure(step.host, b); err != nil { + t.Fatal(err) + } + } else { + bd.ReportSuccess(b) + } } if err := bd.ValidateAllDispatched(); err != nil { From 62c8c1d0c076ea2f97f99226892cd11da92477d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 18 Oct 2024 12:01:19 +0200 Subject: [PATCH 6/8] refactor(restore): make batchDispatcher more comprehensible Previously Workload structure was created during indexing and was updated during batching in order to keep its progress. This was confusing, because it wasn't obvious whether size and SSTable fields were describing the initial Workload state or the updated one. This commit makes it so Workload structure is not changed during batching. Instead, workloadProgress was added to in order to store batching progress. Moreover, this commit also adds a lot of documentation about batchDispatcher internal behavior. --- pkg/service/restore/batch.go | 195 ++++++++++++++++++++++++----------- 1 file changed, 136 insertions(+), 59 deletions(-) diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index e5c57c0d7..f542cc454 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -10,17 +10,58 @@ import ( . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" ) +// batchDispatcher is a tool for batching SSTables from +// Workload across different hosts during restore. +// It follows a few rules: +// +// - it dispatches batches from the RemoteDirWorkload with the biggest +// initial size first +// +// - it aims to optimize batch size according to batchSize param +// +// - it selects the biggest SSTables from RemoteDirWorkload first, +// so that batch contains SSTables of similar size (improved shard utilization) +// +// - it supports batch retry - failed batch can be re-tried by other +// hosts (see wait description for more information) +// +// - it supports host retry - host that failed to restore batch can still +// restore other batches (see hostToFailedDC description for more information). type batchDispatcher struct { - mu sync.Mutex + // Guards all exported methods + mu sync.Mutex + // When there are no more batches to be restored, + // but some already dispatched batches are still + // being processed, idle hosts waits on wait chan. + // They should wait, as in case currently processed + // batch fails to be restored, they can be waked up + // by batchDispatcher, and re-try to restore returned + // batch on their own. wait chan struct{} - remainingBytes int64 - workload Workload - batchSize int + // Const workload defined during indexing + workload Workload + // Mutable workloadProgress updated as batches are dispatched + workloadProgress workloadProgress + // For batchSize X, batches contain X*node_shard_cnt SSTables. + // We always multiply batchSize by node_shard_cnt in order to + // utilize all shards more equally. + // For batchSize 0, batches contain N*node_shard_cnt SSTables + // of total size up to 5% of node expected workload + // (expectedShardWorkload*node_shard_cnt). + batchSize int + // Equals total_backup_size/($\sum_{node} shard_cnt(node)$) expectedShardWorkload int64 - hostShardCnt map[string]uint - locationHosts map[Location][]string - hostToFailedDC map[string][]string + + // Stores host shard count + hostShardCnt map[string]uint + // Stores which hosts have access to which locations + locationHosts map[Location][]string + // Marks which host failed to restore batches from which DCs. + // When host failed to restore a batch from one backed up DC, + // it can still restore other batches coming from different + // DCs. This is a host re-try mechanism aiming to help with #3871. + hostToFailedDC map[string][]string } func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { @@ -35,8 +76,8 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin return &batchDispatcher{ mu: sync.Mutex{}, wait: make(chan struct{}), - remainingBytes: workload.TotalSize, workload: workload, + workloadProgress: newWorkloadProgress(workload), batchSize: batchSize, expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, @@ -45,6 +86,44 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin } } +// Describes current state of SSTables that are yet to be batched. +type workloadProgress struct { + // Bytes that are yet to be restored. + // They are decreased after a successful batch restoration. + bytesToBeRestored int64 + // SSTables grouped by RemoteSSTableDir that are yet to + // be batched. They are removed on batch dispatch, but can + // be re-added when batch failed to be restored. + // workloadProgress.remoteDir and Workload.RemoteDir have + // corresponding indexes. + remoteDir []remoteSSTableDirProgress +} + +// Describes current state of SSTables from given RemoteSSTableDir +// that are yet to be batched. +type remoteSSTableDirProgress struct { + RemainingSize int64 + RemainingSSTables []RemoteSSTable +} + +func newWorkloadProgress(workload Workload) workloadProgress { + p := make([]remoteSSTableDirProgress, len(workload.RemoteDir)) + for i := range workload.RemoteDir { + p[i] = remoteSSTableDirProgress{ + RemainingSize: workload.RemoteDir[i].Size, + RemainingSSTables: workload.RemoteDir[i].SSTables, + } + } + return workloadProgress{ + bytesToBeRestored: workload.TotalSize, + remoteDir: p, + } +} + +func (wp workloadProgress) done() bool { + return wp.bytesToBeRestored == 0 +} + type batch struct { TableName *ManifestInfo @@ -92,32 +171,34 @@ func (b batch) IDs() []string { return ids } -// ValidateAllDispatched returns error if not all sstables were dispatched. +// ValidateAllDispatched returns error if not all SSTables were dispatched. func (bd *batchDispatcher) ValidateAllDispatched() error { bd.mu.Lock() defer bd.mu.Unlock() - for _, rdw := range bd.workload.RemoteDir { - if rdw.Size != 0 || len(rdw.SSTables) != 0 { + for i, rdp := range bd.workloadProgress.remoteDir { + if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { + rdw := bd.workload.RemoteDir[i] return errors.Errorf("expected all data to be restored, missing sstables from location %s table %s.%s: %v (%d bytes)", rdw.Location, rdw.Keyspace, rdw.Table, rdw.SSTables, rdw.Size) } } - if !bd.done() { + if !bd.workloadProgress.done() { return errors.Errorf("expected all data to be restored, internal progress calculation error") } return nil } -// DispatchBatch returns batch restored or false when there is no more work to do. +// DispatchBatch returns batch to be restored or false when there is no more work to do. // This method might hang and wait for sstables that might come from batches that -// failed to be restored. Because of that, it's important to call ReportSuccess -// or ReportFailure after each dispatched batch was attempted to be restored. +// failed to be restored (see batchDispatcher.wait description for more information). +// Because of that, it's important to call ReportSuccess or ReportFailure after +// each dispatched batch was attempted to be restored. func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { for { bd.mu.Lock() - if bd.done() { + if bd.workloadProgress.done() { bd.mu.Unlock() return batch{}, false } @@ -133,41 +214,43 @@ func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { } } -func (bd *batchDispatcher) done() bool { - return bd.remainingBytes == 0 +func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { + dirIdx := bd.chooseDir(host) + if dirIdx < 0 { + return batch{}, false + } + return bd.createBatch(dirIdx, host) } -func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { - var rdw *RemoteDirWorkload - for i, w := range bd.workload.RemoteDir { +func (bd *batchDispatcher) chooseDir(host string) int { + dirIdx := -1 + for i := range bd.workloadProgress.remoteDir { + rdw := bd.workload.RemoteDir[i] // Skip empty dir - if w.Size == 0 { + if bd.workloadProgress.remoteDir[i].RemainingSize == 0 { continue } // Skip dir from already failed dc - if slices.Contains(bd.hostToFailedDC[host], w.DC) { + if slices.Contains(bd.hostToFailedDC[host], rdw.DC) { continue } // Sip dir from location without access - if !slices.Contains(bd.locationHosts[w.Location], host) { + if !slices.Contains(bd.locationHosts[rdw.Location], host) { continue } - rdw = &bd.workload.RemoteDir[i] + dirIdx = i break } - if rdw == nil { - return batch{}, false - } - return bd.createBatch(rdw, host) + return dirIdx } -// Returns batch and updates RemoteDirWorkload and its parents. -func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (batch, bool) { +// Returns batch from given RemoteSSTableDir and updates workloadProgress. +func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) { + rdp := &bd.workloadProgress.remoteDir[dirIdx] shardCnt := bd.hostShardCnt[host] if shardCnt == 0 { shardCnt = 1 } - var i int var size int64 if bd.batchSize == maxBatchSize { @@ -177,13 +260,13 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat sizeLimit := expectedNodeWorkload / 20 for { for j := 0; j < int(shardCnt); j++ { - if i >= len(rdw.SSTables) { + if i >= len(rdp.RemainingSSTables) { break } - size += rdw.SSTables[i].Size + size += rdp.RemainingSSTables[i].Size i++ } - if i >= len(rdw.SSTables) { + if i >= len(rdp.RemainingSSTables) { break } if size > sizeLimit { @@ -192,9 +275,9 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat } } else { // Create batch containing node_shard_count*batch_size sstables. - i = min(bd.batchSize*int(shardCnt), len(rdw.SSTables)) + i = min(bd.batchSize*int(shardCnt), len(rdp.RemainingSSTables)) for j := 0; j < i; j++ { - size += rdw.SSTables[j].Size + size += rdp.RemainingSSTables[j].Size } } @@ -203,19 +286,17 @@ func (bd *batchDispatcher) createBatch(rdw *RemoteDirWorkload, host string) (bat } // Extend batch if it was to leave less than // 1 sstable per shard for the next one. - if len(rdw.SSTables)-i < int(shardCnt) { - for ; i < len(rdw.SSTables); i++ { - size += rdw.SSTables[i].Size + if len(rdp.RemainingSSTables)-i < int(shardCnt) { + for ; i < len(rdp.RemainingSSTables); i++ { + size += rdp.RemainingSSTables[i].Size } } - sstables := rdw.SSTables[:i] - rdw.SSTables = rdw.SSTables[i:] + sstables := rdp.RemainingSSTables[:i] + rdp.RemainingSSTables = rdp.RemainingSSTables[i:] + rdw := bd.workload.RemoteDir[dirIdx] - rdw.Size -= size - bd.workload.TableSize[rdw.TableName] -= size - bd.workload.LocationSize[rdw.Location] -= size - bd.workload.TotalSize -= size + rdp.RemainingSize -= size return batch{ TableName: rdw.TableName, ManifestInfo: rdw.ManifestInfo, @@ -230,8 +311,8 @@ func (bd *batchDispatcher) ReportSuccess(b batch) { bd.mu.Lock() defer bd.mu.Unlock() - bd.remainingBytes -= b.Size - if bd.done() { + bd.workloadProgress.bytesToBeRestored -= b.Size + if bd.workloadProgress.done() { bd.wakeUpWaiting() } } @@ -244,24 +325,20 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error { // Mark failed DC for host bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC) - var rdw *RemoteDirWorkload + dirIdx := -1 for i := range bd.workload.RemoteDir { if bd.workload.RemoteDir[i].RemoteSSTableDir == b.RemoteSSTableDir { - rdw = &bd.workload.RemoteDir[i] + dirIdx = i + break } } - if rdw == nil { + if dirIdx < 0 { return errors.Errorf("unknown remote sstable dir %s", b.RemoteSSTableDir) } - var newSST []RemoteSSTable - newSST = append(newSST, b.SSTables...) - newSST = append(newSST, rdw.SSTables...) - - rdw.SSTables = newSST - rdw.Size += b.Size - bd.workload.TableSize[b.TableName] += b.Size - bd.workload.LocationSize[b.Location] += b.Size + rdp := &bd.workloadProgress.remoteDir[dirIdx] + rdp.RemainingSSTables = append(b.SSTables, rdp.RemainingSSTables...) + rdp.RemainingSize += b.Size bd.wakeUpWaiting() return nil From 473000f7cafb5272d9874b801afddeea261c70e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 21 Oct 2024 09:13:22 +0200 Subject: [PATCH 7/8] fix(restore): free host when it can't restore anymore Consider a scenario with parallel=1 and multi-dc and multi-location. Note that SM is using 'parallel.Run' for restoring in parallel. Note that previous batching changes made host hang in 'batchDispatcher.DispatchBatch' if there were no more SSTables to restore, because it was still possible that another node failed to restore some SSTables, so that the hanging host could be awakened and restore failed SSTables returned to batchDispatcher. All of this meant that batching process could hang, because 'parallel.Run' would allow only a single host to restore SSTables at the time, but batching mechanism wouldn't free it until all SSTables are restored. Another scenario when batching mechanism could fail would be that all hosts failed (with re-tries) to restore all SSTables. Because of that, I changed batching mechanism to be more DC oriented. Now, 'workloadProgress' keeps track of remaining bytes to be restored per DC, and it also keeps host DC access instead of location access (the assumption being that a single DC can be backed up to only single location). This information allow to free hosts that can't restore any SSTables because they either already failed to restore some SSTables from given DCs, or all SSTables from given DCs were already restored. --- pkg/service/backup/backupspec/location.go | 8 +- pkg/service/restore/batch.go | 109 ++++++---- pkg/service/restore/batch_test.go | 2 +- .../restore/restore_integration_test.go | 202 ++++++++++++++---- 4 files changed, 239 insertions(+), 82 deletions(-) diff --git a/pkg/service/backup/backupspec/location.go b/pkg/service/backup/backupspec/location.go index fe10d9a38..5586a0e41 100644 --- a/pkg/service/backup/backupspec/location.go +++ b/pkg/service/backup/backupspec/location.go @@ -102,13 +102,19 @@ func NewLocation(location string) (l Location, err error) { } func (l Location) String() string { - p := l.Provider.String() + ":" + l.Path + p := l.StringWithoutDC() if l.DC != "" { p = l.DC + ":" + p } return p } +// StringWithoutDC returns Location string representation +// that lacks DC information. +func (l Location) StringWithoutDC() string { + return l.Provider.String() + ":" + l.Path +} + // Datacenter returns location's datacenter. func (l Location) Datacenter() string { return l.DC diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index f542cc454..8dca0b0c5 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -26,7 +26,7 @@ import ( // hosts (see wait description for more information) // // - it supports host retry - host that failed to restore batch can still -// restore other batches (see hostToFailedDC description for more information). +// restore other batches (see hostFailedDC description for more information). type batchDispatcher struct { // Guards all exported methods mu sync.Mutex @@ -52,16 +52,8 @@ type batchDispatcher struct { batchSize int // Equals total_backup_size/($\sum_{node} shard_cnt(node)$) expectedShardWorkload int64 - // Stores host shard count hostShardCnt map[string]uint - // Stores which hosts have access to which locations - locationHosts map[Location][]string - // Marks which host failed to restore batches from which DCs. - // When host failed to restore a batch from one backed up DC, - // it can still restore other batches coming from different - // DCs. This is a host re-try mechanism aiming to help with #3871. - hostToFailedDC map[string][]string } func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { @@ -77,20 +69,27 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin mu: sync.Mutex{}, wait: make(chan struct{}), workload: workload, - workloadProgress: newWorkloadProgress(workload), + workloadProgress: newWorkloadProgress(workload, locationHosts), batchSize: batchSize, expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, - locationHosts: locationHosts, - hostToFailedDC: make(map[string][]string), } } // Describes current state of SSTables that are yet to be batched. type workloadProgress struct { - // Bytes that are yet to be restored. + // Bytes that are yet to be restored from given backed up DC. // They are decreased after a successful batch restoration. - bytesToBeRestored int64 + dcBytesToBeRestored map[string]int64 + // Marks which host failed to restore batches from which DCs. + // When host failed to restore a batch from one backed up DC, + // it can still restore other batches coming from different + // DCs. This is a host re-try mechanism aiming to help with #3871. + hostFailedDC map[string][]string + // Stores which hosts have access to restore which DCs. + // It assumes that the whole DC is backed up to a single + // backup location. + hostDCAccess map[string][]string // SSTables grouped by RemoteSSTableDir that are yet to // be batched. They are removed on batch dispatch, but can // be re-added when batch failed to be restored. @@ -106,22 +105,44 @@ type remoteSSTableDirProgress struct { RemainingSSTables []RemoteSSTable } -func newWorkloadProgress(workload Workload) workloadProgress { +func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress { + dcBytes := make(map[string]int64) + locationDC := make(map[string][]string) p := make([]remoteSSTableDirProgress, len(workload.RemoteDir)) - for i := range workload.RemoteDir { + for i, rdw := range workload.RemoteDir { + dcBytes[rdw.DC] += rdw.Size + locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC) p[i] = remoteSSTableDirProgress{ - RemainingSize: workload.RemoteDir[i].Size, - RemainingSSTables: workload.RemoteDir[i].SSTables, + RemainingSize: rdw.Size, + RemainingSSTables: rdw.SSTables, + } + } + hostDCAccess := make(map[string][]string) + for loc, hosts := range locationHosts { + for _, h := range hosts { + hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...) } } return workloadProgress{ - bytesToBeRestored: workload.TotalSize, - remoteDir: p, + dcBytesToBeRestored: dcBytes, + hostFailedDC: make(map[string][]string), + hostDCAccess: hostDCAccess, + remoteDir: p, } } -func (wp workloadProgress) done() bool { - return wp.bytesToBeRestored == 0 +// Checks if given host finished restoring all that it could. +func (wp workloadProgress) isDone(host string) bool { + failed := wp.hostFailedDC[host] + for _, dc := range wp.hostDCAccess[host] { + // Host isn't done when there is still some data to be restored + // from a DC that it has access to, and it didn't previously fail + // to restore data from this DC. + if !slices.Contains(failed, dc) && wp.dcBytesToBeRestored[dc] != 0 { + return false + } + } + return true } type batch struct { @@ -179,12 +200,15 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { for i, rdp := range bd.workloadProgress.remoteDir { if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { rdw := bd.workload.RemoteDir[i] - return errors.Errorf("expected all data to be restored, missing sstables from location %s table %s.%s: %v (%d bytes)", - rdw.Location, rdw.Keyspace, rdw.Table, rdw.SSTables, rdw.Size) + return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info", + rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size) } } - if !bd.workloadProgress.done() { - return errors.Errorf("expected all data to be restored, internal progress calculation error") + for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored { + if bytes != 0 { + return errors.Errorf("expected all data from DC %q to be restored (missing %d bytes): "+ + "internal progress calculation error", dc, bytes) + } } return nil } @@ -197,32 +221,24 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { for { bd.mu.Lock() - - if bd.workloadProgress.done() { + // Check if there is anything to do for this host + if bd.workloadProgress.isDone(host) { bd.mu.Unlock() return batch{}, false } + // Try to dispatch batch b, ok := bd.dispatchBatch(host) wait := bd.wait - bd.mu.Unlock() - if ok { return b, true } + // Wait for SSTables that might return after failure <-wait } } func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { - dirIdx := bd.chooseDir(host) - if dirIdx < 0 { - return batch{}, false - } - return bd.createBatch(dirIdx, host) -} - -func (bd *batchDispatcher) chooseDir(host string) int { dirIdx := -1 for i := range bd.workloadProgress.remoteDir { rdw := bd.workload.RemoteDir[i] @@ -231,17 +247,20 @@ func (bd *batchDispatcher) chooseDir(host string) int { continue } // Skip dir from already failed dc - if slices.Contains(bd.hostToFailedDC[host], rdw.DC) { + if slices.Contains(bd.workloadProgress.hostFailedDC[host], rdw.DC) { continue } // Sip dir from location without access - if !slices.Contains(bd.locationHosts[rdw.Location], host) { + if !slices.Contains(bd.workloadProgress.hostDCAccess[host], rdw.DC) { continue } dirIdx = i break } - return dirIdx + if dirIdx < 0 { + return batch{}, false + } + return bd.createBatch(dirIdx, host) } // Returns batch from given RemoteSSTableDir and updates workloadProgress. @@ -311,8 +330,10 @@ func (bd *batchDispatcher) ReportSuccess(b batch) { bd.mu.Lock() defer bd.mu.Unlock() - bd.workloadProgress.bytesToBeRestored -= b.Size - if bd.workloadProgress.done() { + dcBytes := bd.workloadProgress.dcBytesToBeRestored + dcBytes[b.DC] -= b.Size + // Mark batching as finished due to successful restore + if dcBytes[b.DC] == 0 { bd.wakeUpWaiting() } } @@ -323,7 +344,7 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error { defer bd.mu.Unlock() // Mark failed DC for host - bd.hostToFailedDC[host] = append(bd.hostToFailedDC[host], b.DC) + bd.workloadProgress.hostFailedDC[host] = append(bd.workloadProgress.hostFailedDC[host], b.DC) dirIdx := -1 for i := range bd.workload.RemoteDir { diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index 41e6066dc..9f206716e 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -87,7 +87,7 @@ func TestBatchDispatcher(t *testing.T) { { ManifestInfo: &backupspec.ManifestInfo{ Location: l2, - DC: "dc1", + DC: "dc3", }, TableName: TableName{ Keyspace: "ks1", diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 43bd33da6..581e33594 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "maps" "net/http" "strings" "sync/atomic" @@ -733,49 +734,178 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { "batch_size": 100, }) - Print("Inject errors") - downloadCnt := atomic.Int64{} - lasCnt := atomic.Int64{} - h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - // For this setup, we have 6 remote sstable dirs and 6 workers. - // We inject 2 errors during download and 3 errors during LAS. - // This means that only a single node will be restoring at the end. - // Huge batch size and 3 LAS errors guarantee total 9 calls to LAS. - // The last failed call to LAS (cnt=8) waits a bit so that we test - // that batch dispatcher correctly reuses and releases nodes waiting - // for failed sstables to come back to the batch dispatcher. - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { - if cnt := downloadCnt.Add(1); cnt == 1 || cnt == 3 { - t.Log("Fake download error ", cnt) - return nil, errors.New("fake download error") + downloadErr := errors.New("fake download error") + lasErr := errors.New("fake las error") + props := map[string]any{ + "location": loc, + "keyspace": ksFilter, + "snapshot_tag": tag, + "restore_tables": true, + } + + t.Run("batch retry finished with success", func(t *testing.T) { + Print("Inject errors to some download and las calls") + downloadCnt := atomic.Int64{} + lasCnt := atomic.Int64{} + h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + // For this setup, we have 6 remote sstable dirs and 6 workers. + // We inject 2 errors during download and 3 errors during LAS. + // This means that only a single node will be restoring at the end. + // Huge batch size and 3 LAS errors guarantee total 9 calls to LAS. + // The last failed call to LAS (cnt=8) waits a bit so that we test + // that batch dispatcher correctly reuses and releases nodes waiting + // for failed sstables to come back to the batch dispatcher. + if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { + if cnt := downloadCnt.Add(1); cnt == 1 || cnt == 3 { + t.Log("Fake download error ", cnt) + return nil, downloadErr + } } + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + cnt := lasCnt.Add(1) + if cnt == 8 { + time.Sleep(15 * time.Second) + } + if cnt == 1 || cnt == 5 || cnt == 8 { + t.Log("Fake LAS error ", cnt) + return nil, lasErr + } + } + return nil, nil + })) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + h.runRestore(t, props) + + Print("Validate success") + if cnt := lasCnt.Add(0); cnt < 9 { + t.Fatalf("Expected at least 9 calls to LAS, got %d", cnt) } - if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { - cnt := lasCnt.Add(1) - if cnt == 8 { - time.Sleep(15 * time.Second) + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab1, "id", "data") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab2, "id", "data") + validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab3, "id", "data") + }) + + t.Run("restore with injected failures only", func(t *testing.T) { + Print("Inject errors to all download and las calls") + reachedDataStage := atomic.Bool{} + reachedDataStageChan := make(chan struct{}) + h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { + if reachedDataStage.CompareAndSwap(false, true) { + close(reachedDataStageChan) + } + return nil, downloadErr } - if cnt == 1 || cnt == 5 || cnt == 8 { - t.Log("Fake LAS error ", cnt) - return nil, errors.New("fake las error") + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + return nil, lasErr } + return nil, nil + })) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + h.dstCluster.TaskID = uuid.NewTime() + h.dstCluster.RunID = uuid.NewTime() + rawProps, err := json.Marshal(props) + if err != nil { + t.Fatal(errors.Wrap(err, "marshal properties")) + } + res := make(chan error) + go func() { + res <- h.dstRestoreSvc.Restore(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) + }() + + Print("Wait for data stage") + select { + case <-reachedDataStageChan: + case err := <-res: + t.Fatalf("Restore finished before reaching data stage with: %s", err) } - return nil, nil - })) - Print("Run restore tables") - grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) - h.runRestore(t, map[string]any{ - "location": loc, - "keyspace": ksFilter, - "snapshot_tag": tag, - "restore_tables": true, + Print("Validate restore failure and that it does not hang") + select { + case err := <-res: + if err == nil { + t.Fatalf("Expected restore to end with error") + } + case <-time.NewTimer(time.Minute).C: + t.Fatal("Restore hanged") + } }) +} + +func TestRestoreTablesMultiLocationIntegration(t *testing.T) { + // Since we need multi-dc clusters for multi-dc backup/restore + // we will use the same cluster as both src and dst. + h := newTestHelper(t, ManagedClusterHosts(), ManagedClusterHosts()) + + Print("Keyspace setup") + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}" + ks := randomizedName("multi_location_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks)) + + Print("Table setup") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + tab := randomizedName("tab_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + + Print("Fill setup") + fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + + Print("Save filled table into map") + srcM := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ks, tab, "id", "data") + + Print("Run backup") + loc := []Location{ + testLocation("multi-location-1", "dc1"), + testLocation("multi-location-2", "dc2"), + } + S3InitBucket(t, loc[0].Path) + S3InitBucket(t, loc[1].Path) + ksFilter := []string{ks} + tag := h.runBackup(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "batch_size": 100, + }) + + Print("Truncate backed up table") + truncateStmt := "TRUNCATE TABLE %q.%q" + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(truncateStmt, ks, tab)) + + // Reverse dcs - just for fun + loc[0].DC = "dc2" + loc[1].DC = "dc1" + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + res := make(chan struct{}) + go func() { + h.runRestore(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + // Test if batching does not hang with + // limited parallel and location access. + "parallel": 1, + "snapshot_tag": tag, + "restore_tables": true, + }) + close(res) + }() + + select { + case <-res: + case <-time.NewTimer(2 * time.Minute).C: + t.Fatal("Restore hanged") + } + + Print("Save restored table into map") + dstM := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ks, tab, "id", "data") - if cnt := lasCnt.Add(0); cnt < 9 { - t.Fatalf("Expected at least 9 calls to LAS, got %d", cnt) + Print("Validate success") + if !maps.Equal(srcM, dstM) { + t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM) } - validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab1, "id", "data") - validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab2, "id", "data") - validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab3, "id", "data") } From 2eb9e4a0bb9b47555a9dc8a60250857e3144063f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 21 Oct 2024 20:56:36 +0200 Subject: [PATCH 8/8] feat(restore): improve context cancelling during batching I'm not sure if previous behavior was bugged, but changes introduced in this commit should make it more clear that batching mechanism respects context cancellation. This commit also adds a simple test validating that pausing restore during batching ends quickly. --- pkg/service/restore/batch.go | 11 ++++- .../restore/restore_integration_test.go | 49 +++++++++++++++++++ pkg/service/restore/tables_worker.go | 14 +++--- 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index 8dca0b0c5..d7f8ee7c9 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -3,6 +3,7 @@ package restore import ( + "context" "slices" "sync" @@ -218,8 +219,11 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { // failed to be restored (see batchDispatcher.wait description for more information). // Because of that, it's important to call ReportSuccess or ReportFailure after // each dispatched batch was attempted to be restored. -func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { +func (bd *batchDispatcher) DispatchBatch(ctx context.Context, host string) (batch, bool) { for { + if ctx.Err() != nil { + return batch{}, false + } bd.mu.Lock() // Check if there is anything to do for this host if bd.workloadProgress.isDone(host) { @@ -234,7 +238,10 @@ func (bd *batchDispatcher) DispatchBatch(host string) (batch, bool) { return b, true } // Wait for SSTables that might return after failure - <-wait + select { + case <-ctx.Done(): + case <-wait: + } } } diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 581e33594..7503e1d76 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -834,6 +834,55 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) { t.Fatal("Restore hanged") } }) + + t.Run("paused restore with slow calls to download and las", func(t *testing.T) { + Print("Make download and las calls slow") + reachedDataStage := atomic.Bool{} + reachedDataStageChan := make(chan struct{}) + h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") || + strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") { + if reachedDataStage.CompareAndSwap(false, true) { + close(reachedDataStageChan) + } + time.Sleep(time.Second) + return nil, nil + } + return nil, nil + })) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + h.dstCluster.TaskID = uuid.NewTime() + h.dstCluster.RunID = uuid.NewTime() + rawProps, err := json.Marshal(props) + if err != nil { + t.Fatal(errors.Wrap(err, "marshal properties")) + } + ctx, cancel := context.WithCancel(context.Background()) + res := make(chan error) + go func() { + res <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) + }() + + Print("Wait for data stage") + select { + case <-reachedDataStageChan: + cancel() + case err := <-res: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } + + Print("Validate restore was paused in time") + select { + case err := <-res: + if !errors.Is(err, context.Canceled) { + t.Fatalf("Expected restore to end with context cancelled, got %q", err) + } + case <-time.NewTimer(2 * time.Second).C: + t.Fatal("Restore wasn't paused in time") + } + }) } func TestRestoreTablesMultiLocationIntegration(t *testing.T) { diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 86316c7bb..4ce143517 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -211,8 +211,11 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { hi := w.hostInfo(host, dc, hostToShard[host]) w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers, "rate limit", hi.RateLimit) for { + if ctx.Err() != nil { + return ctx.Err() + } // Download and stream in parallel - b, ok := bd.DispatchBatch(hi.Host) + b, ok := bd.DispatchBatch(ctx, hi.Host) if !ok { w.logger.Info(ctx, "No more batches to restore", "host", hi.Host) return nil @@ -232,9 +235,6 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Error(ctx, "Failed to create new run progress", "host", hi.Host, "error", err) - if ctx.Err() != nil { - return err - } continue } if err := w.restoreBatch(ctx, b, pr); err != nil { @@ -242,9 +242,6 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Error(ctx, "Failed to restore batch", "host", hi.Host, "error", err) - if ctx.Err() != nil { - return err - } continue } bd.ReportSuccess(b) @@ -261,6 +258,9 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { err = parallel.Run(len(hosts), w.target.Parallel, f, notify) if err == nil { + if ctx.Err() != nil { + return ctx.Err() + } return bd.ValidateAllDispatched() } return err