Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-vitess-r15.0.5: backport v20 vtgate syscall/topo-call optimizations #227

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ require (
github.com/bndr/gotabulate v1.1.2
github.com/hashicorp/go-version v1.6.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.3.0
golang.org/x/tools/cmd/cover v0.1.0-deprecated
modernc.org/sqlite v1.20.3
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Usage of vtctld:
--grpc_server_initial_window_size int gRPC server initial window size
--grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s)
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
-h, --help display usage and exit
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Usage of vtgate:
--grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s)
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
-h, --help display usage and exit
Expand Down
32 changes: 9 additions & 23 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"time"

"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -88,6 +89,9 @@ var (
// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond
)
Expand Down Expand Up @@ -166,6 +170,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -282,6 +287,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -316,6 +323,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -700,30 +708,8 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets
return hc.waitForTablets(ctx, targets, true)
}

// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces
func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target {
filteredTargets := make([]*query.Target, 0)

// Keep them all if there are no keyspaces to watch
if len(KeyspacesToWatch) == 0 {
return append(filteredTargets, targets...)
}

// Let's remove from the target shards that are not in the keyspaceToWatch list.
for _, target := range targets {
for _, keyspaceToWatch := range keyspaces {
if target.Keyspace == keyspaceToWatch {
filteredTargets = append(filteredTargets, target)
}
}
}
return filteredTargets
}

// waitForTablets is the internal method that polls for tablets.
func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error {
targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets)

for {
// We nil targets as we find them.
allPresent := true
Expand Down Expand Up @@ -800,7 +786,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(), nil
return thc.Connection(hc), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
21 changes: 0 additions & 21 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,27 +645,6 @@ func TestWaitForAllServingTablets(t *testing.T) {

err = hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace")

targets = []*querypb.Target{

{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

KeyspacesToWatch = []string{tablet.Keyspace}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered")

KeyspacesToWatch = []string{}
}

// TestRemoveTablet tests the behavior when a tablet goes away.
Expand Down
37 changes: 31 additions & 6 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
Expand All @@ -34,12 +35,16 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var withDialerContextOnce sync.Once

// tabletHealthCheck maintains the health status of a tablet. A map of this
// structure is maintained in HealthCheck.
type tabletHealthCheck struct {
Expand Down Expand Up @@ -123,8 +128,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection()
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(hc)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -137,14 +142,34 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S
return err
}

func (thc *tabletHealthCheck) Connection() queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked()
return thc.connectionLocked(hc)
}

func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
}
}

func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService {
func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService {
if thc.Conn == nil {
withDialerContextOnce.Do(func() {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
})
conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -273,7 +298,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpcclient
import (
"context"
"crypto/tls"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -39,6 +40,7 @@ import (
)

var (
grpcDialOptionsMu sync.Mutex
keepaliveTime = 10 * time.Second
keepaliveTimeout = 10 * time.Second
initialConnWindowSize int
Expand Down Expand Up @@ -88,6 +90,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error)

// RegisterGRPCDialOptions registers an implementation of AuthServer.
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) {
grpcDialOptionsMu.Lock()
defer grpcDialOptionsMu.Unlock()
grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
}

Expand Down Expand Up @@ -137,12 +141,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...

newopts = append(newopts, opts...)
var err error
grpcDialOptionsMu.Lock()
for _, grpcDialOptionInitializer := range grpcDialOptions {
newopts, err = grpcDialOptionInitializer(newopts)
if err != nil {
log.Fatalf("There was an error initializing client grpc.DialOption: %v", err)
}
}
grpcDialOptionsMu.Unlock()

newopts = append(newopts, interceptors()...)

Expand Down
87 changes: 75 additions & 12 deletions go/vt/grpcclient/client_auth_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,35 @@ import (
"context"
"encoding/json"
"os"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"vitess.io/vitess/go/vt/servenv"
)

var (
credsFile string // registered as --grpc_auth_static_client_creds in RegisterFlags
// StaticAuthClientCreds implements client interface to be able to WithPerRPCCredentials
_ credentials.PerRPCCredentials = (*StaticAuthClientCreds)(nil)

clientCreds *StaticAuthClientCreds
clientCredsCancel context.CancelFunc
clientCredsErr error
clientCredsMu sync.Mutex
clientCredsSigChan chan os.Signal
)

// StaticAuthClientCreds holder for client credentials
// StaticAuthClientCreds holder for client credentials.
type StaticAuthClientCreds struct {
Username string
Password string
}

// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds
// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds.
func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{
"username": c.Username,
Expand All @@ -47,30 +58,82 @@ func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) (

// RequireTransportSecurity indicates whether the credentials requires transport security.
// Given that people can use this with or without TLS, at the moment we are not enforcing
// transport security
// transport security.
func (c *StaticAuthClientCreds) RequireTransportSecurity() bool {
return false
}

// AppendStaticAuth optionally appends static auth credentials if provided.
func AppendStaticAuth(opts []grpc.DialOption) ([]grpc.DialOption, error) {
if credsFile == "" {
return opts, nil
}
data, err := os.ReadFile(credsFile)
creds, err := getStaticAuthCreds()
if err != nil {
return nil, err
}
clientCreds := &StaticAuthClientCreds{}
err = json.Unmarshal(data, clientCreds)
if creds != nil {
grpcCreds := grpc.WithPerRPCCredentials(creds)
opts = append(opts, grpcCreds)
}
return opts, nil
}

// ResetStaticAuth resets the static auth credentials.
func ResetStaticAuth() {
clientCredsMu.Lock()
defer clientCredsMu.Unlock()
if clientCredsCancel != nil {
clientCredsCancel()
clientCredsCancel = nil
}
clientCreds = nil
clientCredsErr = nil
}

// getStaticAuthCreds returns the static auth creds and error.
func getStaticAuthCreds() (*StaticAuthClientCreds, error) {
clientCredsMu.Lock()
defer clientCredsMu.Unlock()
if credsFile != "" && clientCreds == nil {
var ctx context.Context
ctx, clientCredsCancel = context.WithCancel(context.Background())
go handleClientCredsSignals(ctx)
clientCreds, clientCredsErr = loadStaticAuthCredsFromFile(credsFile)
}
return clientCreds, clientCredsErr
}

// handleClientCredsSignals handles signals to reload client creds.
func handleClientCredsSignals(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-clientCredsSigChan:
if newCreds, err := loadStaticAuthCredsFromFile(credsFile); err == nil {
clientCredsMu.Lock()
clientCreds = newCreds
clientCredsErr = err
clientCredsMu.Unlock()
}
}
}
}

// loadStaticAuthCredsFromFile loads static auth credentials from a file.
func loadStaticAuthCredsFromFile(path string) (*StaticAuthClientCreds, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
creds := grpc.WithPerRPCCredentials(clientCreds)
opts = append(opts, creds)
return opts, nil
creds := &StaticAuthClientCreds{}
err = json.Unmarshal(data, creds)
return creds, err
}

func init() {
servenv.OnInit(func() {
clientCredsSigChan = make(chan os.Signal, 1)
signal.Notify(clientCredsSigChan, syscall.SIGHUP)
_, _ = getStaticAuthCreds() // preload static auth credentials
})
RegisterGRPCDialOptions(AppendStaticAuth)
}
Loading
Loading