Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition that prevents queries from being buffered after vtgate startup #16655

Merged
merged 10 commits into from
Aug 29, 2024
Merged
90 changes: 82 additions & 8 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package discovery
import (
"context"
"fmt"
"slices"
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand All @@ -37,6 +39,11 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

var (
	// waitConsistentKeyspacesCheck is the polling interval that
	// WaitForConsistentKeyspaces sleeps for between successive checks
	// that the watched keyspaces have become consistent.
	waitConsistentKeyspacesCheck = 100 * time.Millisecond
)

// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
// Right now this is capable of detecting the end of failovers, both planned and unplanned,
Expand Down Expand Up @@ -662,29 +669,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
return ks.beingResharded(target.Shard)
}

// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
// that the primary tablet for that shard is not serving. This is possible during a Planned
// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target.
// We check the following things before we start buffering -
// 1. The shard must have a primary.
// 2. The primary must be non-serving.
// 3. The keyspace must be marked inconsistent.
//
// This buffering is meant to kick in during a Planned Reparent Shard operation.
// As part of that operation the old primary will become non-serving. At that point
// this code should return true to start buffering requests.
// Just as the PRS operation completes, a new primary will be elected, and
// it will send its own healthcheck stating that it is serving. We should buffer requests until
// that point. There are use cases where people do not run with a Primary server at all, so we must
// that point.
//
// There are use cases where people do not run with a Primary server at all, so we must
// verify that we only start buffering when a primary was present, and it went not serving.
// The shard state keeps track of the current primary and the last externally reparented time, which
// we can use to determine that there was a serving primary which now became non serving. This is
// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will
// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
// We don't support buffering for any target tablet type other than the primary.
return nil, false
}
ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
if ks == nil {
// If the keyspace status is nil, then the keyspace must be deleted.
// The user query is trying to access a keyspace that has been deleted.
// There is no reason to buffer this query.
return nil, false
}
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// If the primary tablet was present then externallyReparented will be non-zero and
// currentPrimary will be not nil.
// As described in the function comment, we only want to start buffering when all the following conditions are met -
// 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty.
// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
// If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition
// will always be true.
// 2. The primary must be non-serving. We check this by checking the serving field in the shard state.
// When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added
// for being defensive against any bugs.
// 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state.
//
// The reason we need all the three checks is that we want to be very defensive in when we start buffering.
// We don't want to start buffering when we don't know for sure if the primary
// is not serving and we will receive an update that stops buffering soon.
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, false
Expand All @@ -703,3 +734,46 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
}
return servingKeyspaces
}

// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
// We don't want to change the original keyspace list that we receive so we clone it
// before we empty it elements down below.
keyspaces := slices.Clone(ksList)
for {
// We empty keyspaces as we find them to be consistent.
allConsistent := true
for i, ks := range keyspaces {
if ks == "" {
continue
}

// Get the keyspace status and see it is consistent yet or not.
kss := kew.getKeyspaceStatus(ctx, ks)
// If kss is nil, then it must be deleted. In that case too it is fine for us to consider
// it consistent since the keyspace has been deleted.
if kss == nil || kss.consistent {
keyspaces[i] = ""
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
} else {
allConsistent = false
}
}

if allConsistent {
// all the keyspaces are consistent.
return nil
}

// Unblock after the sleep or when the context has expired.
select {
case <-ctx.Done():
for _, ks := range keyspaces {
if ks != "" {
log.Infof("keyspace %v didn't become consistent", ks)
}
}
return ctx.Err()
case <-time.After(waitConsistentKeyspacesCheck):
}
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
}
119 changes: 100 additions & 19 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ func TestKeyspaceEventTypes(t *testing.T) {
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

type testCase struct {
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectPrimaryNotServing bool
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectShouldBuffer bool
}

testCases := []testCase{
Expand Down Expand Up @@ -189,9 +189,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "two to four resharding in progress",
Expand Down Expand Up @@ -250,9 +250,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-80",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "unsharded primary not serving",
Expand All @@ -276,9 +276,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-",
expectResharding: false,
expectShouldBuffer: true,
},
{
name: "sharded primary not serving",
Expand Down Expand Up @@ -310,9 +310,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-80",
expectResharding: false,
expectShouldBuffer: true,
},
}

Expand All @@ -327,8 +327,89 @@ func TestKeyspaceEventTypes(t *testing.T) {
resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding)

_, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing)
_, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer)
})
}
}

// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistent for different scenarios.
func TestWaitForConsistentKeyspaces(t *testing.T) {
	cases := []struct {
		name    string
		ksMap   map[string]*keyspaceState
		ksList  []string
		wantErr string
	}{
		{
			name:   "Empty keyspace list",
			ksList: nil,
			ksMap: map[string]*keyspaceState{
				"ks1": {},
			},
			wantErr: "",
		},
		{
			name:   "All keyspaces consistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {consistent: true},
			},
			wantErr: "",
		},
		{
			name:   "One keyspace inconsistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {consistent: false},
			},
			wantErr: "context canceled",
		},
		{
			name:   "One deleted keyspace - consistent",
			ksList: []string{"ks1", "ks2"},
			ksMap: map[string]*keyspaceState{
				"ks1": {consistent: true},
				"ks2": {deleted: true},
			},
			wantErr: "",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Cancel the context before calling the waiter: the unit test
			// only exercises the first iteration of the wait loop, so it
			// should never sit in the retry sleep.
			ctx, cancel := context.WithCancel(context.Background())
			cancel()
			kew := KeyspaceEventWatcher{
				keyspaces: tc.ksMap,
				mu:        sync.Mutex{},
				ts:        &fakeTopoServer{},
			}
			err := kew.WaitForConsistentKeyspaces(ctx, tc.ksList)
			if tc.wantErr == "" {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, tc.wantErr)
			}
		})
	}
}
Expand Down
14 changes: 7 additions & 7 deletions go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package srvtopo

import (
"sync"

"context"
"sync"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
Expand All @@ -29,15 +28,16 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
// It also returns all the keyspaces that it found.
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
return nil, nil, err
}
}

Expand Down Expand Up @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
return nil, nil, errRecorder.Error()
}

return targets, nil
return targets, keyspaces, nil
}
Loading
Loading