Skip to content

Commit

Permalink
chore(restore_test): adjust tests to the Scylla restore API
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Jan 8, 2025
1 parent 9e96cf9 commit 0792ccb
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 31 deletions.
10 changes: 10 additions & 0 deletions pkg/service/restore/helper_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,13 @@ func runPausedRestore(t *testing.T, restore func(ctx context.Context) error, int
}
}
}

func isDownloadOrRestoreEndpoint(path string) bool {
return strings.HasPrefix(path, "/agent/rclone/sync/copypaths") ||
strings.HasPrefix(path, "/storage_service/restore")
}

func isLasOrRestoreEndpoint(path string) bool {
return strings.HasPrefix(path, "/storage_service/sstables") ||
strings.HasPrefix(path, "/storage_service/restore")
}
60 changes: 32 additions & 28 deletions pkg/service/restore/restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,25 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

ni, err := h.dstCluster.Client.AnyNodeInfo(context.Background())
if err != nil {
t.Fatal(err)
}
supportScyllaRestoreAPI, err := ni.SupportsScyllaBackupRestoreAPI()
if err != nil {
t.Fatal(err)
}

validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int, cpus []int64) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Restore with Scylla API does not need to directly control
// compaction, transfers, rate limit, cpu pinning.
if supportScyllaRestoreAPI {
return
}
// Validate compaction
for _, host := range ch.Client.Config().Hosts {
enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
Expand Down Expand Up @@ -634,7 +648,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
cnt := atomic.Int64{}
cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts)))
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") {
if isLasOrRestoreEndpoint(req.URL.Path) {
if curr := cnt.Add(-1); curr == 0 {
Print("Reached data stage")
close(reachedDataStageChan)
Expand Down Expand Up @@ -762,8 +776,6 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
"batch_size": 100,
})

downloadErr := errors.New("fake download error")
lasErr := errors.New("fake las error")
props := map[string]any{
"location": loc,
"keyspace": ksFilter,
Expand All @@ -773,30 +785,23 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {

t.Run("batch retry finished with success", func(t *testing.T) {
Print("Inject errors to some download and las calls")
downloadCnt := atomic.Int64{}
lasCnt := atomic.Int64{}
counter := atomic.Int64{}
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
// For this setup, we have 6 remote sstable dirs and 6 workers.
// We inject 2 errors during download and 3 errors during LAS.
// We inject 5 errors during LAS or Scylla Restore API.
// This means that only a single node will be restoring at the end.
// Huge batch size and 3 LAS errors guarantee total 9 calls to LAS.
// The last failed call to LAS (cnt=8) waits a bit so that we test
// Huge batch size and 5 errors guarantee total 11 calls to LAS or Scylla API.
// The last failed call (cnt=10) waits a bit so that we test
// that batch dispatcher correctly reuses and releases nodes waiting
// for failed sstables to come back to the batch dispatcher.
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
if cnt := downloadCnt.Add(1); cnt == 1 || cnt == 3 {
t.Log("Fake download error ", cnt)
return nil, downloadErr
}
}
if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") {
cnt := lasCnt.Add(1)
if cnt == 8 {
if isLasOrRestoreEndpoint(req.URL.Path) {
cnt := counter.Add(1)
switch cnt {
case 10:
time.Sleep(15 * time.Second)
}
if cnt == 1 || cnt == 5 || cnt == 8 {
t.Log("Fake LAS error ", cnt)
return nil, lasErr
return nil, errors.New("fake error")
case 1, 3, 5, 8:
return nil, errors.New("fake error")
}
}
return nil, nil
Expand All @@ -807,7 +812,7 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
h.runRestore(t, props)

Print("Validate success")
if cnt := lasCnt.Add(0); cnt < 9 {
if cnt := counter.Add(0); cnt < 11 {
t.Fatalf("Expected at least 9 calls to LAS, got %d", cnt)
}
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab1, "id", "data")
Expand All @@ -820,14 +825,14 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
reachedDataStage := atomic.Bool{}
reachedDataStageChan := make(chan struct{})
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
if isDownloadOrRestoreEndpoint(req.URL.Path) {
if reachedDataStage.CompareAndSwap(false, true) {
close(reachedDataStageChan)
}
return nil, downloadErr
return nil, errors.New("fake error")
}
if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") {
return nil, lasErr
if isLasOrRestoreEndpoint(req.URL.Path) {
return nil, errors.New("fake error")
}
return nil, nil
}))
Expand Down Expand Up @@ -868,8 +873,7 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
reachedDataStage := atomic.Bool{}
reachedDataStageChan := make(chan struct{})
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") ||
strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") {
if isDownloadOrRestoreEndpoint(req.URL.Path) || isLasOrRestoreEndpoint(req.URL.Path) {
if reachedDataStage.CompareAndSwap(false, true) {
close(reachedDataStageChan)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,11 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo

a := atomic.NewInt64(0)
dstH.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/storage_service/sstables/") && a.Inc() == 1 {
Print("And: context1 is canceled")
cancel1()
if isLasOrRestoreEndpoint(req.URL.Path) {
if a.Inc() == 1 {
Print("And: context1 is canceled")
cancel1()
}
}
return nil, nil
}))
Expand Down

0 comments on commit 0792ccb

Please sign in to comment.