Skip to content

Commit

Permalink
fix build for go1.18
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 19, 2023
1 parent 9a2dee5 commit 564e675
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
24 changes: 12 additions & 12 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
querypb "vitess.io/vitess/go/vt/proto/query"
)

Expand Down Expand Up @@ -96,36 +96,36 @@ func (jn *Join) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st

// TryStreamExecute performs a streaming exec.
func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var fieldNeeded atomic.Bool
fieldNeeded.Store(wantfields)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Load(), func(lresult *sqltypes.Result) error {
var fieldNeeded sync2.AtomicBool
fieldNeeded.Set(wantfields)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Get(), func(lresult *sqltypes.Result) error {
joinVars := make(map[string]*querypb.BindVariable)
for _, lrow := range lresult.Rows {
for k, col := range jn.Vars {
joinVars[k] = sqltypes.ValueBindVariable(lrow[col])
}
var rowSent atomic.Bool
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Load(), func(rresult *sqltypes.Result) error {
var rowSent sync2.AtomicBool
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Get(), func(rresult *sqltypes.Result) error {
result := &sqltypes.Result{}
if fieldNeeded.Load() {
if fieldNeeded.Get() {
// This code is currently unreachable because the first result
// will always be just the field info, which will cause the outer
// wantfields code path to be executed. But this may change in the future.
fieldNeeded.Store(false)
fieldNeeded.Set(false)
result.Fields = joinFields(lresult.Fields, rresult.Fields, jn.Cols)
}
for _, rrow := range rresult.Rows {
result.Rows = append(result.Rows, joinRows(lrow, rrow, jn.Cols))
}
if len(rresult.Rows) != 0 {
rowSent.Store(true)
rowSent.Set(true)
}
return callback(result)
})
if err != nil {
return err
}
if jn.Opcode == LeftJoin && !rowSent.Load() {
if jn.Opcode == LeftJoin && !rowSent.Get() {
result := &sqltypes.Result{}
result.Rows = [][]sqltypes.Value{joinRows(
lrow,
Expand All @@ -135,8 +135,8 @@ func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return callback(result)
}
}
if fieldNeeded.Load() {
fieldNeeded.Store(false)
if fieldNeeded.Get() {
fieldNeeded.Set(false)
for k := range jn.Vars {
joinVars[k] = sqltypes.NullBindVariable
}
Expand Down
11 changes: 4 additions & 7 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3784,26 +3784,23 @@ func TestMain(m *testing.M) {
}

func TestStreamJoinQuery(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
u.VSchema = unshardedVSchema
serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)
serv := newSandboxForCells([]string{cell})
resolver := newTestResolver(hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor := createExecutor(serv, cell, resolver)

sql := "select u.foo, u.apa, ue.bar, ue.apa from user u join user_extra ue on u.foo = ue.bar"
result, err := executorStream(ctx, executor, sql)
result, err := executorStream(executor, sql)
require.NoError(t, err)
wantResult := &sqltypes.Result{
Fields: append(sandboxconn.SingleRowResult.Fields, sandboxconn.SingleRowResult.Fields...),
Expand Down

0 comments on commit 564e675

Please sign in to comment.