From 88ced6e3eead0e8f2946fc901f4f3b8920334857 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Sat, 25 Nov 2023 01:07:44 +0100 Subject: [PATCH] Experiment work stealing pools --- .../src/main/java/reactor/netty/Metrics.java | 4 + .../http/client/Http2AllocationStrategy.java | 49 +++++++ .../http/client/Http2ConnectionProvider.java | 53 +++++++- .../client/Http2ConnectionProviderMeters.java | 20 +++ .../reactor/netty/http/client/Http2Pool.java | 123 +++++++++++++++--- ...Http2ConnectionProviderMeterRegistrar.java | 28 +++- .../netty/http/client/HttpClientTest.java | 39 ++++++ 7 files changed, 289 insertions(+), 27 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java index 1b13934c65..1b35994db1 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java +++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java @@ -204,6 +204,10 @@ public class Metrics { */ public static final String PENDING_STREAMS = ".pending.streams"; + /** + * The number of HTTP/2 stream acquisitions steal count. + */ + public static final String STEAL_STREAMS = ".steal.streams"; // ByteBufAllocator Metrics /** diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java index c6d3ba29d7..434ab2e9fd 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java @@ -66,6 +66,19 @@ public interface Builder { * @return {@code this} */ Builder minConnections(int minConnections); + + /** + * Enables or disables work stealing mode for managing HTTP2 Connection Pools. + *

+ * By default, a single Connection Pool is used by multiple Netty event loop threads. + * When work stealing is enabled, each Netty event loop will maintain its own + * HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available + * pools using a work stealing strategy. This approach maximizes throughput and + * resource utilization in a multithreaded environment. + * + * @return {@code this} + */ + Builder enableWorkStealing(); } /** @@ -77,6 +90,18 @@ public static Http2AllocationStrategy.Builder builder() { return new Http2AllocationStrategy.Build(); } + /** + * Creates a builder for {@link Http2AllocationStrategy} and initialize it + * with an existing strategy. This method can be used to create a mutated version + * of an existing strategy. + * + * @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2 + * allocation strategy. + */ + public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) { + return new Http2AllocationStrategy.Build(existing); + } + @Override public Http2AllocationStrategy copy() { return new Http2AllocationStrategy(this); @@ -141,9 +166,14 @@ public void returnPermits(int returned) { } } + public boolean enableWorkStealing() { + return enableWorkStealing; + } + final long maxConcurrentStreams; final int maxConnections; final int minConnections; + final boolean enableWorkStealing; volatile int permits; static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits"); @@ -152,6 +182,7 @@ public void returnPermits(int returned) { this.maxConcurrentStreams = build.maxConcurrentStreams; this.maxConnections = build.maxConnections; this.minConnections = build.minConnections; + this.enableWorkStealing = build.enableWorkStealing; PERMITS.lazySet(this, this.maxConnections); } @@ -159,6 +190,7 @@ public void returnPermits(int returned) { this.maxConcurrentStreams = copy.maxConcurrentStreams; this.maxConnections = copy.maxConnections; this.minConnections = copy.minConnections; + this.enableWorkStealing = copy.enableWorkStealing; PERMITS.lazySet(this, this.maxConnections); } @@ -170,6 +202,17 @@ static final class Build implements Builder { long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; int maxConnections = DEFAULT_MAX_CONNECTIONS; int minConnections = DEFAULT_MIN_CONNECTIONS; + boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing"); + + Build() { + } + + Build(Http2AllocationStrategy existing) { + this.maxConcurrentStreams = existing.maxConcurrentStreams; + this.minConnections = existing.minConnections; + this.maxConnections = existing.maxConnections; + this.enableWorkStealing = existing.enableWorkStealing; + } @Override public Http2AllocationStrategy build() { @@ -206,5 +249,11 @@ public Builder minConnections(int minConnections) { this.minConnections = minConnections; return this; } + + @Override + public Builder enableWorkStealing() { + this.enableWorkStealing = true; + return this; + } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index d3b12578de..17a0760fc6 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -39,6 +39,7 @@ import reactor.netty.ConnectionObserver; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; +import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.PooledConnectionProvider; import reactor.netty.transport.TransportConfig; @@ -50,13 +51,19 @@ import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import reactor.util.context.Context; +import reactor.util.function.Tuples; import java.io.IOException; import java.net.SocketAddress; import java.time.Duration; +import java.util.Iterator; +import java.util.List; import java.util.Queue; +import java.util.concurrent.Executor; import java.util.function.BiPredicate; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static reactor.netty.ReactorNetty.format; import static reactor.netty.ReactorNetty.getChannelContext; @@ -536,12 +543,46 @@ static final class PooledConnectionAllocator { this.config = (HttpClientConfig) config; this.remoteAddress = remoteAddress; this.resolver = resolver; - this.pool = id == null ? - poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : - poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - new MicrometerPoolMetricsRecorder(id, name, remoteAddress), - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + + Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ? + (Http2AllocationStrategy) poolFactory.allocationStrategy() : null; + + if (http2Strategy == null || !http2Strategy.enableWorkStealing) { + this.pool = id == null ? + poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : + poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + new MicrometerPoolMetricsRecorder(id, name, remoteAddress), + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + } + else { + // Create one connection allocator (it will be shared by all Http2Pool instances) + Publisher connPublisher = connectChannel(); + + List execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false) + .limit(http2Strategy.maxConnections) + .collect(Collectors.toList()); + Iterator execsIter = execs.iterator(); + + MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress); + this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), + http2Strategy.minConnections, http2Strategy.maxConnections, (minConn, maxConn) -> { + Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy) + .minConnections(minConn) + .maxConnections(maxConn) + .build(); + + InstrumentedPool pool = + id == null ? + poolFactory.newPool(connPublisher, null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) : + poolFactory.newPool(connPublisher, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + micrometerRecorder, + poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)); + + return Tuples.of(pool, execsIter.next()); + }); + } } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java index 72e3bd986c..67a991346b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java @@ -67,6 +67,26 @@ public Meter.Type getType() { } }, + /** + * The number of HTTP/2 stream acquisition steal count. + */ + STEAL_STREAMS { + @Override + public String getName() { + return "reactor.netty.connection.provider.steal.streams"; + } + + @Override + public KeyName[] getKeyNames() { + return Http2ConnectionProviderMetersTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.COUNTER; + } + }, + /** * The number of the idle connections in the connection pool. */ diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 0e30f2dc31..937c08132c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -44,6 +45,7 @@ import reactor.netty.Connection; import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; +import reactor.netty.internal.shaded.reactor.pool.AllocationStrategy; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -156,6 +158,8 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. final Long maxConcurrentStreams; final int minConnections; final PoolConfig poolConfig; + final AllocationStrategy allocationStrategy; + final boolean workStealingEnabled; long lastInteractionTimestamp; @@ -165,11 +169,17 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); - this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? + Http2AllocationStrategy http2Strategy = allocationStrategy instanceof Http2AllocationStrategy ? (Http2AllocationStrategy) allocationStrategy : null; + + this.maxConcurrentStreams = http2Strategy != null ? ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; + this.workStealingEnabled = http2Strategy != null && http2Strategy.enableWorkStealing; + + // Perform allocations using either the allocationStrategy parameter, or default to the poolConfig allocation strategy. + this.allocationStrategy = allocationStrategy == null ? poolConfig.allocationStrategy() : new DelegatingAllocationStrategy(allocationStrategy); recordInteractionTimestamp(); scheduleEviction(); @@ -185,6 +195,24 @@ public Mono> acquire(Duration timeout) { return new BorrowerMono(this, timeout); } + @Override + public boolean steal(InstrumentedPool pool) { + Http2Pool other = (Http2Pool) pool; + + if (!other.isDisposed()) { + ConcurrentLinkedDeque q = other.pending; + Borrower b = other.pollPending(q, false); + if (b != null && !b.get()) { + // TODO check race conditions when timer expires or subscription is cancelled concurrently + b.setPool(this); + doAcquire(b); + return true; + } + } + + return false; + } + @Override public int acquiredSize() { return allocatedSize() - idleSize(); @@ -192,7 +220,7 @@ public int acquiredSize() { @Override public int allocatedSize() { - return poolConfig.allocationStrategy().permitGranted(); + return allocationStrategy.permitGranted(); } @Override @@ -366,7 +394,7 @@ void drainLoop() { // find a connection that can be used for opening a new stream // when cached connections are below minimum connections, then allocate a new connection boolean belowMinConnections = minConnections > 0 && - poolConfig.allocationStrategy().permitGranted() < minConnections; + allocationStrategy.permitGranted() < minConnections; Slot slot = belowMinConnections ? null : findConnection(resources); if (slot != null) { Borrower borrower = pollPending(borrowers, true); @@ -383,10 +411,19 @@ void drainLoop() { log.debug(format(slot.connection.channel(), "Channel activated")); } ACQUIRED.incrementAndGet(this); - slot.connection.channel().eventLoop().execute(() -> { + if (!workStealingEnabled) { + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); // will insert the connection slot into CONNECTIONS + drain(); + }); + } + else { + // WHen using the reactor work-stealing pool, we are already executing from one of the pools' executor, + // so, we can safely deliver the borrower concurrently, all the borrowers are distributed across + // all sub pools, so we won't be in a situation where the current thread will run the drainloop + // for ever under heavy requests load, so no need to reschedule. borrower.deliver(new Http2PooledRef(slot)); - drain(); - }); + } } else { int resourcesCount = idleSize; @@ -396,7 +433,7 @@ void drainLoop() { // connections allocations were triggered } else { - int permits = poolConfig.allocationStrategy().getPermits(1); + int permits = allocationStrategy.getPermits(1); if (permits <= 0) { if (maxPending >= 0) { borrowersCount = pendingSize; @@ -412,7 +449,7 @@ void drainLoop() { else { if (permits > 1) { // warmup is not supported - poolConfig.allocationStrategy().returnPermits(permits - 1); + allocationStrategy.returnPermits(permits - 1); } Borrower borrower = pollPending(borrowers, true); if (borrower == null) { @@ -439,7 +476,7 @@ void drainLoop() { else if (sig.isOnError()) { Throwable error = sig.getThrowable(); assert error != null; - poolConfig.allocationStrategy().returnPermits(1); + allocationStrategy.returnPermits(1); borrower.fail(error); } }) @@ -633,7 +670,7 @@ void pendingOffer(Borrower borrower) { if (WIP.getAndIncrement(this) == 0) { ConcurrentLinkedQueue ir = connections; - if (maxPending >= 0 && postOffer > maxPending && ir.isEmpty() && poolConfig.allocationStrategy().estimatePermitCount() == 0) { + if (maxPending >= 0 && postOffer > maxPending && ir.isEmpty() && allocationStrategy.estimatePermitCount() == 0) { Borrower toCull = pollPending(pendingQueue, false); if (toCull != null) { pendingAcquireLimitReached(toCull, maxPending); @@ -729,7 +766,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip final Duration acquireTimeout; final CoreSubscriber actual; - final Http2Pool pool; + final AtomicReference pool; long pendingAcquireStart; @@ -738,7 +775,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip Borrower(CoreSubscriber actual, Http2Pool pool, Duration acquireTimeout) { this.acquireTimeout = acquireTimeout; this.actual = actual; - this.pool = pool; + this.pool = new AtomicReference<>(pool); this.timeoutTask = TIMEOUT_DISPOSED; } @@ -746,7 +783,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip public void cancel() { stopPendingCountdown(true); // this is not failure, the subscription was canceled if (compareAndSet(false, true)) { - pool.cancelAcquire(this); + pool().cancelAcquire(this); } } @@ -757,8 +794,9 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { + Http2Pool pool = pool(); long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired; - int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); + int permits = pool.allocationStrategy.estimatePermitCount(); int pending = pool.pendingSize; if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { pendingAcquireStart = pool.clock.millis(); @@ -773,7 +811,7 @@ public void run() { if (compareAndSet(false, true)) { // this is failure, a timeout was observed stopPendingCountdown(false); - pool.cancelAcquire(Http2Pool.Borrower.this); + pool().cancelAcquire(Http2Pool.Borrower.this); actual.onError(new PoolAcquireTimeoutException(acquireTimeout)); } } @@ -801,7 +839,10 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + if (!pool().workStealingEnabled) { + // TODO can we do this check even when workstealing is enabled ? + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + } poolSlot.slot.incrementConcurrencyAndGet(); poolSlot.slot.deactivate(); if (get()) { @@ -823,6 +864,7 @@ void fail(Throwable error) { void stopPendingCountdown(boolean success) { if (!timeoutTask.isDisposed()) { + Http2Pool pool = pool(); if (success) { pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart); } @@ -832,6 +874,14 @@ void stopPendingCountdown(boolean success) { } timeoutTask.dispose(); } + + Http2Pool pool() { + return pool.get(); + } + + public void setPool(Http2Pool replace) { + pool.set(replace); + } } static final class BorrowerMono extends Mono> { @@ -1043,7 +1093,7 @@ void invalidate() { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Channel removed from pool")); } - pool.poolConfig.allocationStrategy().returnPermits(1); + pool.allocationStrategy.returnPermits(1); TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams); } } @@ -1078,4 +1128,43 @@ public long allocationTimestamp() { } } + + static final class DelegatingAllocationStrategy implements AllocationStrategy { + + final ConnectionProvider.AllocationStrategy delegate; + + DelegatingAllocationStrategy(ConnectionProvider.AllocationStrategy delegate) { + this.delegate = delegate; + } + + @Override + public int estimatePermitCount() { + return delegate.estimatePermitCount(); + } + + @Override + public int getPermits(int desired) { + return delegate.getPermits(desired); + } + + @Override + public int permitGranted() { + return delegate.permitGranted(); + } + + @Override + public int permitMinimum() { + return delegate.permitMinimum(); + } + + @Override + public int permitMaximum() { + return delegate.permitMaximum(); + } + + @Override + public void returnPermits(int returned) { + delegate.returnPermits(returned); + } + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 75bf6cd939..3b7fd4528c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -19,8 +19,10 @@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Tags; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; +import reactor.netty.internal.shaded.reactor.pool.decorators.WorkStealingPool; import java.net.SocketAddress; +import java.util.List; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_CONNECTIONS; @@ -31,6 +33,7 @@ import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS; import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS; import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.STEAL_STREAMS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { @@ -48,9 +51,20 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In .tags(tags) .register(REGISTRY); - Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) - .tags(tags) - .register(REGISTRY); + if (metrics instanceof Http2Pool) { + Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) + .tags(tags) + .register(REGISTRY); + } + else if (metrics instanceof WorkStealingPool) { + Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> getActiveStreams(((WorkStealingPool) metrics).getPools())) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(STEAL_STREAMS.getName(), metrics, InstrumentedPool.PoolMetrics::stealCount) + .tags(tags) + .register(REGISTRY); + } Gauge.builder(IDLE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::idleSize) .tags(tags) @@ -70,4 +84,10 @@ void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE)); } -} \ No newline at end of file + + int getActiveStreams(List> pools) { + return pools.stream() + .mapToInt(pool -> ((Http2Pool) pool).activeStreams()) + .sum(); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index ac6a946cdf..eb5bed5cb9 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -3333,4 +3333,43 @@ private void doTestIssue1943(HttpProtocol protocol) { .block(Duration.ofSeconds(5)); } } + + @Test + void testHttp2ClientWithWorkStealing() { + disposableServer = + HttpServer.create() + .protocol(HttpProtocol.H2C) + .port(0) + .handle((req, res) -> + res.sendString(Mono.just("Welcome"))) + .bindNow(); + + ConnectionProvider provider = ConnectionProvider + .builder("http") + .allocationStrategy(Http2AllocationStrategy.builder() + .minConnections(1) + .maxConnections(10) + .enableWorkStealing() + .build()) + .build(); + + try { + HttpClient client = HttpClient.create(provider) + .protocol(HttpProtocol.H2C) + .port(disposableServer.port()) + .wiretap(true); + + StepVerifier.create(client + .headers(hdr -> hdr.set("Content-Type", "text/plain")) + .get() + .uri("/payload-size") + .response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r)))) + .expectNextMatches(tuple -> "Welcome".equals(tuple.getT1()) && tuple.getT2().status().equals(HttpResponseStatus.OK)) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + finally { + provider.disposeLater().block(); + } + } }