diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 312273e0c84..8a027ee6961 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -188,9 +188,9 @@ func TestSchemaVersioning(t *testing.T) { cancel() log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n") - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() - eventCh = make(chan []*binlogdatapb.VEvent) + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + eventCh2 := make(chan []*binlogdatapb.VEvent) send = func(events []*binlogdatapb.VEvent) error { var evs []*binlogdatapb.VEvent for _, event := range events { @@ -208,16 +208,16 @@ func TestSchemaVersioning(t *testing.T) { return nil } select { - case eventCh <- evs: - case <-ctx.Done(): + case eventCh2 <- evs: + case <-ctx2.Done(): t.Fatal("Context Done() in send") } return nil } go func() { - defer close(eventCh) + defer close(eventCh2) req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter} - if err := tsv.VStream(ctx, req, send); err != nil { + if err := tsv.VStream(ctx2, req, send); err != nil { fmt.Printf("Error in tsv.VStream: %v", err) t.Error(err) } @@ -254,15 +254,14 @@ func TestSchemaVersioning(t *testing.T) { `gtid`, ) - expectLogs(ctx, t, "Past stream", eventCh, output) - - cancel() + expectLogs(ctx2, t, "Past stream", eventCh2, output) + cancel2() log.Infof("\n\n\n=============================================== PAST EVENTS WITHOUT TRACK VERSIONS START HERE ======================\n\n\n") tsv.EnableHistorian(false) - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() - eventCh = make(chan []*binlogdatapb.VEvent) + ctx3, cancel3 := context.WithCancel(context.Background()) + defer cancel3() + eventCh3 := make(chan []*binlogdatapb.VEvent) send = func(events []*binlogdatapb.VEvent) error { var evs []*binlogdatapb.VEvent for _, event := range events { @@ -280,16 +279,16 @@ func TestSchemaVersioning(t *testing.T) { return nil } select { - case eventCh <- evs: - case <-ctx.Done(): + case eventCh3 <- evs: + case <-ctx3.Done(): t.Fatal("Context Done() in send") } return nil } go func() { - defer close(eventCh) + defer close(eventCh3) req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter} - if err := tsv.VStream(ctx, req, send); err != nil { + if err := tsv.VStream(ctx3, req, send); err != nil { fmt.Printf("Error in tsv.VStream: %v", err) t.Error(err) } @@ -329,8 +328,8 @@ func TestSchemaVersioning(t *testing.T) { `gtid`, ) - expectLogs(ctx, t, "Past stream", eventCh, output) - cancel() + expectLogs(ctx3, t, "Past stream", eventCh3, output) + cancel3() client := framework.NewClient() client.Execute("drop table vitess_version", nil) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 02f20202b8d..420879e6ca3 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -680,14 +680,16 @@ func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.Identi if mt != nil { return mt, nil } + se.mu.Lock() + defer se.mu.Unlock() // We got nothing from the historian, which generally means that it's not enabled. // Whatever the reason, we should get the latest schema for the "current" position. // In order to ensure this, we need to reload the latest schema first so that our // cache is up to date and we do in fact return the latest/current table schema. - se.mu.Lock() - defer se.mu.Unlock() - if err := se.reload(ctx, true); err != nil { - return nil, err + if se.conns != nil { // Test Engines (NewEngineForTests()) don't have a conns pool + if err := se.reload(ctx, true); err != nil { + return nil, err + } } tableNameStr := tableName.String() st, ok := se.tables[tableNameStr]