From 950c2e69bab483effa257c849c88a4be29e87d73 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 21 Nov 2024 14:34:54 +0100 Subject: [PATCH] CompositeFuture should support custom future implementations (#5401) See #5399 CompositeFuture expected every future to extend FutureBase. But sometimes users may come with their own implementation. Signed-off-by: Thomas Segismont --- .../core/impl/future/CompositeFutureImpl.java | 14 ++- .../tests/future/CompositeFutureTest.java | 114 ++++++++++++++++-- 2 files changed, 113 insertions(+), 15 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java index 9053ec00d54..779769c9953 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java @@ -61,8 +61,12 @@ private CompositeFutureImpl(int op, boolean initializing, Future... results) private void init() { for (Future result : results) { - FutureBase internal = (FutureBase) result; - internal.addListener(this); + if (result instanceof FutureBase) { + FutureBase internal = (FutureBase) result; + internal.addListener(this); + } else { + result.onComplete(this); + } } Object o; synchronized (this) { @@ -205,8 +209,10 @@ public int size() { private void doComplete(Object result) { for (Future r : results) { - FutureBase internal = (FutureBase) r; - internal.removeListener(this); + if (r instanceof FutureBase) { + FutureBase internal = (FutureBase) r; + internal.removeListener(this); + } } if (result == this) { tryComplete(this); diff --git a/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java b/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java index 08b8056266d..e40b5b8b75e 100644 --- a/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java @@ -11,15 +11,9 @@ package io.vertx.tests.future; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import io.vertx.core.Completable; -import io.vertx.core.CompositeFuture; -import io.vertx.core.Future; -import io.vertx.core.Promise; +import io.vertx.core.*; import io.vertx.core.impl.future.FutureImpl; import io.vertx.test.core.Repeat; - import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.Test; @@ -29,13 +23,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; +import java.util.function.*; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** * @author Julien Viet */ @@ -577,4 +570,103 @@ public void testAnyRemovesListeners2() { Future.any(f, Future.succeededFuture()); assertEquals(Collections.emptySet(), f.listeners); } + + @Test + public void testCustomFuture() { + Promise p1 = Promise.promise(); + Promise p2 = Promise.promise(); + Promise p3 = Promise.promise(); + + CompositeFuture cf = Future.all(p1.future(), new MyFuture(p2), p3.future()); + + p1.complete(null); + p2.complete(null); + p3.complete(null); + + assertTrue(cf.isComplete()); + } + + private static class MyFuture implements Future { + + private final Future delegate; + + private MyFuture(Promise promise) { + delegate = promise.future(); + } + + @Override + public boolean isComplete() { + return delegate.isComplete(); + } + + @Override + public Future onComplete(Handler> handler) { + return delegate.onComplete(handler); + } + + @Override + public Void result() { + return delegate.result(); + } + + @Override + public Throwable cause() { + return delegate.cause(); + } + + @Override + public boolean succeeded() { + return delegate.succeeded(); + } + + @Override + public boolean failed() { + return delegate.failed(); + } + + @Override + public Future compose(Function> successMapper, Function> failureMapper) { + return delegate.compose(successMapper, failureMapper); + } + + @Override + public Future transform(Function, Future> mapper) { + return delegate.transform(mapper); + } + + @Override + public Future eventually(Supplier> mapper) { + return delegate.eventually(mapper); + } + + @Override + public Future map(Function mapper) { + return delegate.map(mapper); + } + + @Override + public Future map(V value) { + return delegate.map(value); + } + + @Override + public Future otherwise(Function mapper) { + return delegate.otherwise(mapper); + } + + @Override + public Future otherwise(Void value) { + return delegate.otherwise(value); + } + + @Override + public Future expecting(Expectation expectation) { + return delegate.expecting(expectation); + } + + @Override + public Future timeout(long delay, TimeUnit unit) { + return delegate.timeout(delay, unit); + } + } }