Skip to content

Commit

Permalink
Fix TestGatewayBufferingWhileReparenting flakiness (vitessio#12817)
Browse files Browse the repository at this point in the history
* Fix TestGatewayBufferingWhileReparenting flakiness

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* apply review suggestions

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

---------

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui authored and timvaillancourt committed Mar 18, 2024
1 parent 66147a9 commit 285b44a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
11 changes: 6 additions & 5 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,19 +446,20 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bo
// The shard state keeps track of the current primary and the last externally reparented time, which we can use
// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary
// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) bool {
// We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return false
return nil, false
}
ks := kew.getKeyspaceStatus(target.Keyspace)
if ks == nil {
return false
return nil, false
}
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// If the primary tablet was present then externallyReparented will be non-zero and currentPrimary will be not nil
return !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
}
return false
return nil, false
}
8 changes: 7 additions & 1 deletion go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,16 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "current keyspace is being resharded")
continue
}
if kev.PrimaryIsNotServing(target) {
primary, serving := kev.PrimaryIsNotServing(target)
if serving {
err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "primary is not serving, there is a reparent operation in progress")
continue
}
// if primary is serving, but we initially found no tablet, we're in an inconsistent state
// we then retry the entire loop
if primary != nil {
continue
}
}

// fail fast if there is no tablet
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {
queryChan <- struct{}{}
}()

require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't")

// set the serving type for the new primary tablet true and broadcast it so that the buffering code registers this change
// this should stop the buffering and the query executed in the go routine should work. This should be done with some delay so
// that we know that the query was buffered
Expand All @@ -215,11 +217,15 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {
hc.SetServing(replicaTablet, true)
hc.Broadcast(replicaTablet)

newPrimary, ok := tg.kev.PrimaryIsNotServing(target)
require.EqualValues(t, 1, newPrimary.Uid)
require.False(t, ok, "PrimaryIsNotServing is not returning true")

// wait for the query to execute before checking for results
select {
case <-queryChan:
require.NoError(t, err)
require.Equal(t, res, sqlResult1)
require.Equal(t, sqlResult1, res)
case <-time.After(15 * time.Second):
t.Fatalf("timed out waiting for query to execute")
}
Expand Down

0 comments on commit 285b44a

Please sign in to comment.