Skip to content

Commit

Permalink
SchemaEngine: Ensure GetTableForPos returns table schema for "current…
Browse files Browse the repository at this point in the history
…" position by default (#15912)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored May 20, 2024
1 parent 93df29e commit 3b09eb2
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 183 deletions.
23 changes: 20 additions & 3 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtenv"

querypb "vitess.io/vitess/go/vt/proto/query"
)

const appendEntry = -1
const (
appendEntry = -1
useQuery = "use `fakesqldb`"
)

// DB is a fake database and all its methods are thread safe. It
// creates a mysql.Listener and implements the mysql.Handler
Expand Down Expand Up @@ -200,7 +204,7 @@ func New(t testing.TB) *DB {
db.listener.Accept()
}()

db.AddQuery("use `fakesqldb`", &sqltypes.Result{})
db.AddQuery(useQuery, &sqltypes.Result{})
// Return the db.
return db
}
Expand Down Expand Up @@ -598,6 +602,8 @@ func (db *DB) RejectQueryPattern(queryPattern, error string) {

// ClearQueryPattern removes all query patterns set up
func (db *DB) ClearQueryPattern() {
db.mu.Lock()
defer db.mu.Unlock()
db.patternData = make(map[string]exprResult)
}

Expand All @@ -617,6 +623,17 @@ func (db *DB) DeleteQuery(query string) {
delete(db.queryCalled, key)
}

// DeleteAllQueries deletes all expected queries from the fake DB.
func (db *DB) DeleteAllQueries() {
db.mu.Lock()
defer db.mu.Unlock()
clear(db.data)
clear(db.patternData)
clear(db.queryCalled)
// Use is always expected to be present.
db.data[useQuery] = &ExpectedResult{&sqltypes.Result{}, nil}
}

// AddRejectedQuery adds a query which will be rejected at execution time.
func (db *DB) AddRejectedQuery(query string, err error) {
db.mu.Lock()
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,9 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
tablet := &Tablet{}

options := []string{
"--queryserver-config-schema-reload-time", "5s",
"--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
}
options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
Expand Down
6 changes: 4 additions & 2 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,10 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num)
want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
waitForQueryResult(t, vtgateConn, "product", "select max(cid) from customer2", want)
res := execVtgateQuery(t, vtgateConn, "product", "select max(cid) from customer2")
cid, err := res.Rows[0][0].ToInt()
require.NoError(t, err)
require.GreaterOrEqual(t, cid, 100+num+num-1)
}

// testReplicatingWithPKEnumCols ensures that we properly apply binlog events
Expand Down
8 changes: 3 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/tb"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func addTablet(id int) *topodatapb.Tablet {
if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil {
panic(err)
}
env.SchemaEngine.Reload(context.Background())
return tablet
}

Expand All @@ -277,15 +276,13 @@ func addOtherTablet(id int, keyspace, shard string) *topodatapb.Tablet {
if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil {
panic(err)
}
env.SchemaEngine.Reload(context.Background())
return tablet
}

func deleteTablet(tablet *topodatapb.Tablet) {
env.TopoServ.DeleteTablet(context.Background(), tablet.Alias)
// This is not automatically removed from shard replication, which results in log spam.
topo.DeleteTabletReplicationData(context.Background(), env.TopoServ, tablet)
env.SchemaEngine.Reload(context.Background())
}

// fakeTabletConn implement TabletConn interface. We only care about the
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"fmt"
"testing"

"context"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory"
)
Expand All @@ -38,7 +36,6 @@ func TestJournalOneToOne(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -98,7 +95,6 @@ func TestJournalOneToMany(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -164,7 +160,6 @@ func TestJournalTablePresent(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -223,7 +218,6 @@ func TestJournalTableNotPresent(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -278,7 +272,6 @@ func TestJournalTableMixed(t *testing.T) {
fmt.Sprintf("drop table %s.t", vrepldb),
fmt.Sprintf("drop table %s.t1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down
14 changes: 0 additions & 14 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func testPlayerCopyCharPK(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -223,7 +222,6 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -453,7 +451,6 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -581,7 +578,6 @@ func testPlayerCopyTables(t *testing.T) {
fmt.Sprintf("drop table %s.yes", vrepldb),
"drop table no",
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -708,7 +704,6 @@ func testPlayerCopyBigTable(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -839,7 +834,6 @@ func testPlayerCopyWildcardRule(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.src", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -968,7 +962,6 @@ func testPlayerCopyTableContinuation(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1135,7 +1128,6 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1232,7 +1224,6 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1358,7 +1349,6 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1444,7 +1434,6 @@ func testPlayerCopyTablesGIPK(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1535,7 +1524,6 @@ func testPlayerCopyTableCancel(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

saveTimeout := vttablet.CopyPhaseDuration
vttablet.CopyPhaseDuration = 1 * time.Millisecond
Expand Down Expand Up @@ -1626,7 +1614,6 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1714,7 +1701,6 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down
Loading

0 comments on commit 3b09eb2

Please sign in to comment.