Skip to content

Commit

Permalink
Improve mechanism for extracting the result of a PlainActionFuture (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktindall authored Jul 4, 2024
1 parent 1be0f2b commit 7a8a7c0
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 72 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/110019.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 110019
summary: Improve mechanism for extracting the result of a `PlainActionFuture`
area: Distributed
type: enhancement
issues:
- 108125
Original file line number Diff line number Diff line change
Expand Up @@ -178,32 +178,19 @@ public T actionGet(long timeout, TimeUnit unit) {
* Return the result of this future, similarly to {@link FutureUtils#get} with a zero timeout except that this method ignores the
* interrupted status of the calling thread.
* <p>
* As with {@link FutureUtils#get}, if the future completed exceptionally with a {@link RuntimeException} then this method throws that
* exception, but if the future completed exceptionally with an exception that is not a {@link RuntimeException} then this method throws
* an {@link UncategorizedExecutionException} whose cause is an {@link ExecutionException} whose cause is the completing exception.
* If the future completed exceptionally then this method throws an {@link ExecutionException} whose cause is the completing exception.
* <p>
* It is not valid to call this method if the future is incomplete.
*
* @return the result of this future, if it has been completed successfully.
* @throws RuntimeException if this future was completed exceptionally, wrapping checked exceptions as described above.
* @throws ExecutionException if this future was completed exceptionally.
* @throws CancellationException if this future was cancelled.
* @throws IllegalStateException if this future is incomplete.
*/
public T result() {
public T result() throws ExecutionException {
return sync.result();
}

/**
* Return the result of this future, if it has been completed successfully, or unwrap and throw the exception with which it was
* completed exceptionally. It is not valid to call this method if the future is incomplete.
*/
public T actionResult() {
try {
return result();
} catch (ElasticsearchException e) {
throw unwrapEsException(e);
}
}

/**
* <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
* private subclass to hold the synchronizer. This synchronizer is used to
Expand All @@ -217,7 +204,7 @@ public T actionResult() {
* RUNNING to COMPLETING, that thread will then set the result of the
* computation, and only then transition to COMPLETED or CANCELLED.
* <p>
* We don't use the integer argument passed between acquire methods so we
* We don't use the integer argument passed between acquire methods, so we
* pass around a -1 everywhere.
*/
static final class Sync<V> extends AbstractQueuedSynchronizer {
Expand Down Expand Up @@ -302,24 +289,9 @@ private V getValue() throws CancellationException, ExecutionException {
}
}

V result() {
final int state = getState();
switch (state) {
case COMPLETED:
if (exception instanceof RuntimeException runtimeException) {
throw runtimeException;
} else if (exception != null) {
throw new UncategorizedExecutionException("Failed execution", new ExecutionException(exception));
} else {
return value;
}
case CANCELLED:
throw new CancellationException("Task was cancelled.");
default:
final var message = "Error, synchronizer in invalid state: " + state;
assert false : message;
throw new IllegalStateException(message);
}
V result() throws CancellationException, ExecutionException {
assert isDone() : "Error, synchronizer in invalid state: " + getState();
return getValue();
}

/**
Expand Down Expand Up @@ -358,7 +330,7 @@ boolean cancel() {
}

/**
* Implementation of completing a task. Either {@code v} or {@code t} will
* Implementation of completing a task. Either {@code v} or {@code e} will
* be set but not both. The {@code finalState} is the state to change to
* from {@link #RUNNING}. If the state is not in the RUNNING state we
* return {@code false} after waiting for the state to be set to a valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
Expand All @@ -33,6 +34,7 @@

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -172,8 +174,12 @@ public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
if (future == null || future.isDone() == false) {
return null;
}
// call non-blocking actionResult() as we could be on a network or scheduler thread which we must not block
return future.actionResult();
// call non-blocking result() as we could be on a network or scheduler thread which we must not block
try {
return future.result();
} catch (ExecutionException e) {
throw new UncategorizedExecutionException("An error occurred fetching timestamp field type for " + index, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

Expand All @@ -68,7 +69,7 @@ void runTest() {
future
);

final var response = future.result();
final var response = safeGet(future);
assertThat(response.getFailures(), empty());
assertThat(response.getStoreStatuses(), anEmptyMap());
assertThat(shardsWithFailures, empty());
Expand Down Expand Up @@ -138,7 +139,7 @@ void runTest() {
listExpected = false;
assertFalse(future.isDone());
deterministicTaskQueue.runAllTasks();
expectThrows(TaskCancelledException.class, future::result);
expectThrows(ExecutionException.class, TaskCancelledException.class, future::result);
}
});
}
Expand All @@ -159,7 +160,10 @@ void runTest() {
failOneRequest = true;
deterministicTaskQueue.runAllTasks();
assertFalse(failOneRequest);
assertEquals("simulated", expectThrows(ElasticsearchException.class, future::result).getMessage());
assertEquals(
"simulated",
expectThrows(ExecutionException.class, ElasticsearchException.class, future::result).getMessage()
);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public void testNoResult() {
assumeTrue("assertions required for this test", Assertions.ENABLED);
final var future = new PlainActionFuture<>();
expectThrows(AssertionError.class, future::result);
expectThrows(AssertionError.class, future::actionResult);
}

public void testUnwrapException() {
Expand All @@ -93,19 +92,17 @@ private void checkUnwrap(Exception exception, Class<? extends Exception> actionG

assertEquals(actionGetException, expectThrows(RuntimeException.class, future::actionGet).getClass());
assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> future.actionGet(10, TimeUnit.SECONDS)).getClass());
assertEquals(actionGetException, expectThrows(RuntimeException.class, future::actionResult).getClass());
assertEquals(actionGetException, expectThrows(RuntimeException.class, expectIgnoresInterrupt(future::actionResult)).getClass());
assertEquals(getException, expectThrows(ExecutionException.class, future::get).getCause().getClass());
assertEquals(getException, expectThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)).getCause().getClass());

if (exception instanceof RuntimeException) {
assertEquals(getException, expectThrows(Exception.class, future::result).getClass());
assertEquals(getException, expectThrows(Exception.class, expectIgnoresInterrupt(future::result)).getClass());
expectThrows(ExecutionException.class, getException, future::result);
expectThrows(ExecutionException.class, getException, expectIgnoresInterrupt(future::result));
assertEquals(getException, expectThrows(Exception.class, () -> FutureUtils.get(future)).getClass());
assertEquals(getException, expectThrows(Exception.class, () -> FutureUtils.get(future, 10, TimeUnit.SECONDS)).getClass());
} else {
assertEquals(getException, expectThrowsWrapped(future::result).getClass());
assertEquals(getException, expectThrowsWrapped(expectIgnoresInterrupt(future::result)).getClass());
expectThrows(ExecutionException.class, getException, future::result);
expectThrows(ExecutionException.class, getException, expectIgnoresInterrupt(future::result));
assertEquals(getException, expectThrowsWrapped(() -> FutureUtils.get(future)).getClass());
assertEquals(getException, expectThrowsWrapped(() -> FutureUtils.get(future, 10, TimeUnit.SECONDS)).getClass());
}
Expand All @@ -129,12 +126,10 @@ public void testCancelException() {
assertCancellation(() -> future.get(10, TimeUnit.SECONDS));
assertCancellation(() -> future.actionGet(10, TimeUnit.SECONDS));
assertCancellation(future::result);
assertCancellation(future::actionResult);

try {
Thread.currentThread().interrupt();
assertCancellation(future::result);
assertCancellation(future::actionResult);
} finally {
assertTrue(Thread.interrupted());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testNoPendingRefreshIfAlreadyCancelled() {
expectThrows(ExecutionException.class, TaskCancelledException.class, future::get);
}

public void testListenersCompletedByRefresh() {
public void testListenersCompletedByRefresh() throws ExecutionException {
final TestCache testCache = new TestCache();

// The first get() calls the refresh function
Expand Down Expand Up @@ -81,7 +81,7 @@ public void testListenersCompletedByRefresh() {
assertThat(future3.result(), equalTo(2));
}

public void testListenerCompletedByRefreshEvenIfDiscarded() {
public void testListenerCompletedByRefreshEvenIfDiscarded() throws ExecutionException {
final TestCache testCache = new TestCache();

// This computation is discarded before it completes.
Expand All @@ -103,7 +103,7 @@ public void testListenerCompletedByRefreshEvenIfDiscarded() {
assertThat(future1.result(), sameInstance(future2.result()));
}

public void testListenerCompletedWithCancellationExceptionIfRefreshCancelled() {
public void testListenerCompletedWithCancellationExceptionIfRefreshCancelled() throws ExecutionException {
final TestCache testCache = new TestCache();

// This computation is discarded before it completes.
Expand All @@ -120,12 +120,12 @@ public void testListenerCompletedWithCancellationExceptionIfRefreshCancelled() {
testCache.get("bar", () -> false, future2);
testCache.assertPendingRefreshes(2);
testCache.assertNextRefreshCancelled();
expectThrows(TaskCancelledException.class, future1::result);
expectThrows(ExecutionException.class, TaskCancelledException.class, future1::result);
testCache.completeNextRefresh("bar", 2);
assertThat(future2.result(), equalTo(2));
}

public void testListenerCompletedWithFresherInputIfSuperseded() {
public void testListenerCompletedWithFresherInputIfSuperseded() throws ExecutionException {
final TestCache testCache = new TestCache();

// This computation is superseded before it completes.
Expand Down Expand Up @@ -164,10 +164,10 @@ public void testRunsCancellationChecksEvenWhenSuperseded() {

isCancelled.set(true);
testCache.completeNextRefresh("bar", 1);
expectThrows(TaskCancelledException.class, future1::result);
expectThrows(ExecutionException.class, TaskCancelledException.class, future1::result);
}

public void testExceptionCompletesListenersButIsNotCached() {
public void testExceptionCompletesListenersButIsNotCached() throws ExecutionException {
final TestCache testCache = new TestCache();

// If a refresh results in an exception then all the pending get() calls complete exceptionally
Expand All @@ -178,16 +178,16 @@ public void testExceptionCompletesListenersButIsNotCached() {
testCache.assertPendingRefreshes(1);
final ElasticsearchException exception = new ElasticsearchException("simulated");
testCache.completeNextRefresh(exception);
assertSame(exception, expectThrows(ElasticsearchException.class, future0::result));
assertSame(exception, expectThrows(ElasticsearchException.class, future1::result));
assertSame(exception, expectThrows(ExecutionException.class, ElasticsearchException.class, future0::result));
assertSame(exception, expectThrows(ExecutionException.class, ElasticsearchException.class, future1::result));

testCache.assertNoPendingRefreshes();
// The exception is not cached, however, so a subsequent get() call with a matching key performs another refresh
final TestFuture future2 = new TestFuture();
testCache.get("foo", () -> false, future2);
testCache.assertPendingRefreshes(1);
testCache.completeNextRefresh("foo", 1);
assertThat(future2.actionResult(), equalTo(1));
assertThat(future2.result(), equalTo(1));
}

public void testConcurrentRefreshesAndCancellation() throws InterruptedException {
Expand Down Expand Up @@ -416,7 +416,7 @@ protected String getKey(String s) {
testCache.get("successful", () -> false, successfulFuture);
cancelledThread.join();

expectThrows(TaskCancelledException.class, cancelledFuture::result);
expectThrows(ExecutionException.class, TaskCancelledException.class, cancelledFuture::result);
}

private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ public void testRejection() {
safeAwait(barrier); // release blocked executor

if (success) {
expectThrows(EsRejectedExecutionException.class, future2::result);
expectThrows(ExecutionException.class, EsRejectedExecutionException.class, future2::result);
assertNull(future1.actionGet(10, TimeUnit.SECONDS));
} else {
var exception = expectThrows(EsRejectedExecutionException.class, future2::result);
var exception = expectThrows(ExecutionException.class, EsRejectedExecutionException.class, future2::result);
assertEquals(1, exception.getSuppressed().length);
assertThat(exception.getSuppressed()[0], instanceOf(ElasticsearchException.class));
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void createServices() {
}

@After
public void verifyReposThenStopServices() {
public void verifyReposThenStopServices() throws ExecutionException {
try {
clearDisruptionsAndAwaitSync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static ClusterState startShardsAndReroute(
public static ClusterState reroute(AllocationService allocationService, ClusterState clusterState) {
final var listener = new PlainActionFuture<Void>();
final var result = allocationService.reroute(clusterState, "test reroute", listener);
listener.result(); // ensures it completed successfully
safeGet(listener::result); // ensures it completed successfully
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -2298,6 +2299,20 @@ public static <T> T safeGet(Future<T> future) {
}
}

/**
* Call a {@link CheckedSupplier}, converting all exceptions into an {@link AssertionError}. Useful for avoiding
* try/catch boilerplate or cumbersome propagation of checked exceptions around something that <i>should</i> never throw.
*
* @return The value returned by the {@code supplier}.
*/
public static <T> T safeGet(CheckedSupplier<T, ?> supplier) {
try {
return supplier.get();
} catch (Exception e) {
return fail(e);
}
}

/**
* Wait for the exceptional completion of the given {@link SubscribableListener}, with a timeout of {@link #SAFE_AWAIT_TIMEOUT},
* preserving the thread's interrupt status flag and converting a successful completion, interrupt or timeout into an {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/**
* An {@link ActionFuture} that listeners can be attached to. Listeners are executed when the future is completed
Expand Down Expand Up @@ -200,7 +201,23 @@ public void addListener(ActionListener<Long> listener, long value) {
assert invariant();
}

private static void executeListener(final ActionListener<Long> listener, final Supplier<Long> result) {
/**
* Return the result of this future, if it has been completed successfully, or unwrap and throw the exception with which it was
* completed exceptionally. It is not valid to call this method if the future is incomplete.
*/
private Long actionResult() throws Exception {
try {
return result();
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception exCause) {
throw exCause;
} else {
throw e;
}
}
}

private static void executeListener(final ActionListener<Long> listener, final CheckedSupplier<Long, ?> result) {
try {
listener.onResponse(result.get());
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 7a8a7c0

Please sign in to comment.