SendBatch: Retry retryable errors
To match the behavior of SendRPC, SendBatch should retry RPCs that hit
retryable errors: region.RetryableError, region.ServerError, and
region.NotServingRegionError.

SendBatch will now retry each RPC that hits a retryable error. What
used to be a single pass of assigning regions to RPCs, grouping them
by region server, and dispatching the RPCs to their respective
servers is now done in a loop. The first iteration of the loop
operates on the entire batch; later iterations operate on the set of
RPCs that failed with retryable errors in the previous iteration.
aaronbee committed Oct 11, 2023
1 parent 9f8195a commit e6f33a5
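
For context, a hedged caller-side sketch of what this change means for users of SendBatch: retryable failures are now handled inside the call, so any errors left in the results are either non-retryable or due to context cancellation. The batchSender interface, helper name, table/row shapes, and logging are illustrative assumptions, not part of this commit; hrpc.NewPutStr, hrpc.Call, and hrpc.RPCResult are existing gohbase types.

import (
	"context"
	"errors"
	"log"

	"github.com/tsuna/gohbase/hrpc"
)

// batchSender matches the SendBatch signature shown in this commit.
type batchSender interface {
	SendBatch(ctx context.Context, batch []hrpc.Call) ([]hrpc.RPCResult, bool)
}

// putAll sends one Put per row as a single batch and reports any rows
// that still failed after SendBatch's internal retries. Sketch only.
func putAll(ctx context.Context, c batchSender,
	table string, rows map[string]map[string]map[string][]byte) error {

	batch := make([]hrpc.Call, 0, len(rows))
	for key, values := range rows {
		put, err := hrpc.NewPutStr(ctx, table, key, values)
		if err != nil {
			return err
		}
		batch = append(batch, put)
	}

	results, allOK := c.SendBatch(ctx, batch)
	if allOK {
		return nil
	}
	for i, r := range results {
		if r.Error != nil {
			// With this change, retryable errors have already been
			// retried inside SendBatch; anything left here is
			// non-retryable or the context was canceled.
			log.Printf("RPC %d failed: %v", i, r.Error)
		}
	}
	return errors.New("some puts failed")
}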
Showing 2 changed files with 315 additions and 30 deletions.
141 changes: 111 additions & 30 deletions rpc.go
@@ -180,6 +180,10 @@ var (
// successfully. allOK is true if all calls completed successfully,
// and false if any calls failed and the errors in the results need to
// be checked.
//
// SendBatch will continue retrying each RPC in batch until it
// succeeds, fails with a non-retryable error, or the context is
// canceled.
func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
res []hrpc.RPCResult, allOK bool) {
if len(batch) == 0 {
@@ -235,37 +239,79 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
return res, allOK
}

rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}
sendBatchSplitCount.Observe(float64(len(rpcByClient)))
// Send and wait for responses loop. This loop will partition the
// batch into per-regionserver batches, send those batches to the
// region servers and wait for results. Any RPCs that hit retryable
// errors will be made into a new batch and passed through this
// loop again.

// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}
// unretryableErrorSeen is set to true when any RPC in the batch hits
// an error that is not retryable. This is used to remember to
// return allOK=false even after we retry RPCs that hit retryable
// errors and those all succeed.
var unretryableErrorSeen bool
var retries []hrpc.Call
backoff := backoffStart

for {
rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}

sendBatchSplitCount.Observe(float64(len(rpcByClient)))

var fail bool
func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
fail = true
// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}

// batch will be used to hold any RPCs that need to be retried
batch = batch[:0]
var needBackoff bool

func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(
ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
allOK = false
retries = append(retries, shouldRetry...)
needBackoff = needBackoff || shouldBackoff
unretryableErrorSeen = unretryableErrorSeen || unretryableError
}
}
}()

// Exit retry loop if no RPCs are retryable because they all
// succeeded or hit unretryable errors, or the context is done
if len(retries) == 0 || ctx.Err() != nil {
break
}
}()
allOK = !fail
if needBackoff {
sp.AddEvent("retrySleep")
var err error
backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
if err != nil {
break

}
} else {
sp.AddEvent("retry")
}
// Set state for next loop iteration
batch = retries
retries = retries[:0]
allOK = !unretryableErrorSeen
}

return res, allOK
}
@@ -296,11 +342,19 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc.
return rpcByClient, ok
}

// waitForCompletion waits for the completion of all rpcs, updating
// the appropriate index in results with the help of rpcToRes. If all
// rpcs succeed then ok will be true; otherwise ok will be false,
// retryables will contain the RPCs that can be retried, shouldBackoff
// will be true if any RPC needs a backoff before retrying, and
// unretryableError will be true if any RPC failed with an error that
// cannot be retried.
func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient,
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool {
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) (
retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) {

ok := true
ok = true
canceledIndex := len(rpcs)

loop:
for i, rpc := range rpcs {
select {
@@ -309,7 +363,17 @@ loop:
if res.Error != nil {
c.handleResultError(res.Error, rpc.Region(), rc)
ok = false
switch res.Error.(type) {
case region.RetryableError:
shouldBackoff = true
retryables = append(retryables, rpc)
case region.ServerError, region.NotServingRegionError:
retryables = append(retryables, rpc)
default:
unretryableError = true
}
}

case <-ctx.Done():
canceledIndex = i
ok = false
Expand All @@ -333,7 +397,7 @@ loop:
}
}

return ok
return retryables, shouldBackoff, unretryableError, ok
}

func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {
@@ -668,7 +732,16 @@ func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
}
}

// establishRegionOverride can be set by tests to override the
// behavior of establishRegion
var establishRegionOverride func(reg hrpc.RegionInfo, addr string)

func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
if establishRegionOverride != nil {
establishRegionOverride(reg, addr)
return
}

var backoff time.Duration
var err error
for {
@@ -803,7 +876,15 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
}
}

// sleepAndIncreaseBackoffOverride can be set by tests to override the
// behavior of sleepAndIncreaseBackoff
var sleepAndIncreaseBackoffOverride func(
ctx context.Context, backoff time.Duration) (time.Duration, error)

func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
if sleepAndIncreaseBackoffOverride != nil {
return sleepAndIncreaseBackoffOverride(ctx, backoff)
}
if backoff == 0 {
return backoffStart, nil
}
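The two new package-level override hooks (establishRegionOverride and sleepAndIncreaseBackoffOverride) exist so tests can make the retry loop deterministic. Below is a hedged sketch of an in-package test helper using them; the helper name and its behavior are assumptions for illustration, not code from this commit, and it relies on the gohbase package's existing context, time, and hrpc imports.

// overrideRetryHooksForTest installs fast, deterministic replacements for
// the hooks added by this commit and returns a function that restores the
// defaults. Intended for in-package (gohbase) test code only.
func overrideRetryHooksForTest() (restore func()) {
	sleepAndIncreaseBackoffOverride = func(
		ctx context.Context, backoff time.Duration) (time.Duration, error) {
		// Honor cancellation but skip the real sleep so retry tests run
		// instantly and with a constant backoff.
		if err := ctx.Err(); err != nil {
			return backoff, err
		}
		return backoff, nil
	}
	establishRegionOverride = func(reg hrpc.RegionInfo, addr string) {
		// Skip real region re-establishment; the test controls region
		// availability itself.
	}
	return func() {
		sleepAndIncreaseBackoffOverride = nil
		establishRegionOverride = nil
	}
}

A test exercising the new retry path could call defer overrideRetryHooksForTest()() before driving SendBatch against a region client stub that fails once with region.RetryableError and then succeeds.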
