feat(restore_test): extend TestRestoreTablesPreparationIntegration with transfers

This way the test also checks transfers before and after backup,
as well as before restore, in the middle of it, when paused,
when resumed, and after it finishes.
This required some changes to the test, such as swapping the
src and dst clusters (to increase the batch count) and hanging on LAS instead of
on copy paths (the transfers change is applied as part of copy paths).
Michal-Leszczynski committed Oct 11, 2024
1 parent ecf58bb commit 1506638
Showing 1 changed file with 83 additions and 42 deletions.
125 changes: 83 additions & 42 deletions pkg/service/restore/restore_integration_test.go
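For context before the diff: the key mechanism the commit message refers to is hanging the restore on LAS (load-and-stream) via an HTTP interceptor, so that cluster state, including the applied transfers setting, can be validated mid-restore. Below is a minimal, self-contained sketch of that pattern using only the standard library; the type and field names are illustrative rather than the test's actual helpers (the test itself wires the same idea through httpx.RoundTripperFunc and Hrt.SetInterceptor, as shown in the diff).

package restoretest

import (
    "net/http"
    "strings"
    "sync/atomic"
)

// hangingLASTransport holds every request to the LAS endpoint
// ("/storage_service/sstables") until the test releases it, and signals
// (by closing reachedDataStage) once all expected hosts have hit it,
// so cluster state can be validated in the middle of a restore.
// Initialize remaining with the number of hosts, e.g. remaining.Store(int64(len(hosts))).
type hangingLASTransport struct {
    next             http.RoundTripper
    remaining        atomic.Int64  // hosts still expected to call LAS
    reachedDataStage chan struct{} // closed when remaining drops to zero
    release          chan struct{} // closed by the test to un-hang LAS
}

func (t *hangingLASTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") {
        if t.remaining.Add(-1) == 0 {
            close(t.reachedDataStage) // every host reached the data stage
        }
        <-t.release // block until the test closes release
    }
    return t.next.RoundTrip(req)
}

Hanging on LAS rather than on rclone copy paths matters here because, as the commit message notes, the transfers change is applied as part of copy paths, so pausing there would not let the test observe the applied value.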
@@ -481,7 +481,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
// Validate setup
// Validate restore success

h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())
h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts())

Print("Keyspace setup")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
@@ -498,22 +498,77 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Validate compaction
for _, host := range ch.Client.Config().Hosts {
enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
if err != nil {
t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
}
if compaction != enabled {
t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
}
}
// Validate transfers
for _, host := range ch.Client.Config().Hosts {
got, err := ch.Client.RcloneGetTransfers(context.Background(), host)
if err != nil {
t.Fatal(errors.Wrapf(err, "check transfers on host %s", host))
}
if transfers != got {
t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
}
}
}

shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
if err != nil {
t.Fatal(err)
}
transfers0 := 2 * int(shardCnt)

// Set initial transfers
for _, host := range ManagedClusterHosts() {
err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
if err != nil {
t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
}
}
for _, host := range ManagedSecondClusterHosts() {
err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
if err != nil {
t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
}
}

Print("Validate state before backup")
validateState(h.srcCluster, "repair", true, 10)

Print("Run backup")
loc := []Location{testLocation("preparation", "")}
S3InitBucket(t, loc[0].Path)
ksFilter := []string{ks}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
"location": loc,
"keyspace": ksFilter,
"transfers": 3,
})

Print("Validate state after backup")
validateState(h.srcCluster, "repair", true, 3)

runRestore := func(ctx context.Context, finishedRestore chan error) {
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
h.dstCluster.RunID = uuid.NewTime()
rawProps, err := json.Marshal(map[string]any{
"location": loc,
"keyspace": ksFilter,
"snapshot_tag": tag,
"transfers": 0,
"restore_tables": true,
})
if err != nil {
@@ -522,44 +577,31 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
}

validateState := func(tombstone string, compaction bool) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Validate compaction
for _, host := range ManagedClusterHosts() {
enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
if err != nil {
t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
}
if compaction != enabled {
t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
}
}
}

makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) {
makeLASHang := func(reachedDataStageChan, hangLAS chan struct{}) {
cnt := atomic.Int64{}
cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts)))
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
if reachedDataStage.CompareAndSwap(false, true) {
if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") {
if curr := cnt.Add(-1); curr == 0 {
Print("Reached data stage")
close(reachedDataStageChan)
}
Print("Wait for copy paths to stop hanging")
<-hangCopyPaths
Print("Wait for LAS to stop hanging")
<-hangLAS
}
return nil, nil
}))
}

var (
reachedDataStage = &atomic.Bool{}
reachedDataStageChan = make(chan struct{})
hangCopyPathsChan = make(chan struct{})
hangLAS = make(chan struct{})
)
Print("Make copy paths hang")
makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)
Print("Make LAS hang")
makeLASHang(reachedDataStageChan, hangLAS)

Print("Validate state before restore")
validateState(h.dstCluster, "repair", true, 10)

Print("Run restore")
finishedRestore := make(chan error)
@@ -570,28 +612,27 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
<-reachedDataStageChan

Print("Validate state during restore data")
validateState("disabled", false)
validateState(h.dstCluster, "disabled", false, transfers0)

Print("Pause restore")
restoreCancel()

Print("Release copy paths")
close(hangCopyPathsChan)
Print("Release LAS")
close(hangLAS)

Print("Wait for restore")
err := <-finishedRestore
err = <-finishedRestore
if !errors.Is(err, context.Canceled) {
t.Fatalf("Expected restore to be paused, got: %s", err)
}

Print("Validate state during pause")
validateState("disabled", true)
validateState(h.dstCluster, "disabled", true, transfers0)

reachedDataStage = &atomic.Bool{}
reachedDataStageChan = make(chan struct{})
hangCopyPathsChan = make(chan struct{})
Print("Make copy paths hang after pause")
makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)
hangLAS = make(chan struct{})
Print("Make LAS hang after pause")
makeLASHang(reachedDataStageChan, hangLAS)

Print("Run restore after pause")
finishedRestore = make(chan error)
@@ -601,10 +642,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
<-reachedDataStageChan

Print("Validate state during restore data after pause")
validateState("disabled", false)
validateState(h.dstCluster, "disabled", false, transfers0)

Print("Release copy paths")
close(hangCopyPathsChan)
Print("Release LAS")
close(hangLAS)

Print("Wait for restore")
err = <-finishedRestore
@@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
}

Print("Validate state after restore success")
validateState("repair", true)
validateState(h.dstCluster, "repair", true, transfers0)

Print("Validate table contents")
h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})