From 0b961f72d882387f27c85604de78d62fda5f00f5 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 16 Oct 2024 14:39:00 +0200 Subject: [PATCH] Explore acquisition scheduler offloading. [#668] --- .../postgresql/PostgresqlConnection.java | 19 ++++++++++++++++++- .../io/r2dbc/postgresql/client/Client.java | 7 +++++++ .../postgresql/client/ReactorNettyClient.java | 12 ++++++++++++ .../r2dbc/postgresql/client/TestClient.java | 7 +++++++ 4 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 04b0fb6f..28ca991b 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -40,6 +40,7 @@ import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; +import io.r2dbc.spi.Wrapped; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -48,6 +49,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.annotation.Nullable; @@ -62,7 +64,7 @@ /** * An implementation of {@link Connection} for connecting to a PostgreSQL database. */ -final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection { +final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection, Wrapped { private final Logger logger = Loggers.getLogger(this.getClass()); @@ -384,6 +386,21 @@ public String toString() { '}'; } + @Override + public E unwrap(Class targetClass) { + + if (targetClass == Scheduler.class) { + return targetClass.cast(this.client.getScheduler()); + } + + return Wrapped.super.unwrap(targetClass); + } + + @Override + public Object unwrap() { + return null; + } + @Override public Mono validate(ValidationDepth depth) { diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java index e563e2d0..ed98307c 100644 --- a/src/main/java/io/r2dbc/postgresql/client/Client.java +++ b/src/main/java/io/r2dbc/postgresql/client/Client.java @@ -28,6 +28,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import java.util.Optional; import java.util.TimeZone; @@ -121,6 +122,12 @@ default Flux exchange(Publisher requests) { */ Optional getProcessId(); + /** + * @return returns the EventLoop as scheduler. + * @since 1.0.7 + */ + Scheduler getScheduler(); + /** * Returns the connected process secret key if it has been communicated. * diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index 1effe894..cf81fa49 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -54,6 +55,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.channel.AbortedException; @@ -107,6 +109,8 @@ public final class ReactorNettyClient implements Client { private final Connection connection; + private final Scheduler scheduler; + private ConnectionContext context; private final Sinks.Many> requestSink = Sinks.many().unicast().onBackpressureBuffer(); @@ -150,6 +154,9 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { this.byteBufAllocator = connection.outbound().alloc(); this.context = new ConnectionContext().withChannelId(connection.channel().toString()); + EventLoop eventLoop = connection.channel().eventLoop(); + this.scheduler = Schedulers.fromExecutorService(eventLoop, eventLoop.toString()); + AtomicReference receiveError = new AtomicReference<>(); connection.inbound().receive() @@ -453,6 +460,11 @@ public Optional getProcessId() { return Optional.ofNullable(this.processId); } + @Override + public Scheduler getScheduler() { + return this.scheduler; + } + @Override public Optional getSecretKey() { return Optional.ofNullable(this.secretKey); diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index b783bf18..464a9825 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -28,6 +28,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; import java.util.ArrayList; @@ -138,6 +140,11 @@ public Optional getProcessId() { return Optional.ofNullable(this.processId); } + @Override + public Scheduler getScheduler() { + return Schedulers.immediate(); + } + @Override public Optional getSecretKey() { return Optional.ofNullable(this.secretKey);