Skip to content

Commit

Permalink
Dedicated poolDialer logic for VTOrc, throttler (#15562)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
shlomi-noach and GuptaManan100 authored Mar 26, 2024
1 parent 6a18178 commit 90c0057
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 10 deletions.
73 changes: 73 additions & 0 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,76 @@ func TestDurabilityPolicySetLater(t *testing.T) {
assert.NotNil(t, primary, "should have elected a primary")
utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second)
}

func TestFullStatusConnectionPooling(t *testing.T) {
defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{
"--tablet_manager_grpc_concurrency=1",
}, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]

// find primary from topo
curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, curPrimary, "should have elected a primary")
vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)
utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1)

// Kill the current primary.
_ = curPrimary.VttabletProcess.Kill()

// Wait until VTOrc notices some problems
status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response == "null"
})
assert.Equal(t, 200, status)
assert.Contains(t, resp, "UnreachablePrimary")

time.Sleep(1 * time.Minute)

// Change the primaries ports and restart it.
curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort()
curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort()
err := curPrimary.VttabletProcess.Setup()
require.NoError(t, err)

// See that VTOrc eventually reports no errors.
// Wait until there are no problems and the api endpoint returns null
status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response != "null"
})
assert.Equal(t, 200, status)
assert.Equal(t, "null", resp)

// REPEATED
// Kill the current primary.
_ = curPrimary.VttabletProcess.Kill()

// Wait until VTOrc notices some problems
status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response == "null"
})
assert.Equal(t, 200, status)
assert.Contains(t, resp, "UnreachablePrimary")

time.Sleep(1 * time.Minute)

// Change the primaries ports back to original and restart it.
curPrimary.VttabletProcess.Port = curPrimary.HTTPPort
curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort
err = curPrimary.VttabletProcess.Setup()
require.NoError(t, err)

// See that VTOrc eventually reports no errors.
// Wait until there are no problems and the api endpoint returns null
status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response != "null"
})
assert.Equal(t, 200, status)
assert.Equal(t, "null", resp)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status
// The function provided takes in the status and response and returns if we should continue to retry or not
func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) {
t.Helper()
timeout := time.After(10 * time.Second)
timeout := time.After(30 * time.Second)
for {
select {
case <-timeout:
Expand Down
79 changes: 70 additions & 9 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

type DialPoolGroup int

const (
dialPoolGroupThrottler DialPoolGroup = iota
dialPoolGroupVTOrc
)

type invalidatorFunc func()

var (
concurrency = 8
cert string
Expand Down Expand Up @@ -92,14 +101,17 @@ type tmc struct {
client tabletmanagerservicepb.TabletManagerClient
}

type addrTmcMap map[string]*tmc

// grpcClient implements both dialer and poolDialer.
type grpcClient struct {
// This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App},
// CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only.
// But that's OK because usually the tasks that use them are one-purpose only.
// The map is protected by the mutex.
mu sync.Mutex
rpcClientMap map[string]chan *tmc
mu sync.Mutex
rpcClientMap map[string]chan *tmc
rpcDialPoolMap map[DialPoolGroup]addrTmcMap
}

type dialer interface {
Expand All @@ -109,6 +121,7 @@ type dialer interface {

type poolDialer interface {
dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error)
dialDedicatedPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error)
}

// Client implements tmclient.TabletManagerClient.
Expand Down Expand Up @@ -152,6 +165,17 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (
return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil
}

func (client *grpcClient) createTmc(addr string, opt grpc.DialOption) (*tmc, error) {
cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
if err != nil {
return nil, err
}
return &tmc{
cc: cc,
client: tabletmanagerservicepb.NewTabletManagerClient(cc),
}, nil
}

func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
Expand All @@ -170,14 +194,11 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table
client.mu.Unlock()

for i := 0; i < cap(c); i++ {
cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
tm, err := client.createTmc(addr, opt)
if err != nil {
return nil, err
}
c <- &tmc{
cc: cc,
client: tabletmanagerservicepb.NewTabletManagerClient(cc),
}
c <- tm
}
} else {
client.mu.Unlock()
Expand All @@ -188,6 +209,38 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table
return result.client, nil
}

func (client *grpcClient) dialDedicatedPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error) {
addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
if err != nil {
return nil, nil, err
}

client.mu.Lock()
defer client.mu.Unlock()
if client.rpcDialPoolMap == nil {
client.rpcDialPoolMap = make(map[DialPoolGroup]addrTmcMap)
}
if _, ok := client.rpcDialPoolMap[dialPoolGroup]; !ok {
client.rpcDialPoolMap[dialPoolGroup] = make(addrTmcMap)
}
m := client.rpcDialPoolMap[dialPoolGroup]
if _, ok := m[addr]; !ok {
tm, err := client.createTmc(addr, opt)
if err != nil {
return nil, nil, err
}
m[addr] = tm
}
invalidator := func() {
client.mu.Lock()
defer client.mu.Unlock()
m[addr].cc.Close()
delete(m, addr)
}
return m[addr].client, invalidator, nil
}

// Close is part of the tmclient.TabletManagerClient interface.
func (client *grpcClient) Close() {
client.mu.Lock()
Expand Down Expand Up @@ -611,9 +664,10 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb.
// and dialing the other tablet every time is not practical.
func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) {
var c tabletmanagerservicepb.TabletManagerClient
var invalidator invalidatorFunc
var err error
if poolDialer, ok := client.dialer.(poolDialer); ok {
c, err = poolDialer.dialPool(ctx, tablet)
c, invalidator, err = poolDialer.dialDedicatedPool(ctx, dialPoolGroupVTOrc, tablet)
if err != nil {
return nil, err
}
Expand All @@ -630,6 +684,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet)

response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{})
if err != nil {
if invalidator != nil {
invalidator()
}
return nil, err
}
return response.Status, nil
Expand Down Expand Up @@ -1101,9 +1158,10 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
// and dialing the other tablet every time is not practical.
func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
var c tabletmanagerservicepb.TabletManagerClient
var invalidator invalidatorFunc
var err error
if poolDialer, ok := client.dialer.(poolDialer); ok {
c, err = poolDialer.dialPool(ctx, tablet)
c, invalidator, err = poolDialer.dialDedicatedPool(ctx, dialPoolGroupThrottler, tablet)
if err != nil {
return nil, err
}
Expand All @@ -1120,6 +1178,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab

response, err := c.CheckThrottler(ctx, req)
if err != nil {
if invalidator != nil {
invalidator()
}
return nil, err
}
return response, nil
Expand Down
Loading

0 comments on commit 90c0057

Please sign in to comment.