Skip to content

Commit

Permalink
Tablet throttler: inter-checks via gRPC (vitessio#13514)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Aug 1, 2023
1 parent dcacec7 commit b065394
Show file tree
Hide file tree
Showing 23 changed files with 1,814 additions and 198 deletions.
343 changes: 263 additions & 80 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

419 changes: 419 additions & 0 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Large diffs are not rendered by default.

231 changes: 121 additions & 110 deletions go/vt/proto/tabletmanagerservice/tabletmanagerservice.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions go/vt/proto/tabletmanagerservice/tabletmanagerservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,10 @@ func (itmc *internalTabletManagerClient) RestoreFromBackup(context.Context, *top
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) CheckThrottler(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) Close() {
}

Expand Down
34 changes: 34 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ type TabletManagerClient struct {
// WaitForPosition(tablet *topodatapb.Tablet, position string) error, so we
// key by tablet alias and then by position.
WaitForPositionResults map[string]map[string]error
// tablet alias => duration
CheckThrottlerDelays map[string]time.Duration
// keyed by tablet alias
CheckThrottlerResults map[string]*tabletmanagerdatapb.CheckThrottlerResponse
}

type backupStreamAdapter struct {
Expand Down Expand Up @@ -1326,3 +1330,33 @@ func (fake *TabletManagerClient) VReplicationExec(ctx context.Context, tablet *t

return nil, assert.AnError
}

// CheckThrottler is part of the tmclient.TabletManagerCLient interface.
func (fake *TabletManagerClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
if fake.CheckThrottlerResults == nil {
return nil, assert.AnError
}

if tablet.Alias == nil {
return nil, assert.AnError
}

key := topoproto.TabletAliasString(tablet.Alias)

if fake.CheckThrottlerDelays != nil {
if delay, ok := fake.CheckThrottlerDelays[key]; ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
// proceed to results
}
}
}

if resultsForTablet, ok := fake.CheckThrottlerResults[key]; ok {
return resultsForTablet, nil
}

return nil, assert.AnError
}
6 changes: 6 additions & 0 deletions go/vt/vttablet/faketmclient/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ func (client *FakeTabletManagerClient) RestoreFromBackup(ctx context.Context, ta
return &eofEventStream{}, nil
}

// Throttler related methods

func (client *FakeTabletManagerClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
return &tabletmanagerdatapb.CheckThrottlerResponse{}, nil
}

//
// Management related methods
//
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,20 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
}, nil
}

// CheckThrottler is part of the tmclient.TabletManagerClient interface.
func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
c, closer, err := client.dialer.dial(ctx, tablet)
if err != nil {
return nil, err
}
defer closer.Close()
response, err := c.CheckThrottler(ctx, req)
if err != nil {
return nil, err
}
return response, nil
}

type restoreFromBackupStreamAdapter struct {
stream tabletmanagerservicepb.TabletManager_RestoreFromBackupClient
closer io.Closer
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/grpctmserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,13 @@ func (s *server) RestoreFromBackup(request *tabletmanagerdatapb.RestoreFromBacku
return s.tm.RestoreFromBackup(ctx, logger, request)
}

func (s *server) CheckThrottler(ctx context.Context, request *tabletmanagerdatapb.CheckThrottlerRequest) (response *tabletmanagerdatapb.CheckThrottlerResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "CheckThrottler", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
response, err = s.tm.CheckThrottler(ctx, request)
return response, err
}

// registration glue

func init() {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,7 @@ type RPCTM interface {
// HandleRPCPanic is to be called in a defer statement in each
// RPC input point.
HandleRPCPanic(ctx context.Context, name string, args, reply any, verbose bool, err *error)

// Throttler
CheckThrottler(ctx context.Context, request *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error)
}
53 changes: 53 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletmanager

import (
"context"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// CheckThrottler executes a throttler check
func (tm *TabletManager) CheckThrottler(ctx context.Context, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
if req.AppName == "" {
req.AppName = throttlerapp.VitessName.String()
}
flags := &throttle.CheckFlags{
LowPriority: false,
SkipRequestHeartbeats: true,
}
checkResult := tm.QueryServiceControl.CheckThrottler(ctx, req.AppName, flags)
if checkResult == nil {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "nil checkResult")
}
resp := &tabletmanagerdatapb.CheckThrottlerResponse{
StatusCode: int32(checkResult.StatusCode),
Value: checkResult.Value,
Threshold: checkResult.Threshold,
Message: checkResult.Message,
RecentlyChecked: checkResult.RecentlyChecked,
}
if checkResult.Error != nil {
resp.Error = checkResult.Error.Error()
}
return resp, nil
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ func (tmc *fakeTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb
return pos, nil
}

func (tmc *fakeTMClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
return &tabletmanagerdatapb.CheckThrottlerResponse{}, nil
}

// ----------------------------------------------
// testVDiffEnv

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"

"time"

Expand Down Expand Up @@ -89,6 +90,9 @@ type Controller interface {

// TopoServer returns the topo server.
TopoServer() *topo.Server

// CheckThrottler
CheckThrottler(ctx context.Context, appName string, flags *throttle.CheckFlags) *throttle.CheckResult
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,12 @@ func (tsv *TabletServer) TopoServer() *topo.Server {
return tsv.topoServer
}

// CheckThrottler issues a self check
func (tsv *TabletServer) CheckThrottler(ctx context.Context, appName string, flags *throttle.CheckFlags) *throttle.CheckResult {
r := tsv.lagThrottler.CheckByType(ctx, appName, "", flags, throttle.ThrottleCheckSelf)
return r
}

// HandlePanic is part of the queryservice.QueryService interface
func (tsv *TabletServer) HandlePanic(err *error) {
if x := recover(); x != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/mysql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package mysql

import (
"fmt"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// Probe is the minimal configuration required to connect to a MySQL server
type Probe struct {
Key InstanceKey
MetricQuery string
Tablet *topodatapb.Tablet
TabletHost string
TabletPort int
CacheMillis int
Expand Down
Loading

0 comments on commit b065394

Please sign in to comment.