Skip to content

Commit

Permalink
CompositeFuture should support custom future implementations (#5401)
Browse files Browse the repository at this point in the history
See #5399

CompositeFuture expected every future to extend FutureBase.
But sometimes users may come with their own implementation.

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont authored Nov 21, 2024
1 parent 1a28a6f commit 950c2e6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ private CompositeFutureImpl(int op, boolean initializing, Future<?>... results)

private void init() {
for (Future<?> result : results) {
FutureBase internal = (FutureBase<?>) result;
internal.addListener(this);
if (result instanceof FutureBase) {
FutureBase internal = (FutureBase<?>) result;
internal.addListener(this);
} else {
result.onComplete(this);
}
}
Object o;
synchronized (this) {
Expand Down Expand Up @@ -205,8 +209,10 @@ public int size() {

private void doComplete(Object result) {
for (Future<?> r : results) {
FutureBase internal = (FutureBase<?>) r;
internal.removeListener(this);
if (r instanceof FutureBase) {
FutureBase internal = (FutureBase<?>) r;
internal.removeListener(this);
}
}
if (result == this) {
tryComplete(this);
Expand Down
114 changes: 103 additions & 11 deletions vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@

package io.vertx.tests.future;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.vertx.core.Completable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.*;
import io.vertx.core.impl.future.FutureImpl;
import io.vertx.test.core.Repeat;

import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.Test;

Expand All @@ -29,13 +23,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
Expand Down Expand Up @@ -577,4 +570,103 @@ public void testAnyRemovesListeners2() {
Future.any(f, Future.succeededFuture());
assertEquals(Collections.emptySet(), f.listeners);
}

@Test
public void testCustomFuture() {
Promise<Void> p1 = Promise.promise();
Promise<Void> p2 = Promise.promise();
Promise<Void> p3 = Promise.promise();

CompositeFuture cf = Future.all(p1.future(), new MyFuture(p2), p3.future());

p1.complete(null);
p2.complete(null);
p3.complete(null);

assertTrue(cf.isComplete());
}

private static class MyFuture implements Future<Void> {

private final Future<Void> delegate;

private MyFuture(Promise<Void> promise) {
delegate = promise.future();
}

@Override
public boolean isComplete() {
return delegate.isComplete();
}

@Override
public Future<Void> onComplete(Handler<AsyncResult<Void>> handler) {
return delegate.onComplete(handler);
}

@Override
public Void result() {
return delegate.result();
}

@Override
public Throwable cause() {
return delegate.cause();
}

@Override
public boolean succeeded() {
return delegate.succeeded();
}

@Override
public boolean failed() {
return delegate.failed();
}

@Override
public <U> Future<U> compose(Function<? super Void, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper) {
return delegate.compose(successMapper, failureMapper);
}

@Override
public <U> Future<U> transform(Function<AsyncResult<Void>, Future<U>> mapper) {
return delegate.transform(mapper);
}

@Override
public <U> Future<Void> eventually(Supplier<Future<U>> mapper) {
return delegate.eventually(mapper);
}

@Override
public <U> Future<U> map(Function<? super Void, U> mapper) {
return delegate.map(mapper);
}

@Override
public <V> Future<V> map(V value) {
return delegate.map(value);
}

@Override
public Future<Void> otherwise(Function<Throwable, Void> mapper) {
return delegate.otherwise(mapper);
}

@Override
public Future<Void> otherwise(Void value) {
return delegate.otherwise(value);
}

@Override
public Future<Void> expecting(Expectation<? super Void> expectation) {
return delegate.expecting(expectation);
}

@Override
public Future<Void> timeout(long delay, TimeUnit unit) {
return delegate.timeout(delay, unit);
}
}
}

0 comments on commit 950c2e6

Please sign in to comment.