Skip to content

Commit

Permalink
Remove Usage of VReplicationExec For _vt.vreplication Reads (#14424)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Mar 11, 2024
1 parent b99e150 commit fbaed97
Show file tree
Hide file tree
Showing 35 changed files with 12,278 additions and 6,053 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {

ms := &vtctldatapb.MaterializeSettings{
Workflow: common.BaseOptions.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_CUSTOM,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
SourceKeyspace: createOptions.SourceKeyspace,
TableSettings: createOptions.TableSettings.val,
Expand Down
20 changes: 18 additions & 2 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,27 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
return found, nil
}

func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, workflow string, database string, want int) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where workflow='%s'", sidecarDBIdentifier, workflow).Query
// expectNumberOfStreams waits for the given number of streams to be present and
// by default RUNNING. If you want to wait for different states, then you can
// pass in the state(s) you want to wait for.
func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, workflow string, database string, want int, states ...string) {
var query string
if len(states) == 0 {
states = append(states, binlogdatapb.VReplicationWorkflowState_Running.String())
}
query = sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where workflow='%s' and state in ('%s')",
sidecarDBIdentifier, workflow, strings.Join(states, "','")).Query
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
waitForQueryResult(t, vtgateConn, database, query, `[[INT64(0)]]`)
}

func printShardPositions(vc *VitessCluster, ksShards []string) {
for _, ksShard := range ksShards {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard)
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestVtctlMigrate(t *testing.T) {
"--source=ext1.rating", "create", ksWorkflow); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String())
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "cancel", ksWorkflow); err != nil {
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestVtctldMigrate(t *testing.T) {
"--mount-name", "ext1", "--all-tables", "--auto-start=false", "--cells=extcell1")
require.NoError(t, err, "Migrate command failed with %s", output)

expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String())
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,13 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string

insertMoreCustomers(t, 16)
reshardCustomer2to4Split(t, nil, "")
confirmAllStreamsRunning(t, vtgateConn, "customer:-40")
expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4)
reshardCustomer3to2SplitMerge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:-60")
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")
expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,7 @@ type VRSettings struct {
DeferSecondaryKeys bool
}

// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
// ReadVRSettings retrieves the settings for a vreplication stream.
func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
Expand Down
6 changes: 4 additions & 2 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (dc *MockDBClient) Close() {

// ExecuteFetch is part of the DBClient interface
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
// Serialize ExecuteFetch to enforce a strict order on shared dbClients.
dc.expectMu.Lock()
defer dc.expectMu.Unlock()

dc.t.Helper()
msg := "DBClient query: %v"
if dc.Tag != "" {
Expand All @@ -195,8 +199,6 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
}

dc.expectMu.Lock()
defer dc.expectMu.Unlock()
if dc.currentResult >= len(dc.expect) {
msg := "DBClientMock: query: %s, no more requests are expected"
if dc.Tag != "" {
Expand Down
11 changes: 11 additions & 0 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconn"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -75,6 +76,16 @@ var (
}
)

// BuildTabletTypesString is a helper to build a serialized string representation of
// the tablet type(s) and optional in order clause for later use with the TabletPicker.
func BuildTabletTypesString(tabletTypes []topodatapb.TabletType, tabletSelectionPreference tabletmanagerdatapb.TabletSelectionPreference) string {
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if tabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = InOrderHint + tabletTypesStr
}
return tabletTypesStr
}

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
func GetTabletPickerRetryDelay() time.Duration {
muTabletPickerRetryDelay.Lock()
Expand Down
Loading

0 comments on commit fbaed97

Please sign in to comment.