diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index ddce3147b9..2fc4f6a01e 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -288,35 +288,40 @@ type secondReceiveBlockedDriverSub struct { } func (s *secondReceiveBlockedDriverSub) ReceiveBatch(ctx context.Context, _ int) ([]*driver.Message, error) { - s.receiveCounter.Add(1) - if s.receiveCounter.Load() > 1 { - // wait after 1st request for the context to finish before returning the batch result + // The first request will return a message right away. + // The second one will block ~forever. + if n := s.receiveCounter.Add(1); n > 1 { <-ctx.Done() } msg := &driver.Message{Body: []byte(fmt.Sprintf("message #%d", s.receiveCounter.Load()))} return []*driver.Message{msg}, nil } -func (*secondReceiveBlockedDriverSub) CanNack() bool { return false } -func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false } -func (*secondReceiveBlockedDriverSub) Close() error { return nil } +func (*secondReceiveBlockedDriverSub) CanNack() bool { return false } +func (*secondReceiveBlockedDriverSub) SendAcks(_ context.Context, _ []driver.AckID) error { return nil } +func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false } +func (*secondReceiveBlockedDriverSub) Close() error { return nil } +// TestIndependentBatchReturn verifies that when multiple batch requests are sent, +// as long as one of them succeeds it should not block Subscription.Receive. func TestIndependentBatchReturn(t *testing.T) { - // We want to test the scenario when multiple batch requests are sent, as long as one of them succeeds, it should - // not block the Subscription.Receive result s := NewSubscription( &secondReceiveBlockedDriverSub{}, - &batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches, by allowing 2 handlers and 1 msg per batch + &batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches by allowing 2 handlers and 1 msg per batch nil, ) - // set the false calculated subscription batch size to force 2 batches to be called - s.runningBatchSize = 2 ctx := context.Background() defer s.Shutdown(ctx) - _, err := s.Receive(ctx) + + // Set the batch size to force 2 batches to be called. + s.runningBatchSize = 2 + ctxTimeout, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + m, err := s.Receive(ctxTimeout) if err != nil { t.Fatal("Receive should not fail", err) return } + m.Ack() } func TestRetryTopic(t *testing.T) {