From 1506638b282029420ac140ab956f735dc5a5652a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl>
Date: Tue, 8 Oct 2024 14:42:55 +0200
Subject: [PATCH] feat(restore_test): extend
 TestRestoreTablesPreparationIntegration with transfers

This way the test also checks transfers before and after backup, as
well as before, during, when paused, when resumed, and after restore.

This required some changes to the test, like swapping the src and dst
clusters (to increase the batch count) and hanging on LAS instead of
copy paths (the transfers change is applied as a part of copy paths).
---
 .../restore/restore_integration_test.go      | 125 ++++++++++++------
 1 file changed, 83 insertions(+), 42 deletions(-)

diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index d233b668f..cf81acae7 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -481,7 +481,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	// Validate setup
 	// Validate restore success
 
-	h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())
+	h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts())
 
 	Print("Keyspace setup")
 	ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
@@ -498,15 +498,69 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
+		// Validate tombstone_gc mode
+		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
+			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
+		}
+		// Validate compaction
+		for _, host := range ch.Client.Config().Hosts {
+			enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
+			}
+			if compaction != enabled {
+				t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
+			}
+		}
+		// Validate transfers
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.RcloneGetTransfers(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check transfers on host %s", host))
+			}
+			if transfers != got {
+				t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
+			}
+		}
+	}
+
+	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
+	if err != nil {
+		t.Fatal(err)
+	}
+	transfers0 := 2 * int(shardCnt)
+
+	// Set initial transfers
+	for _, host := range ManagedClusterHosts() {
+		err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
+		if err != nil {
+			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+		}
+	}
+	for _, host := range ManagedSecondClusterHosts() {
+		err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
+		if err != nil {
+			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+		}
+	}
+
+	Print("Validate state before backup")
+	validateState(h.srcCluster, "repair", true, 10)
+
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
 	S3InitBucket(t, loc[0].Path)
 	ksFilter := []string{ks}
 	tag := h.runBackup(t, map[string]any{
-		"location": loc,
-		"keyspace": ksFilter,
+		"location":  loc,
+		"keyspace":  ksFilter,
+		"transfers": 3,
"transfers": 3, }) + Print("Validate state after backup") + validateState(h.srcCluster, "repair", true, 3) + runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) h.dstCluster.RunID = uuid.NewTime() @@ -514,6 +568,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "location": loc, "keyspace": ksFilter, "snapshot_tag": tag, + "transfers": 0, "restore_tables": true, }) if err != nil { @@ -522,44 +577,31 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } - validateState := func(tombstone string, compaction bool) { - // Validate tombstone_gc mode - if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got { - t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) - } - // Validate compaction - for _, host := range ManagedClusterHosts() { - enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab) - if err != nil { - t.Fatal(errors.Wrapf(err, "check compaction on host %s", host)) - } - if compaction != enabled { - t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) - } - } - } - - makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) { + makeLASHang := func(reachedDataStageChan, hangLAS chan struct{}) { + cnt := atomic.Int64{} + cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts))) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { - if reachedDataStage.CompareAndSwap(false, true) { + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") { + if curr := cnt.Add(-1); curr == 0 { Print("Reached data stage") close(reachedDataStageChan) } - Print("Wait for copy paths to stop hanging") - <-hangCopyPaths + Print("Wait for LAS to stop hanging") + <-hangLAS } return nil, nil })) } var ( - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) + hangLAS = make(chan struct{}) ) - Print("Make copy paths hang") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan) + Print("Make LAS hang") + makeLASHang(reachedDataStageChan, hangLAS) + + Print("Validate state before restore") + validateState(h.dstCluster, "repair", true, 10) Print("Run restore") finishedRestore := make(chan error) @@ -570,28 +612,27 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { <-reachedDataStageChan Print("Validate state during restore data") - validateState("disabled", false) + validateState(h.dstCluster, "disabled", false, transfers0) Print("Pause restore") restoreCancel() - Print("Release copy paths") - close(hangCopyPathsChan) + Print("Release LAS") + close(hangLAS) Print("Wait for restore") - err := <-finishedRestore + err = <-finishedRestore if !errors.Is(err, context.Canceled) { t.Fatalf("Expected restore to be paused, got: %s", err) } Print("Validate state during pause") - validateState("disabled", true) + validateState(h.dstCluster, "disabled", true, transfers0) - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) - Print("Make copy paths hang after pause") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, 
+	hangLAS = make(chan struct{})
+	Print("Make LAS hang after pause")
+	makeLASHang(reachedDataStageChan, hangLAS)
 
 	Print("Run restore after pause")
 	finishedRestore = make(chan error)
@@ -601,10 +642,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	<-reachedDataStageChan
 
 	Print("Validate state during restore data after pause")
-	validateState("disabled", false)
+	validateState(h.dstCluster, "disabled", false, transfers0)
 
-	Print("Release copy paths")
-	close(hangCopyPathsChan)
+	Print("Release LAS")
+	close(hangLAS)
 
 	Print("Wait for restore")
 	err = <-finishedRestore
@@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState("repair", true)
+	validateState(h.dstCluster, "repair", true, transfers0)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})