From ea5a5997908956bbb65bc5d125864ad3d940336a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 22 Nov 2023 20:27:36 +0200 Subject: [PATCH] Expose metrics for pending acquire operation latency (#2980) Fixes #2946 --- .../resources/ConnectionProviderMeters.java | 65 +++++++++- .../DefaultPooledConnectionProvider.java | 27 +++- .../MicrometerPoolMetricsRecorder.java | 113 +++++++++++++++++ .../resources/PooledConnectionProvider.java | 73 ++++++++++- .../DefaultPooledConnectionProviderTest.java | 65 +++++++++- .../http/client/Http2ConnectionProvider.java | 40 ++++-- .../client/Http2ConnectionProviderMeters.java | 65 +++++++++- .../reactor/netty/http/client/Http2Pool.java | 23 +++- .../client/MicrometerPoolMetricsRecorder.java | 113 +++++++++++++++++ .../netty/http/client/Http2PoolTest.java | 117 +++++++++++++++++- .../DefaultPooledConnectionProviderTest.java | 34 ++++- 11 files changed, 709 insertions(+), 26 deletions(-) create mode 100644 reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPoolMetricsRecorder.java create mode 100644 reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerPoolMetricsRecorder.java diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProviderMeters.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProviderMeters.java index 4398ad5053..52014dc53c 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProviderMeters.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProviderMeters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,6 +127,26 @@ public Meter.Type getType() { } }, + /** + * Time spent in pending acquire a connection from the connection pool. + */ + PENDING_CONNECTIONS_TIME { + @Override + public String getName() { + return "reactor.netty.connection.provider.pending.connections.time"; + } + + @Override + public KeyName[] getKeyNames() { + return PendingConnectionsTimeTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } + }, + /** * The number of all connections in the connection pool, active or idle. */ @@ -179,4 +199,47 @@ public String asString() { } } } + + enum PendingConnectionsTimeTags implements KeyName { + + /** + * ID. + */ + ID { + @Override + public String asString() { + return "id"; + } + }, + + /** + * NAME. + */ + NAME { + @Override + public String asString() { + return "name"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * STATUS. + */ + STATUS { + @Override + public String asString() { + return "status"; + } + } + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java index 2ae8bae202..146b05eda1 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +96,16 @@ protected InstrumentedPool createPool( return new PooledConnectionAllocator(config, poolFactory, remoteAddress, resolverGroup).pool; } + @Override + protected InstrumentedPool createPool( + String id, + TransportConfig config, + PoolFactory poolFactory, + SocketAddress remoteAddress, + AddressResolverGroup resolverGroup) { + return new PooledConnectionAllocator(id, name, config, poolFactory, remoteAddress, resolverGroup).pool; + } + static final Logger log = Loggers.getLogger(DefaultPooledConnectionProvider.class); static final AttributeKey OWNER = AttributeKey.valueOf("connectionOwner"); @@ -502,10 +512,23 @@ static final class PooledConnectionAllocator { PoolFactory provider, SocketAddress remoteAddress, AddressResolverGroup resolver) { + this(null, null, config, provider, remoteAddress, resolver); + } + + PooledConnectionAllocator( + @Nullable String id, + @Nullable String name, + TransportConfig config, + PoolFactory provider, + SocketAddress remoteAddress, + AddressResolverGroup resolver) { this.config = config; this.remoteAddress = remoteAddress; this.resolver = resolver; - this.pool = provider.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE); + this.pool = id == null ? + provider.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE) : + provider.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + new MicrometerPoolMetricsRecorder(id, name, remoteAddress)); } Publisher connectChannel() { diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPoolMetricsRecorder.java b/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPoolMetricsRecorder.java new file mode 100644 index 0000000000..9e2ce21f2a --- /dev/null +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPoolMetricsRecorder.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.resources; + +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import reactor.core.Disposable; +import reactor.pool.PoolMetricsRecorder; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +import static reactor.netty.Metrics.ERROR; +import static reactor.netty.Metrics.REGISTRY; +import static reactor.netty.Metrics.SUCCESS; +import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.resources.ConnectionProviderMeters.PENDING_CONNECTIONS_TIME; +import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.ID; +import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.NAME; +import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.REMOTE_ADDRESS; +import static reactor.netty.resources.ConnectionProviderMeters.PendingConnectionsTimeTags.STATUS; + +final class MicrometerPoolMetricsRecorder implements Disposable, PoolMetricsRecorder { + + final Timer pendingSuccessTimer; + final Timer pendingErrorTimer; + + MicrometerPoolMetricsRecorder(String id, String poolName, SocketAddress remoteAddress) { + pendingSuccessTimer = buildTimer(id, poolName, remoteAddress, SUCCESS); + pendingErrorTimer = buildTimer(id, poolName, remoteAddress, ERROR); + } + + @Override + public void recordAllocationSuccessAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordAllocationFailureAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordResetLatency(long latencyMs) { + //noop + } + + @Override + public void recordDestroyLatency(long latencyMs) { + //noop + } + + @Override + public void recordRecycled() { + //noop + } + + @Override + public void recordLifetimeDuration(long millisecondsSinceAllocation) { + //noop + } + + @Override + public void recordIdleTime(long millisecondsIdle) { + //noop + } + + @Override + public void recordSlowPath() { + //noop + } + + @Override + public void recordFastPath() { + //noop + } + + @Override + public void recordPendingSuccessAndLatency(long latencyMs) { + pendingSuccessTimer.record(latencyMs, TimeUnit.MILLISECONDS); + } + + @Override + public void recordPendingFailureAndLatency(long latencyMs) { + pendingErrorTimer.record(latencyMs, TimeUnit.MILLISECONDS); + } + + @Override + public void dispose() { + REGISTRY.remove(pendingSuccessTimer); + REGISTRY.remove(pendingErrorTimer); + } + + static Timer buildTimer(String id, String poolName, SocketAddress remoteAddress, String status) { + return Timer.builder(PENDING_CONNECTIONS_TIME.getName()) + .tags(Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), formatSocketAddress(remoteAddress), + NAME.asString(), poolName, STATUS.asString(), status)) + .register(REGISTRY); + } +} diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 9d65679890..82526bf3af 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -35,6 +35,7 @@ import reactor.pool.Pool; import reactor.pool.PoolBuilder; import reactor.pool.PoolConfig; +import reactor.pool.PoolMetricsRecorder; import reactor.pool.PooledRef; import reactor.pool.PooledRefMetadata; import reactor.pool.decorators.GracefulShutdownInstrumentedPool; @@ -131,12 +132,16 @@ public final Mono acquire( log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress); } - InstrumentedPool newPool = createPool(config, poolFactory, remoteAddress, resolverGroup); + boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null; + String id = metricsEnabled ? poolKey.hashCode() + "" : null; - if (poolFactory.metricsEnabled || config.metricsRecorder() != null) { + InstrumentedPool newPool = metricsEnabled && Metrics.isMicrometerAvailable() ? + createPool(id, config, poolFactory, remoteAddress, resolverGroup) : + createPool(config, poolFactory, remoteAddress, resolverGroup); + + if (metricsEnabled) { // registrar is null when metrics are enabled on HttpClient level or // with the `metrics(boolean metricsEnabled)` method on ConnectionProvider - String id = poolKey.hashCode() + ""; if (poolFactory.registrar != null) { poolFactory.registrar.get().registerMetrics(name, id, remoteAddress, new DelegatingConnectionPoolMetrics(newPool.metrics())); @@ -204,6 +209,10 @@ public final Mono disposeLater() { } else if (Metrics.isMicrometerAvailable()) { deRegisterDefaultMetrics(id, remoteAddress); + PoolMetricsRecorder recorder = pool.config().metricsRecorder(); + if (recorder instanceof Disposable) { + ((Disposable) recorder).dispose(); + } } }); }); @@ -215,6 +224,10 @@ else if (Metrics.isMicrometerAvailable()) { } else if (Metrics.isMicrometerAvailable()) { deRegisterDefaultMetrics(id, remoteAddress); + PoolMetricsRecorder recorder = pool.config().metricsRecorder(); + if (recorder instanceof Disposable) { + ((Disposable) recorder).dispose(); + } } }) ); @@ -251,6 +264,10 @@ public final void disposeWhen(SocketAddress address) { } else if (Metrics.isMicrometerAvailable()) { deRegisterDefaultMetrics(id, address); + PoolMetricsRecorder recorder = e.getValue().config().metricsRecorder(); + if (recorder instanceof Disposable) { + ((Disposable) recorder).dispose(); + } } }) ).subscribe(); @@ -303,6 +320,15 @@ protected abstract InstrumentedPool createPool( SocketAddress remoteAddress, AddressResolverGroup resolverGroup); + protected InstrumentedPool createPool( + String id, + TransportConfig config, + PoolFactory poolFactory, + SocketAddress remoteAddress, + AddressResolverGroup resolverGroup) { + return createPool(config, poolFactory, remoteAddress, resolverGroup); + } + protected PoolFactory poolFactory(SocketAddress remoteAddress) { return poolFactoryPerRemoteHost.getOrDefault(remoteAddress, defaultPoolFactory); } @@ -376,6 +402,10 @@ final void disposeInactivePoolsInBackground() { } else if (Metrics.isMicrometerAvailable()) { deRegisterDefaultMetrics(id, address); + PoolMetricsRecorder recorder = e.getValue().config().metricsRecorder(); + if (recorder instanceof Disposable) { + ((Disposable) recorder).dispose(); + } } }) ).subscribe(); @@ -470,6 +500,18 @@ public InstrumentedPool newPool( return newPoolInternal(allocator, destroyHandler, evictionPredicate).buildPool(); } + public InstrumentedPool newPool( + Publisher allocator, + Function> destroyHandler, + BiPredicate evictionPredicate, + PoolMetricsRecorder poolMetricsRecorder) { + if (disposeTimeout != null) { + return newPoolInternal(allocator, destroyHandler, evictionPredicate, poolMetricsRecorder) + .buildPoolAndDecorateWith(InstrumentedPoolDecorators::gracefulShutdown); + } + return newPoolInternal(allocator, destroyHandler, evictionPredicate, poolMetricsRecorder).buildPool(); + } + public InstrumentedPool newPool( Publisher allocator, @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility @@ -483,10 +525,31 @@ public InstrumentedPool newPool( return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory); } + public InstrumentedPool newPool( + Publisher allocator, + Function> destroyHandler, + BiPredicate defaultEvictionPredicate, + PoolMetricsRecorder poolMetricsRecorder, + Function, InstrumentedPool> poolFactory) { + if (disposeTimeout != null) { + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder) + .build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown)); + } + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory); + } + PoolBuilder> newPoolInternal( Publisher allocator, Function> destroyHandler, BiPredicate defaultEvictionPredicate) { + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, null); + } + + PoolBuilder> newPoolInternal( + Publisher allocator, + Function> destroyHandler, + BiPredicate defaultEvictionPredicate, + @Nullable PoolMetricsRecorder poolMetricsRecorder) { PoolBuilder> poolBuilder = PoolBuilder.from(allocator) .destroyHandler(destroyHandler) @@ -535,6 +598,10 @@ PoolBuilder> newPoolInternal( poolBuilder = poolBuilder.idleResourceReuseMruOrder(); } + if (poolMetricsRecorder != null) { + poolBuilder.metricsRecorder(poolMetricsRecorder); + } + return poolBuilder; } diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 8dd6ea991f..b711606d24 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.time.Clock; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.function.BiPredicate; +import java.util.function.Function; import java.util.function.Supplier; import io.netty.channel.ChannelOption; @@ -46,9 +49,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.scheduler.Scheduler; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; @@ -62,10 +67,13 @@ import reactor.netty.transport.AddressUtils; import reactor.netty.transport.ClientTransportConfig; import reactor.netty.transport.NameResolverProvider; +import reactor.pool.AllocationStrategy; import reactor.pool.InstrumentedPool; import reactor.pool.PoolAcquirePendingLimitException; import reactor.pool.PoolConfig; +import reactor.pool.PoolMetricsRecorder; import reactor.pool.PooledRef; +import reactor.pool.PooledRefMetadata; import reactor.scheduler.clock.SchedulerClock; import reactor.test.StepVerifier; import reactor.test.scheduler.VirtualTimeScheduler; @@ -547,7 +555,7 @@ public Mono> acquire(Duration timeout) { @Override public PoolConfig config() { - return null; + return new PoolConfigImpl(); } @Override @@ -566,6 +574,59 @@ public PoolMetrics metrics() { } } + static final class PoolConfigImpl implements PoolConfig { + + @Override + public Mono allocator() { + return null; + } + + @Override + public AllocationStrategy allocationStrategy() { + return null; + } + + @Override + public int maxPending() { + return 0; + } + + @Override + public Function> releaseHandler() { + return null; + } + + @Override + public Function> destroyHandler() { + return null; + } + + @Override + public BiPredicate evictionPredicate() { + return null; + } + + @Override + public Scheduler acquisitionScheduler() { + return null; + } + + @Override + public PoolMetricsRecorder metricsRecorder() { + return null; + } + + @Override + public Clock clock() { + return null; + } + + @Override + public boolean reuseIdleResourcesInLruOrder() { + return false; + } + } + static final class ClientTransportConfigImpl extends ClientTransportConfig { final EventLoopGroup group; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 5d83544e2c..d3b12578de 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,6 @@ import java.util.Queue; import java.util.function.BiPredicate; import java.util.function.Function; -import java.util.function.Supplier; import static reactor.netty.ReactorNetty.format; import static reactor.netty.ReactorNetty.getChannelContext; @@ -122,7 +121,17 @@ protected InstrumentedPool createPool( PooledConnectionProvider.PoolFactory poolFactory, SocketAddress remoteAddress, AddressResolverGroup resolverGroup) { - return new PooledConnectionAllocator(parent, config, poolFactory, () -> remoteAddress, resolverGroup).pool; + return new PooledConnectionAllocator(parent, config, poolFactory, remoteAddress, resolverGroup).pool; + } + + @Override + protected InstrumentedPool createPool( + String id, + TransportConfig config, + PooledConnectionProvider.PoolFactory poolFactory, + SocketAddress remoteAddress, + AddressResolverGroup resolverGroup) { + return new PooledConnectionAllocator(id, name(), parent, config, poolFactory, remoteAddress, resolverGroup).pool; } @Override @@ -503,25 +512,40 @@ static final class PooledConnectionAllocator { final ConnectionProvider parent; final HttpClientConfig config; final InstrumentedPool pool; - final Supplier remoteAddress; + final SocketAddress remoteAddress; final AddressResolverGroup resolver; PooledConnectionAllocator( ConnectionProvider parent, TransportConfig config, PoolFactory poolFactory, - Supplier remoteAddress, + SocketAddress remoteAddress, + AddressResolverGroup resolver) { + this(null, null, parent, config, poolFactory, remoteAddress, resolver); + } + + PooledConnectionAllocator( + @Nullable String id, + @Nullable String name, + ConnectionProvider parent, + TransportConfig config, + PoolFactory poolFactory, + SocketAddress remoteAddress, AddressResolverGroup resolver) { this.parent = parent; this.config = (HttpClientConfig) config; this.remoteAddress = remoteAddress; this.resolver = resolver; - this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + this.pool = id == null ? + poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : + poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + new MicrometerPoolMetricsRecorder(id, name, remoteAddress), + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); } Publisher connectChannel() { - return parent.acquire(config, new DelegatingConnectionObserver(), remoteAddress, resolver) + return parent.acquire(config, new DelegatingConnectionObserver(), () -> remoteAddress, resolver) .map(conn -> conn); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java index a43270dba2..72e3bd986c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,6 +105,26 @@ public KeyName[] getKeyNames() { public Meter.Type getType() { return Meter.Type.GAUGE; } + }, + + /** + * Time spent in pending acquire a stream from the connection pool. + */ + PENDING_STREAMS_TIME { + @Override + public String getName() { + return "reactor.netty.connection.provider.pending.streams.time"; + } + + @Override + public KeyName[] getKeyNames() { + return PendingStreamsTimeTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.TIMER; + } }; enum Http2ConnectionProviderMetersTags implements KeyName { @@ -139,4 +159,47 @@ public String asString() { } } } + + enum PendingStreamsTimeTags implements KeyName { + + /** + * ID. + */ + ID { + @Override + public String asString() { + return "id"; + } + }, + + /** + * NAME. + */ + NAME { + @Override + public String asString() { + return "name"; + } + }, + + /** + * Remote address. + */ + REMOTE_ADDRESS { + @Override + public String asString() { + return "remote.address"; + } + }, + + /** + * STATUS. + */ + STATUS { + @Override + public String asString() { + return "status"; + } + } + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index c333a17338..0e30f2dc31 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -378,7 +378,7 @@ void drainLoop() { borrower.fail(new PoolShutdownException()); return; } - borrower.stopPendingCountdown(); + borrower.stopPendingCountdown(true); if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Channel activated")); } @@ -422,7 +422,7 @@ void drainLoop() { borrower.fail(new PoolShutdownException()); return; } - borrower.stopPendingCountdown(); + borrower.stopPendingCountdown(true); Mono allocator = poolConfig.allocator(); Mono primary = allocator.doOnEach(sig -> { @@ -731,6 +731,8 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip final CoreSubscriber actual; final Http2Pool pool; + long pendingAcquireStart; + Disposable timeoutTask; Borrower(CoreSubscriber actual, Http2Pool pool, Duration acquireTimeout) { @@ -742,7 +744,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip @Override public void cancel() { - stopPendingCountdown(); + stopPendingCountdown(true); // this is not failure, the subscription was canceled if (compareAndSet(false, true)) { pool.cancelAcquire(this); } @@ -759,6 +761,7 @@ public void request(long n) { int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { + pendingAcquireStart = pool.clock.millis(); timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout); } pool.doAcquire(this); @@ -768,6 +771,8 @@ public void request(long n) { @Override public void run() { if (compareAndSet(false, true)) { + // this is failure, a timeout was observed + stopPendingCountdown(false); pool.cancelAcquire(Http2Pool.Borrower.this); actual.onError(new PoolAcquireTimeoutException(acquireTimeout)); } @@ -810,13 +815,21 @@ void deliver(Http2PooledRef poolSlot) { } void fail(Throwable error) { - stopPendingCountdown(); + stopPendingCountdown(false); if (!get()) { actual.onError(error); } } - void stopPendingCountdown() { + void stopPendingCountdown(boolean success) { + if (!timeoutTask.isDisposed()) { + if (success) { + pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart); + } + else { + pool.poolConfig.metricsRecorder().recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart); + } + } timeoutTask.dispose(); } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerPoolMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerPoolMetricsRecorder.java new file mode 100644 index 0000000000..b2137f4430 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerPoolMetricsRecorder.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import reactor.core.Disposable; +import reactor.netty.internal.shaded.reactor.pool.PoolMetricsRecorder; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +import static reactor.netty.Metrics.ERROR; +import static reactor.netty.Metrics.REGISTRY; +import static reactor.netty.Metrics.SUCCESS; +import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS_TIME; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.PendingStreamsTimeTags.ID; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.PendingStreamsTimeTags.NAME; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.PendingStreamsTimeTags.REMOTE_ADDRESS; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.PendingStreamsTimeTags.STATUS; + +final class MicrometerPoolMetricsRecorder implements Disposable, PoolMetricsRecorder { + + final Timer pendingSuccessTimer; + final Timer pendingErrorTimer; + + MicrometerPoolMetricsRecorder(String id, String poolName, SocketAddress remoteAddress) { + pendingSuccessTimer = buildTimer(id, poolName, remoteAddress, SUCCESS); + pendingErrorTimer = buildTimer(id, poolName, remoteAddress, ERROR); + } + + @Override + public void recordAllocationSuccessAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordAllocationFailureAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordResetLatency(long latencyMs) { + //noop + } + + @Override + public void recordDestroyLatency(long latencyMs) { + //noop + } + + @Override + public void recordRecycled() { + //noop + } + + @Override + public void recordLifetimeDuration(long millisecondsSinceAllocation) { + //noop + } + + @Override + public void recordIdleTime(long millisecondsIdle) { + //noop + } + + @Override + public void recordSlowPath() { + //noop + } + + @Override + public void recordFastPath() { + //noop + } + + @Override + public void recordPendingSuccessAndLatency(long latencyMs) { + pendingSuccessTimer.record(latencyMs, TimeUnit.MILLISECONDS); + } + + @Override + public void recordPendingFailureAndLatency(long latencyMs) { + pendingErrorTimer.record(latencyMs, TimeUnit.MILLISECONDS); + } + + @Override + public void dispose() { + REGISTRY.remove(pendingSuccessTimer); + REGISTRY.remove(pendingErrorTimer); + } + + static Timer buildTimer(String id, String poolName, SocketAddress remoteAddress, String status) { + return Timer.builder(PENDING_STREAMS_TIME.getName()) + .tags(Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), formatSocketAddress(remoteAddress), + NAME.asString(), poolName, STATUS.asString(), status)) + .register(REGISTRY); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 4dbda691db..5730c2968f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; import reactor.netty.internal.shaded.reactor.pool.PoolConfig; +import reactor.netty.internal.shaded.reactor.pool.PoolMetricsRecorder; import reactor.netty.internal.shaded.reactor.pool.PooledRef; import reactor.test.StepVerifier; import reactor.util.annotation.Nullable; @@ -1205,6 +1206,55 @@ void nonHttp2ConnectionEmittedOnce() { } } + @Test + void recordsPendingCountAndLatencies() { + EmbeddedChannel channel = new EmbeddedChannel(); + TestPoolMetricsRecorder recorder = new TestPoolMetricsRecorder(); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .metricsRecorder(recorder) + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); + + try { + //success, acquisition happens immediately + PooledRef pooledRef = http2Pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1)); + assertThat(pooledRef).isNotNull(); + + //success, acquisition happens after pending some time + http2Pool.acquire(Duration.ofMillis(50)).subscribe(); + + //error, timed out + http2Pool.acquire(Duration.ofMillis(1)) + .as(StepVerifier::create) + .expectError(PoolAcquireTimeoutException.class) + .verify(Duration.ofSeconds(1)); + + pooledRef.release().block(Duration.ofSeconds(1)); + + assertThat(recorder.pendingSuccessCounter) + .as("pending success") + .isEqualTo(1); + + assertThat(recorder.pendingErrorCounter) + .as("pending errors") + .isEqualTo(1); + + assertThat(recorder.pendingSuccessLatency) + .as("pending success latency") + .isGreaterThanOrEqualTo(1L); + + assertThat(recorder.pendingErrorLatency) + .as("pending error latency") + .isGreaterThanOrEqualTo(1L); + } + finally { + channel.finishAndReleaseAll(); + Connection.from(channel).dispose(); + } + } + static final class TestChannelId implements ChannelId { static final Random rndm = new Random(); @@ -1234,4 +1284,69 @@ public int compareTo(ChannelId o) { return this.asShortText().compareTo(o.asShortText()); } } + + static final class TestPoolMetricsRecorder implements PoolMetricsRecorder { + + int pendingSuccessCounter; + int pendingErrorCounter; + long pendingSuccessLatency; + long pendingErrorLatency; + + @Override + public void recordAllocationSuccessAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordAllocationFailureAndLatency(long latencyMs) { + //noop + } + + @Override + public void recordResetLatency(long latencyMs) { + //noop + } + + @Override + public void recordDestroyLatency(long latencyMs) { + //noop + } + + @Override + public void recordRecycled() { + //noop + } + + @Override + public void recordLifetimeDuration(long millisecondsSinceAllocation) { + //noop + } + + @Override + public void recordIdleTime(long millisecondsIdle) { + //noop + } + + @Override + public void recordSlowPath() { + //noop + } + + @Override + public void recordFastPath() { + //noop + } + + @Override + public void recordPendingSuccessAndLatency(long latencyMs) { + this.pendingSuccessCounter++; + this.pendingSuccessLatency = latencyMs; + } + + @Override + public void recordPendingFailureAndLatency(long latencyMs) { + this.pendingErrorCounter++; + this.pendingErrorLatency = latencyMs; + } + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 870b1f78f3..22071906ea 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -18,6 +18,7 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -684,10 +685,12 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole CountDownLatch latch = new CountDownLatch(10); DefaultPooledConnectionProvider provider = (DefaultPooledConnectionProvider) builder.build(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = createClient(provider, disposableServer.port()) .protocol(isHttp2 ? HttpProtocol.H2C : HttpProtocol.HTTP11) .doOnResponse((res, conn) -> { + serverAddress.set(conn.channel().remoteAddress()); Channel channel = conn.channel() instanceof Http2StreamChannel ? conn.channel().parent() : conn.channel(); channel.closeFuture() @@ -707,12 +710,20 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole .expectComplete() .verify(Duration.ofSeconds(5)); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + String pendingTime = isHttp2 ? ".pending.streams.time" : ".pending.connections.time"; + assertThat(provider.channelPools.size()).isEqualTo(1); if (meterRegistrar != null) { assertThat(meterRegistrar.registered.get()).isTrue(); } else { - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isNotEqualTo(-1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, metricsName)).isNotEqualTo(-1); + + assertThat(getTimerValue(CONNECTION_PROVIDER_PREFIX + pendingTime, + REMOTE_ADDRESS, address, NAME, metricsName)).isNotEqualTo(-1); } if (enableEvictInBackground) { @@ -731,10 +742,18 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole } else { if (enableEvictInBackground) { - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isEqualTo(-1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, metricsName)).isEqualTo(-1); + + assertThat(getTimerValue(CONNECTION_PROVIDER_PREFIX + pendingTime, + REMOTE_ADDRESS, address, NAME, metricsName)).isEqualTo(-1); } else { - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isNotEqualTo(-1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, metricsName)).isNotEqualTo(-1); + + assertThat(getTimerValue(CONNECTION_PROVIDER_PREFIX + pendingTime, + REMOTE_ADDRESS, address, NAME, metricsName)).isNotEqualTo(-1); } } } @@ -798,6 +817,15 @@ private double getGaugeValue(String gaugeName, String... tags) { return result; } + private double getTimerValue(String timerName, String... tags) { + Timer timer = registry.find(timerName).tags(tags).timer(); + double result = -1; + if (timer != null) { + result = timer.count(); + } + return result; + } + @Test @SuppressWarnings("FutureReturnValueIgnored") void testHttp2PoolAndGoAway() {