Skip to content

Commit

Permalink
Fixup tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed May 10, 2024
1 parent 7332061 commit 2fa4e55
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
37 changes: 18 additions & 19 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func TestSchemaVersioning(t *testing.T) {
cancel()

log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n")
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
eventCh = make(chan []*binlogdatapb.VEvent)
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
eventCh2 := make(chan []*binlogdatapb.VEvent)
send = func(events []*binlogdatapb.VEvent) error {
var evs []*binlogdatapb.VEvent
for _, event := range events {
Expand All @@ -208,16 +208,16 @@ func TestSchemaVersioning(t *testing.T) {
return nil
}
select {
case eventCh <- evs:
case <-ctx.Done():
case eventCh2 <- evs:
case <-ctx2.Done():
t.Fatal("Context Done() in send")
}
return nil
}
go func() {
defer close(eventCh)
defer close(eventCh2)
req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter}
if err := tsv.VStream(ctx, req, send); err != nil {
if err := tsv.VStream(ctx2, req, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down Expand Up @@ -254,15 +254,14 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`,
)

expectLogs(ctx, t, "Past stream", eventCh, output)

cancel()
expectLogs(ctx2, t, "Past stream", eventCh2, output)
cancel2()

log.Infof("\n\n\n=============================================== PAST EVENTS WITHOUT TRACK VERSIONS START HERE ======================\n\n\n")
tsv.EnableHistorian(false)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
eventCh = make(chan []*binlogdatapb.VEvent)
ctx3, cancel3 := context.WithCancel(context.Background())
defer cancel3()
eventCh3 := make(chan []*binlogdatapb.VEvent)
send = func(events []*binlogdatapb.VEvent) error {
var evs []*binlogdatapb.VEvent
for _, event := range events {
Expand All @@ -280,16 +279,16 @@ func TestSchemaVersioning(t *testing.T) {
return nil
}
select {
case eventCh <- evs:
case <-ctx.Done():
case eventCh3 <- evs:
case <-ctx3.Done():
t.Fatal("Context Done() in send")
}
return nil
}
go func() {
defer close(eventCh)
defer close(eventCh3)
req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter}
if err := tsv.VStream(ctx, req, send); err != nil {
if err := tsv.VStream(ctx3, req, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down Expand Up @@ -329,8 +328,8 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`,
)

expectLogs(ctx, t, "Past stream", eventCh, output)
cancel()
expectLogs(ctx3, t, "Past stream", eventCh3, output)
cancel3()

client := framework.NewClient()
client.Execute("drop table vitess_version", nil)
Expand Down
10 changes: 6 additions & 4 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,14 +680,16 @@ func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.Identi
if mt != nil {
return mt, nil
}
se.mu.Lock()
defer se.mu.Unlock()
// We got nothing from the historian, which generally means that it's not enabled.
// Whatever the reason, we should get the latest schema for the "current" position.
// In order to ensure this, we need to reload the latest schema first so that our
// cache is up to date and we do in fact return the latest/current table schema.
se.mu.Lock()
defer se.mu.Unlock()
if err := se.reload(ctx, true); err != nil {
return nil, err
if se.conns != nil { // Test Engines (NewEngineForTests()) don't have a conns pool
if err := se.reload(ctx, true); err != nil {
return nil, err
}
}
tableNameStr := tableName.String()
st, ok := se.tables[tableNameStr]
Expand Down

0 comments on commit 2fa4e55

Please sign in to comment.