Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SchemaEngine: Ensure GetTableForPos returns table schema for "current" position by default #15912

Merged
merged 9 commits into from
May 20, 2024
23 changes: 20 additions & 3 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtenv"

querypb "vitess.io/vitess/go/vt/proto/query"
)

const appendEntry = -1
const (
appendEntry = -1
useQuery = "use `fakesqldb`"
)

// DB is a fake database and all its methods are thread safe. It
// creates a mysql.Listener and implements the mysql.Handler
Expand Down Expand Up @@ -200,7 +204,7 @@ func New(t testing.TB) *DB {
db.listener.Accept()
}()

db.AddQuery("use `fakesqldb`", &sqltypes.Result{})
db.AddQuery(useQuery, &sqltypes.Result{})
// Return the db.
return db
}
Expand Down Expand Up @@ -598,6 +602,8 @@ func (db *DB) RejectQueryPattern(queryPattern, error string) {

// ClearQueryPattern removes all query patterns set up
func (db *DB) ClearQueryPattern() {
db.mu.Lock()
defer db.mu.Unlock()
db.patternData = make(map[string]exprResult)
}

Expand All @@ -617,6 +623,17 @@ func (db *DB) DeleteQuery(query string) {
delete(db.queryCalled, key)
}

// DeleteAllQueries deletes all expected queries from the fake DB.
func (db *DB) DeleteAllQueries() {
db.mu.Lock()
defer db.mu.Unlock()
clear(db.data)
clear(db.patternData)
clear(db.queryCalled)
// Use is always expected to be present.
db.data[useQuery] = &ExpectedResult{&sqltypes.Result{}, nil}
}

// AddRejectedQuery adds a query which will be rejected at execution time.
func (db *DB) AddRejectedQuery(query string, err error) {
db.mu.Lock()
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,9 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
tablet := &Tablet{}

options := []string{
"--queryserver-config-schema-reload-time", "5s",
"--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
}
options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,10 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num)
want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
waitForQueryResult(t, vtgateConn, "product", "select max(cid) from customer2", want)
res := execVtgateQuery(t, vtgateConn, "product", "select max(cid) from customer2")
cid, err := res.Rows[0][0].ToInt()
require.NoError(t, err)
require.GreaterOrEqual(t, cid, 100+num+num-1)
}

// testReplicatingWithPKEnumCols ensures that we properly apply binlog events
Expand Down
8 changes: 3 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/tb"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func addTablet(id int) *topodatapb.Tablet {
if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil {
panic(err)
}
env.SchemaEngine.Reload(context.Background())
return tablet
}

Expand All @@ -277,15 +276,13 @@ func addOtherTablet(id int, keyspace, shard string) *topodatapb.Tablet {
if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil {
panic(err)
}
env.SchemaEngine.Reload(context.Background())
return tablet
}

func deleteTablet(tablet *topodatapb.Tablet) {
env.TopoServ.DeleteTablet(context.Background(), tablet.Alias)
// This is not automatically removed from shard replication, which results in log spam.
topo.DeleteTabletReplicationData(context.Background(), env.TopoServ, tablet)
env.SchemaEngine.Reload(context.Background())
}

// fakeTabletConn implement TabletConn interface. We only care about the
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"fmt"
"testing"

"context"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory"
)
Expand All @@ -38,7 +36,6 @@ func TestJournalOneToOne(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -98,7 +95,6 @@ func TestJournalOneToMany(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -164,7 +160,6 @@ func TestJournalTablePresent(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -223,7 +218,6 @@ func TestJournalTableNotPresent(t *testing.T) {
"drop table t",
fmt.Sprintf("drop table %s.t", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -278,7 +272,6 @@ func TestJournalTableMixed(t *testing.T) {
fmt.Sprintf("drop table %s.t", vrepldb),
fmt.Sprintf("drop table %s.t1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down
14 changes: 0 additions & 14 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func testPlayerCopyCharPK(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -223,7 +222,6 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -453,7 +451,6 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -581,7 +578,6 @@ func testPlayerCopyTables(t *testing.T) {
fmt.Sprintf("drop table %s.yes", vrepldb),
"drop table no",
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -708,7 +704,6 @@ func testPlayerCopyBigTable(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -839,7 +834,6 @@ func testPlayerCopyWildcardRule(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.src", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
Expand Down Expand Up @@ -968,7 +962,6 @@ func testPlayerCopyTableContinuation(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1135,7 +1128,6 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1232,7 +1224,6 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1358,7 +1349,6 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1444,7 +1434,6 @@ func testPlayerCopyTablesGIPK(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1535,7 +1524,6 @@ func testPlayerCopyTableCancel(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

saveTimeout := vttablet.CopyPhaseDuration
vttablet.CopyPhaseDuration = 1 * time.Millisecond
Expand Down Expand Up @@ -1626,7 +1614,6 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
"drop table src2",
fmt.Sprintf("drop table %s.dst2", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -1714,7 +1701,6 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down
Loading
Loading