From e350f8b202edb5743362e8cba86669a116a7f6b2 Mon Sep 17 00:00:00 2001 From: Riley Laine <95257195+rjlaine@users.noreply.github.com> Date: Wed, 3 Jul 2024 10:50:15 -0700 Subject: [PATCH] VTGateProxy CI (#414) * Add vtgateproxy tests to CI * Refactor tests to make them less flappy --------- Signed-off-by: Riley Laine --- .../endtoend/vtgateproxy/rebalance_test.go | 119 +++++++----------- go/test/endtoend/vtgateproxy/scale_test.go | 96 +++++++------- test/config.json | 9 ++ 3 files changed, 106 insertions(+), 118 deletions(-) diff --git a/go/test/endtoend/vtgateproxy/rebalance_test.go b/go/test/endtoend/vtgateproxy/rebalance_test.go index c1bcabe9655..b77382a23a7 100644 --- a/go/test/endtoend/vtgateproxy/rebalance_test.go +++ b/go/test/endtoend/vtgateproxy/rebalance_test.go @@ -19,13 +19,10 @@ package vtgateproxy import ( "context" - "encoding/json" "fmt" - "os" "path/filepath" "strconv" "testing" - "time" _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" @@ -47,8 +44,7 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { const targetAffinity = "use1-az1" const targetPool = "pool1" - const vtgateCount = 10 - const vtgatesInAffinity = 5 + const vtgateCount = 5 const vtgateproxyConnections = 4 vtgates, err := startAdditionalVtgates(vtgateCount) @@ -61,28 +57,20 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { var config []map[string]string for i, vtgate := range vtgates { - affinity := targetAffinity - if i >= vtgatesInAffinity { - affinity = "use1-az2" - } config = append(config, map[string]string{ "host": fmt.Sprintf("vtgate%v", i), "address": clusterInstance.Hostname, "grpc": strconv.Itoa(vtgate.GrpcPort), - "az_id": affinity, + "az_id": targetAffinity, "type": targetPool, }) } - vtgateIdx := vtgateproxyConnections - b, err := json.Marshal(config[:vtgateIdx]) - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + if err := writeConfig(t, vtgateHostsFile, config, nil); err != nil { t.Fatal(err) } + // Spin up proxy vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() @@ -114,27 +102,32 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { log.Info("Reading test value while adding vtgates") - const totalQueries = 1000 - addVtgateEveryN := totalQueries / len(vtgates) + // Scale up + for i := 1; i <= vtgateCount; i++ { + if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil { + t.Fatal(err) + } - for i := 0; i < totalQueries; i++ { - if i%(addVtgateEveryN) == 0 && vtgateIdx <= len(vtgates) { - log.Infof("Adding vtgate %v", vtgateIdx-1) - b, err = json.Marshal(config[:vtgateIdx]) + // Run queries at each configuration + for j := 0; j < 100; j++ { + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") if err != nil { t.Fatal(err) } - if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { - t.Fatal(err) - } - if err := vtgateproxyProcInstance.WaitForConfig(config[:vtgateIdx], 5*time.Second); err != nil { - t.Fatal(err) - } - - vtgateIdx++ + assert.Equal(t, []customerEntry{{1, "email1"}}, result) } + } + + log.Info("Removing first vtgates") + // Pop the first 2 vtgates off to force first_ready to pick a new target + if err := writeConfig(t, vtgateHostsFile, config[2:], vtgateproxyProcInstance); err != nil { + t.Fatal(err) + } + + // Run queries in the last configuration + for j := 0; j < 100; j++ { result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") if err != nil { t.Fatal(err) @@ -143,59 +136,33 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { assert.Equal(t, []customerEntry{{1, "email1"}}, result) } - // No queries should be sent to vtgates outside target affinity - const expectMaxQueryCountNonAffinity = 0 + var expectVtgatesWithQueries int switch loadBalancer { case "round_robin": - // At least 1 query should be sent to every vtgate matching target - // affinity - const expectMinQueryCountAffinity = 1 - - for i, vtgate := range vtgates { - queryCount, err := getVtgateQueryCount(vtgate) - if err != nil { - t.Fatal(err) - } - - affinity := config[i]["az_id"] - - log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) - - if affinity == targetAffinity { - assert.GreaterOrEqual(t, queryCount.Sum(), expectMinQueryCountAffinity, "vtgate %v did not recieve the expected number of queries", i) - } else { - assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) - } - } + // Every vtgate should get some queries. We went from 1 vtgates to + // NumConnections+1 vtgates, and then removed the first vtgate. + expectVtgatesWithQueries = len(vtgates) case "first_ready": - // A single vtgate should become the target, and it should recieve all - // queries - targetVtgate := -1 - - for i, vtgate := range vtgates { - queryCount, err := getVtgateQueryCount(vtgate) - if err != nil { - t.Fatal(err) - } - - affinity := config[i]["az_id"] + // Only 2 vtgates should have queries. The first vtgate should get all + // queries until it is removed, and then a new vtgate should be picked + // to get all subsequent queries. + expectVtgatesWithQueries = 2 + } - log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) + var vtgatesWithQueries int + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } - sum := queryCount.Sum() - if sum == 0 { - continue - } + log.Infof("vtgate %v query counts: %+v", i, queryCount) - if targetVtgate != -1 { - t.Logf("only vtgate %v should have received queries; vtgate %v got %v", targetVtgate, i, sum) - t.Fail() - } else if affinity == targetAffinity { - targetVtgate = i - } else { - assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) - } + if queryCount.Sum() > 0 { + vtgatesWithQueries++ } } + + assert.Equal(t, expectVtgatesWithQueries, vtgatesWithQueries) } diff --git a/go/test/endtoend/vtgateproxy/scale_test.go b/go/test/endtoend/vtgateproxy/scale_test.go index a98f1c38393..77ba163a14b 100644 --- a/go/test/endtoend/vtgateproxy/scale_test.go +++ b/go/test/endtoend/vtgateproxy/scale_test.go @@ -60,6 +60,7 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) { vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") var config []map[string]string + // Spin up vtgates for i, vtgate := range vtgates { pool := targetPool if i == 0 { @@ -76,14 +77,12 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) { "type": pool, }) } - b, err := json.Marshal(config[:1]) - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + + if err := writeConfig(t, vtgateHostsFile, config[:1], nil); err != nil { t.Fatal(err) } + // Spin up proxy vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() @@ -113,25 +112,8 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) { t.Fatal("no vtgates in the pool, ping should have failed") } - log.Info("Reading test value while scaling vtgates") - - // Start with an empty list of vtgates, then scale up, then scale back to - // 0. We should expect to see immediate failure when there are no vtgates, - // then success at each scale, until we hit 0 vtgates again, at which point - // we should fail fast again. - i := 0 - scaleUp := true - for { - t.Logf("writing config file with %v vtgates", i) - b, err = json.Marshal(config[:i]) - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { - t.Fatal(err) - } - - if err := vtgateproxyProcInstance.WaitForConfig(config[:i], 5*time.Second); err != nil { + testQuery := func(i int) { + if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil { t.Fatal(err) } @@ -139,9 +121,10 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) { defer cancel() result, err := selectHelper[customerEntry](ctx, conn, "select id, email from customer") - // 0 vtgates should fail - // First vtgate is in the wrong pool, so it should also fail - if i <= 1 { + + switch i { + case 0: + // If there are 0 vtgates, expect a failure if err == nil { t.Fatal("query should have failed with no vtgates") } @@ -150,25 +133,54 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) { if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) { t.Fatal("query timed out but it should have failed fast") } - } else if err != nil { - t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err) - } else { - assert.Equal(t, []customerEntry{{1, "email1"}}, result) - } + case 1: + // If there is 1 vtgate, expect a failure since it's in the wrong pool + if err == nil { + t.Fatal("query should have failed with no vtgates in the pool") + } - if scaleUp { - i++ - if i >= len(config) { - scaleUp = false - i -= 2 + // In first_ready mode, we expect to fail fast and not time out. + if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) { + t.Fatal("query timed out but it should have failed fast") + } + default: + if err != nil { + t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err) } - continue + assert.Equal(t, []customerEntry{{1, "email1"}}, result) } + } - i-- - if i < 0 { - break - } + log.Info("Reading test value while scaling vtgates") + + // Start with an empty list of vtgates, then scale up, then scale back to + // 0. We should expect to see immediate failure when there are no vtgates, + // then success at each scale, until we hit 0 vtgates again, at which point + // we should fail fast again. + for i := 0; i <= len(config); i++ { + testQuery(i) } + + for i := len(config) - 1; i >= 0; i-- { + testQuery(i) + } +} + +func writeConfig(t *testing.T, target string, config []map[string]string, proc *VtgateProxyProcess) error { + t.Logf("writing config with %v vtgates", len(config)) + + b, err := json.Marshal(config) + if err != nil { + return err + } + if err := os.WriteFile(target, b, 0644); err != nil { + return err + } + + if proc != nil { + return proc.WaitForConfig(config, 5*time.Second) + } + + return nil } diff --git a/test/config.json b/test/config.json index 2d5ee723dd3..20d1c288d0b 100644 --- a/test/config.json +++ b/test/config.json @@ -1235,6 +1235,15 @@ "Shard": "topo_connection_cache", "RetryMax": 1, "Tags": [] + }, + "vtgateproxy": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgateproxy"], + "Command": [], + "Manual": false, + "Shard": "vtgate_general_heavy", + "RetryMax": 1, + "Tags": [] } } }