From 7340936e74ec26f1db1ddc6bcb3dc1f182ae3e66 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 8 Jul 2024 14:32:40 -0700 Subject: [PATCH] integrate the tablet balancer into vtgate --- go/flags/endtoend/vtgate.txt | 4 ++- go/vt/vtgate/tabletgateway.go | 66 +++++++++++++++++++++++++++++++---- go/vt/vtgate/vtgate.go | 7 ++++ 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 68899cecd56..ae5d3eb1c58 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -28,7 +28,9 @@ Flags: --allow-kill-statement Allows the execution of kill statement --allowed_tablet_types strings Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types. --alsologtostderr log to standard error as well as files - --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. + --balancer_enabled Whether to enable the tablet balancer to evenly spread query load + --balancer_keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional) + --balancer_vtgate_cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --buffer_drain_concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1) --buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true. --buffer_max_failover_duration duration Stop buffering completely if a failover takes longer than this duration. (default 20s) diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 63ae836d715..8363f95e891 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -20,7 +20,9 @@ import ( "context" "fmt" "math/rand/v2" + "net/http" "runtime/debug" + "slices" "sort" "sync" "sync/atomic" @@ -37,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/balancer" "vitess.io/vitess/go/vt/vtgate/buffer" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -54,6 +57,11 @@ var ( // retryCount is the number of times a query will be retried on error retryCount = 2 + // configuration flags for the tablet balancer + balancerEnabled bool + balancerVtgateCells []string + balancerKeyspaces []string + logCollations = logutil.NewThrottledLogger("CollationInconsistent", 1*time.Minute) ) @@ -62,6 +70,9 @@ func init() { fs.StringVar(&CellsToWatch, "cells_to_watch", "", "comma-separated list of cells for watching tablets") fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type") fs.IntVar(&retryCount, "retry-count", 2, "retry count") + fs.BoolVar(&balancerEnabled, "balancer_enabled", false, "Whether to enable the tablet balancer to evenly spread query load") + fs.StringSliceVar(&balancerVtgateCells, "balancer_vtgate_cells", []string{}, "When in balanced mode, a comma-separated list of cells that contain vtgates (required)") + fs.StringSliceVar(&balancerKeyspaces, "balancer_keyspaces", []string{}, "When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)") }) } @@ -84,6 +95,9 @@ type TabletGateway struct { // buffer, if enabled, buffers requests during a detected PRIMARY failover. buffer *buffer.Buffer + + // balancer used for routing to tablets + balancer balancer.TabletBalancer } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { @@ -112,6 +126,9 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop statusAggregators: make(map[string]*TabletStatusAggregator), } gw.setupBuffering(ctx) + if balancerEnabled { + gw.setupBalancer(ctx) + } gw.QueryService = queryservice.Wrap(nil, gw.withRetry) return gw } @@ -145,6 +162,13 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) { }(bufferCtx, ksChan, gw.buffer) } +func (gw *TabletGateway) setupBalancer(ctx context.Context) { + if len(balancerVtgateCells) == 0 { + log.Exitf("balancer_vtgate_cells is required for balanced mode") + } + gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells) +} + // QueryServiceByAlias satisfies the Gateway interface func (gw *TabletGateway) QueryServiceByAlias(ctx context.Context, alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { qs, err := gw.hc.TabletConnection(ctx, alias, target) @@ -220,6 +244,15 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList { return res } +func (gw *TabletGateway) DebugBalancerHandler(w http.ResponseWriter, r *http.Request) { + if balancerEnabled { + gw.balancer.DebugHandler(w, r) + } else { + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte("not enabled")) + } +} + // withRetry gets available connections and executes the action. If there are retryable errors, // it retries retryCount times before failing. It does not retry if the connection is in // the middle of a transaction. While returning the error check if it maybe a result of @@ -306,16 +339,35 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, break } - gw.shuffleTablets(gw.localCell, tablets) - var th *discovery.TabletHealth - // skip tablets we tried before - for _, t := range tablets { - if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok { - th = t - break + + useBalancer := balancerEnabled + if balancerEnabled && len(balancerKeyspaces) > 0 { + useBalancer = slices.Contains(balancerKeyspaces, target.Keyspace) + } + if useBalancer { + // filter out the tablets that we've tried before (if any), then pick the best one + if len(invalidTablets) > 0 { + tablets = slices.DeleteFunc(tablets, func(t *discovery.TabletHealth) bool { + _, isInvalid := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)] + return isInvalid + }) + } + + th = gw.balancer.Pick(target, tablets) + + } else { + gw.shuffleTablets(gw.localCell, tablets) + + // skip tablets we tried before + for _, t := range tablets { + if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok { + th = t + break + } } } + if th == nil { // do not override error from last attempt. if err == nil { diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 9ea5da7a7e3..e23e313108c 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -373,6 +373,7 @@ func Init( }) vtgateInst.registerDebugHealthHandler() vtgateInst.registerDebugEnvHandler() + vtgateInst.registerDebugBalancerHandler() initAPI(gw.hc) return vtgateInst @@ -437,6 +438,12 @@ func (vtg *VTGate) registerDebugHealthHandler() { }) } +func (vtg *VTGate) registerDebugBalancerHandler() { + http.HandleFunc("/debug/balancer", func(w http.ResponseWriter, r *http.Request) { + vtg.Gateway().DebugBalancerHandler(w, r) + }) +} + // IsHealthy returns nil if server is healthy. // Otherwise, it returns an error indicating the reason. func (vtg *VTGate) IsHealthy() error {