Skip to content

Commit

Permalink
Add GetServerStatus RPC to use in PRS (#16022)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 authored Jun 3, 2024
1 parent 1090d42 commit d48ef18
Show file tree
Hide file tree
Showing 22 changed files with 3,318 additions and 1,849 deletions.
5 changes: 5 additions & 0 deletions go/test/endtoend/tabletmanager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func tmcPrimaryPosition(ctx context.Context, tabletGrpcPort int) (string, error)
return tmClient.PrimaryPosition(ctx, vtablet)
}

func tmcGetGlobalStatusVars(ctx context.Context, tabletGrpcPort int, variables []string) (map[string]string, error) {
vtablet := getTablet(tabletGrpcPort)
return tmClient.GetGlobalStatusVars(ctx, vtablet, variables)
}

func tmcStartReplicationUntilAfter(ctx context.Context, tabletGrpcPort int, positon string, waittime time.Duration) error {
vtablet := getTablet(tabletGrpcPort)
return tmClient.StartReplicationUntilAfter(ctx, vtablet, positon, waittime)
Expand Down
29 changes: 29 additions & 0 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletmanager
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -100,3 +101,31 @@ func TestResetReplicationParameters(t *testing.T) {
require.NoError(t, err)
require.Len(t, res.Rows, 0)
}

// TestGetGlobalStatusVars tests the GetGlobalStatusVars RPC
func TestGetGlobalStatusVars(t *testing.T) {
ctx := context.Background()
statusValues, err := tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, []string{"Innodb_buffer_pool_pages_data", "unknown_value"})
require.NoError(t, err)
require.Len(t, statusValues, 1)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")

statusValues, err = tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, []string{"Uptime", "Innodb_buffer_pool_pages_data"})
require.NoError(t, err)
require.Len(t, statusValues, 2)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")
checkValueGreaterZero(t, statusValues, "Uptime")

statusValues, err = tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, nil)
require.NoError(t, err)
require.Greater(t, len(statusValues), 250)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_free")
checkValueGreaterZero(t, statusValues, "Uptime")
}

func checkValueGreaterZero(t *testing.T, statusValues map[string]string, val string) {
valInMap, err := strconv.Atoi(statusValues[val])
require.NoError(t, err)
require.Greater(t, valInMap, 0)
}
7 changes: 7 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ func (fmd *FakeMysqlDaemon) SetSuperReadOnly(ctx context.Context, on bool) (Rese
return nil, nil
}

// GetGlobalStatusVars is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error) {
return make(map[string]string), fmd.ExecuteSuperQueryList(ctx, []string{
"FAKE " + getGlobalStatusQuery,
})
}

// StartReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartReplication(ctx context.Context, hookExtraEnv map[string]string) error {
if fmd.StartReplicationError != nil {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type MysqlDaemon interface {
// GetServerUUID returns the servers UUID
GetServerUUID(ctx context.Context) (string, error)

// GetGlobalStatusVars returns the server's global status variables asked for.
// An empty/nil variable name parameter slice means you want all of them.
GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error)

// replication related methods
StartReplication(ctx context.Context, hookExtraEnv map[string]string) error
RestartReplication(ctx context.Context, hookExtraEnv map[string]string) error
Expand Down
40 changes: 40 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,20 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/replicationdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

const (
// Queries used for RPCs
getGlobalStatusQuery = "SELECT variable_name, variable_value FROM performance_schema.global_status"
)

type ResetSuperReadOnlyFunc func() error

// WaitForReplicationStart waits until the deadline for replication to start.
Expand Down Expand Up @@ -228,6 +236,38 @@ func (mysqld *Mysqld) GetServerUUID(ctx context.Context) (string, error) {
return conn.Conn.GetServerUUID()
}

// GetGlobalStatusVars returns the server's global status variables asked for.
// An empty/nil variable name parameter slice means you want all of them.
func (mysqld *Mysqld) GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error) {
query := getGlobalStatusQuery
if len(variables) != 0 {
// The format specifier is for any optional predicates.
statusBv, err := sqltypes.BuildBindVariable(variables)
if err != nil {
return nil, err
}
query, err = sqlparser.ParseAndBind(getGlobalStatusQuery+" WHERE variable_name IN %a",
statusBv,
)
if err != nil {
return nil, err
}
}
qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return nil, err
}

finalRes := make(map[string]string, len(qr.Rows))
for _, row := range qr.Rows {
if len(row) != 2 {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "incorrect number of fields in the row")
}
finalRes[row[0].ToString()] = row[1].ToString()
}
return finalRes, nil
}

// IsReadOnly return true if the instance is read only
func (mysqld *Mysqld) IsReadOnly(ctx context.Context) (bool, error) {
qr, err := mysqld.FetchSuperQuery(ctx, "SHOW VARIABLES LIKE 'read_only'")
Expand Down
Loading

0 comments on commit d48ef18

Please sign in to comment.