diff --git a/rpc.go b/rpc.go
index 7eb7fca3..764c55d0 100644
--- a/rpc.go
+++ b/rpc.go
@@ -180,6 +180,10 @@ var (
 // successfully. allOK is true if all calls completed successfully,
 // and false if any calls failed and the errors in the results need to
 // be checked.
+//
+// SendBatch will continue retrying each RPC in batch until it
+// succeeds, fails with a non-retryable error, or the context is
+// canceled.
 func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 	res []hrpc.RPCResult, allOK bool) {
 	if len(batch) == 0 {
@@ -235,37 +239,79 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 		return res, allOK
 	}
 
-	rpcByClient, ok := c.findClients(ctx, batch, res)
-	if !ok {
-		return res, false
-	}
-	sendBatchSplitCount.Observe(float64(len(rpcByClient)))
+	// Send and wait for responses loop. This loop will partition the
+	// batch into per-regionserver batches, send those batches to the
+	// region servers and wait for results. Any RPCs that hit retryable
+	// errors will be made into a new batch and passed through this
+	// loop again.
 
-	// Send each group of RPCs to region client to be executed.
-	type clientAndRPCs struct {
-		client hrpc.RegionClient
-		rpcs   []hrpc.Call
-	}
-	// keep track of the order requests are queued so that we can wait
-	// for their responses in the same order.
-	cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
-	for client, rpcs := range rpcByClient {
-		client.QueueBatch(ctx, rpcs)
-		cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
-	}
+	// unretryableErrorSeen is set to true when any RPC in the batch hits
+	// an error that is not retryable. This is used to remember to
+	// return allOK=false even after we retry RPCs that hit retryable
+	// errors and those all succeed.
+	var unretryableErrorSeen bool
+	var retries []hrpc.Call
+	backoff := backoffStart
+
+	for {
+		rpcByClient, ok := c.findClients(ctx, batch, res)
+		if !ok {
+			return res, false
+		}
+		sendBatchSplitCount.Observe(float64(len(rpcByClient)))
 
-	var fail bool
-	func() { // func used to scope the span
-		ctx, sp := observability.StartSpan(ctx, "waitForResult")
-		defer sp.End()
-		for _, cAndR := range cAndRs {
-			ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
-			if !ok {
-				fail = true
+		// Send each group of RPCs to region client to be executed.
+		type clientAndRPCs struct {
+			client hrpc.RegionClient
+			rpcs   []hrpc.Call
+		}
+		// keep track of the order requests are queued so that we can wait
+		// for their responses in the same order.
+		cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
+		for client, rpcs := range rpcByClient {
+			client.QueueBatch(ctx, rpcs)
+			cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
+		}
+
+		// batch will be used to hold any RPCs that need to be retried
+		batch = batch[:0]
+		var needBackoff bool
+
+		func() { // func used to scope the span
+			ctx, sp := observability.StartSpan(ctx, "waitForResult")
+			defer sp.End()
+			for _, cAndR := range cAndRs {
+				shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(
+					ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
+				if !ok {
+					allOK = false
+					retries = append(retries, shouldRetry...)
+					needBackoff = needBackoff || shouldBackoff
+					unretryableErrorSeen = unretryableErrorSeen || unretryableError
+				}
 			}
+		}()
+
+		// Exit the retry loop if no RPCs are retryable because they all
+		// succeeded or hit unretryable errors, or if the context is done.
+		if len(retries) == 0 || ctx.Err() != nil {
+			break
 		}
-	}()
-	allOK = !fail
+		if needBackoff {
+			sp.AddEvent("retrySleep")
+			var err error
+			backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
+			if err != nil {
+				break
+			}
+		} else {
+			sp.AddEvent("retry")
+		}
+		// Set state for next loop iteration
+		batch = retries
+		retries = retries[:0]
+		allOK = !unretryableErrorSeen
+	}
 
 	return res, allOK
 }
@@ -296,11 +342,19 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc.
 	return rpcByClient, ok
 }
 
+// waitForCompletion waits for the completion of all rpcs, updating
+// the appropriate index in results with the help of rpcToRes. If all
+// rpcs succeed, ok is true. Otherwise ok is false, retryables
+// contains the RPCs that can be retried, shouldBackoff is true if
+// any RPC needs a backoff before retrying, and unretryableError is
+// true if any RPC failed with an error that cannot be retried.
 func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient,
-	rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool {
+	rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) (
+	retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) {
 
-	ok := true
+	ok = true
 	canceledIndex := len(rpcs)
+
 loop:
 	for i, rpc := range rpcs {
 		select {
@@ -309,7 +363,17 @@
 			if res.Error != nil {
 				c.handleResultError(res.Error, rpc.Region(), rc)
 				ok = false
+				switch res.Error.(type) {
+				case region.RetryableError:
+					shouldBackoff = true
+					retryables = append(retryables, rpc)
+				case region.ServerError, region.NotServingRegionError:
+					retryables = append(retryables, rpc)
+				default:
+					unretryableError = true
+				}
 			}
+
 		case <-ctx.Done():
 			canceledIndex = i
 			ok = false
@@ -333,7 +397,7 @@ loop:
 		}
 	}
 
-	return ok
+	return retryables, shouldBackoff, unretryableError, ok
 }
 
 func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {
@@ -668,7 +732,16 @@ func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
 	}
 }
 
+// establishRegionOverride can be set by tests to override the
+// behavior of establishRegion
+var establishRegionOverride func(reg hrpc.RegionInfo, addr string)
+
 func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
+	if establishRegionOverride != nil {
+		establishRegionOverride(reg, addr)
+		return
+	}
+
 	var backoff time.Duration
 	var err error
 	for {
@@ -803,7 +876,15 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
 	}
 }
 
+// sleepAndIncreaseBackoffOverride can be set by tests to override the
+// behavior of sleepAndIncreaseBackoff
+var sleepAndIncreaseBackoffOverride func(
+	ctx context.Context, backoff time.Duration) (time.Duration, error)
+
 func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
+	if sleepAndIncreaseBackoffOverride != nil {
+		return sleepAndIncreaseBackoffOverride(ctx, backoff)
+	}
 	if backoff == 0 {
 		return backoffStart, nil
 	}
diff --git a/rpc_test.go b/rpc_test.go
index 1d9c0375..82c1c20c 100644
--- a/rpc_test.go
+++ b/rpc_test.go
@@ -1172,6 +1172,23 @@ func TestFindClients(t *testing.T) {
 }
 
 func TestSendBatchWaitForCompletion(t *testing.T) {
+	sleepCh := make(chan struct{})
+	sleepAndIncreaseBackoffOverride = func(ctx context.Context, backoff time.Duration) (
+		time.Duration, error) {
+		sleepCh <- struct{}{}
+		return backoff, nil
+	}
+	estRegCh := make(chan hrpc.RegionInfo)
+	establishRegionOverride = func(reg hrpc.RegionInfo, addr string) {
+		estRegCh <- reg
+	}
+	defer func() {
+		close(sleepCh) // panic any unexpected calls to sleepAndIncreaseBackoff
+		sleepAndIncreaseBackoffOverride = nil
+		close(estRegCh) // panic any unexpected calls to establishRegion
+		establishRegionOverride = nil
+	}()
+
 	c := newMockClient(nil)
 	// pretend regionserver:0 has meta table
 	rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0"))
@@ -1519,4 +1536,191 @@ func TestSendBatchWaitForCompletion(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("retryable error some", func(t *testing.T) {
+		batch := make([]hrpc.Call, 9)
+		for i := 0; i < 9; i++ {
+			// Create an RPC for each region, "a"-"i"
+			key := string(rune('a' + i))
+			batch[i] = newRPC(key)
+		}
+		var (
+			result []hrpc.RPCResult
+			ok     bool
+			done   = make(chan struct{})
+		)
+		go func() {
+			result, ok = c.SendBatch(context.Background(), batch)
+			close(done)
+		}()
+
+		for i := 0; i < 9; i++ {
+			// Error some responses
+			if i%2 == 0 {
+				batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}}
+				continue
+			}
+			// Using an Int32 as a result. A real result would be a
+			// MutateResponse, but any proto.Message works for the test.
+			batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
+		}
+
+		<-sleepCh // Expect one call to sleepAndIncreaseBackoff
+
+		// We should see retries on retryable errors. So now send the
+		// correct response.
+		for i := 0; i < 9; i++ {
+			if i%2 != 0 {
+				continue
+			}
+			batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
+		}
+
+		<-done
+		if !ok {
+			t.Errorf("unexpected !ok")
+		}
+		if len(result) != 9 {
+			t.Fatalf("unexpected result size: %v", result)
+		}
+
+		for i, r := range result {
+			if r.Error != nil {
+				t.Errorf("unexpected error: %s", r.Error)
+				continue
+			}
+			if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) {
+				t.Errorf("unexpected result: %v", r.Msg)
+			}
+		}
+
+	})
+
+	t.Run("retryable error and non-retryable errors", func(t *testing.T) {
+		batch := make([]hrpc.Call, 9)
+		for i := 0; i < 9; i++ {
+			// Create an RPC for each region, "a"-"i"
+			key := string(rune('a' + i))
+			batch[i] = newRPC(key)
+		}
+		var (
+			result []hrpc.RPCResult
+			ok     bool
+			done   = make(chan struct{})
+		)
+		go func() {
+			result, ok = c.SendBatch(context.Background(), batch)
+			close(done)
+		}()
+
+		for i := 0; i < 9; i++ {
+			if i%4 == 0 {
+				// Non-retryable error for some
+				batch[i].ResultChan() <- hrpc.RPCResult{Error: errors.New("error")}
+				continue
+			}
+			// Retryable error for some
+			batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}}
+		}
+
+		<-sleepCh // Expect one call to sleepAndIncreaseBackoff
+
+		// We should see retries on retryable errors. So now send the
+		// correct response.
+		for i := 0; i < 9; i++ {
+			if i%4 == 0 {
+				continue
+			}
+			batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
+		}
+		<-done
+		if ok {
+			t.Errorf("expected !ok")
+		}
+		if len(result) != 9 {
+			t.Fatalf("unexpected result size: %v", result)
+		}
+		for i, r := range result {
+			if i%4 == 0 {
+				if r.Error == nil || r.Error.Error() != "error" {
+					t.Errorf("expected error but got: %v", r.Error)
+				}
+				if r.Msg != nil {
+					t.Errorf("unexpected Msg: %v", r.Msg)
+				}
+				continue
+			}
+			if r.Error != nil {
+				t.Errorf("unexpected error: %s", r.Error)
+				continue
+			}
+			if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) {
+				t.Errorf("unexpected result: %v", r.Msg)
+			}
+		}
+	})
+
+	t.Run("not serving region error", func(t *testing.T) {
+		batch := make([]hrpc.Call, 9)
+		for i := 0; i < 9; i++ {
+			// Create an RPC for each region, "a"-"i"
+			key := string(rune('a' + i))
+			batch[i] = newRPC(key)
+		}
+		var (
+			result []hrpc.RPCResult
+			ok     bool
+			done   = make(chan struct{})
+		)
+		go func() {
+			result, ok = c.SendBatch(context.Background(), batch)
+			close(done)
+		}()
+
+		for i := 0; i < 9; i++ {
+			if i%4 == 0 {
+				// NSRE for some
+				batch[i].ResultChan() <- hrpc.RPCResult{Error: region.NotServingRegionError{}}
+				continue
+			}
+			// success for some
+			batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
+		}
+
+		for i := 0; i < 9; i++ {
+			if i%4 != 0 {
+				continue
+			}
+			// For each failed NSRE we should expect an establishRegion call
+			reg := <-estRegCh
+			// reestablish the region:
+			reg.MarkAvailable()
+		}
+
+		// We should see retries on the RPCs hitting NSREs. So now
+		// send the correct response.
+		for i := 0; i < 9; i++ {
+			if i%4 != 0 {
+				continue
+			}
+			batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
+		}
+		<-done
+		if !ok {
+			t.Errorf("unexpected !ok")
+		}
+		if len(result) != 9 {
+			t.Fatalf("unexpected result size: %v", result)
+		}
+
+		for i, r := range result {
+			if r.Error != nil {
+				t.Errorf("unexpected error: %s", r.Error)
+				continue
+			}
+			if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) {
+				t.Errorf("unexpected result: %v", r.Msg)
+			}
+		}
+	})
 }
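Caller-side sketch (not part of the change): because SendBatch now retries RetryableError, ServerError and NotServingRegionError internally, a caller only needs to inspect the errors that remain in the results when allOK is false; retries are bounded by the caller's context. The table, family and helper names below are made up for illustration, and the batchSender interface simply restates the SendBatch signature from this diff.

package example

import (
	"context"
	"log"

	"github.com/tsuna/gohbase/hrpc"
)

// batchSender matches the SendBatch method declared in this change.
type batchSender interface {
	SendBatch(ctx context.Context, batch []hrpc.Call) ([]hrpc.RPCResult, bool)
}

// putRows is a hypothetical helper showing how a caller might build a
// batch of Puts and rely on SendBatch's internal retries.
func putRows(ctx context.Context, client batchSender, rows []string) error {
	batch := make([]hrpc.Call, 0, len(rows))
	for _, key := range rows {
		put, err := hrpc.NewPutStr(ctx, "mytable", key,
			map[string]map[string][]byte{"cf": {"q": []byte("v")}})
		if err != nil {
			return err
		}
		batch = append(batch, put)
	}

	res, allOK := client.SendBatch(ctx, batch)
	if !allOK {
		// Retryable errors were already retried inside SendBatch; anything
		// left here failed with a non-retryable error or the context ended.
		for i, r := range res {
			if r.Error != nil {
				log.Printf("batch[%d] failed: %v", i, r.Error)
			}
		}
	}
	return nil
}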