diff --git a/pom.xml b/pom.xml
index fde9885f52e01..b5d0979a6ce09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,6 +212,12 @@
4.1.107.Final
+
+ io.netty
+ netty-transport
+ 4.1.107.Final
+
+
com.facebook.presto
presto-testing-docker
@@ -2607,7 +2613,9 @@
unlink-out-of-tree-build-directory
- exec
+
+ exec
+
pre-clean
rm
@@ -2619,7 +2627,9 @@
remove-out-of-tree-build-directory
- exec
+
+ exec
+
pre-clean
rm
@@ -2631,7 +2641,9 @@
create-out-of-tree-build-directory
- exec
+
+ exec
+
validate
mkdir
@@ -2643,7 +2655,9 @@
link-out-of-tree-build-directory
- exec
+
+ exec
+
validate
ln
diff --git a/presto-main/pom.xml b/presto-main/pom.xml
index f6ab94cfd3330..b28ecafa47df8 100644
--- a/presto-main/pom.xml
+++ b/presto-main/pom.xml
@@ -503,6 +503,11 @@
ratis-common
true
+
+
+ io.netty
+ netty-transport
+
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java
index 95febdbfdd8b7..f188c2976b8b1 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java
@@ -77,6 +77,7 @@
import com.sun.management.ThreadMXBean;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.netty.channel.EventLoop;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.joda.time.DateTime;
@@ -94,10 +95,8 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -193,15 +192,13 @@ public final class HttpRemoteTask
private final AtomicReference outputBuffers = new AtomicReference<>();
private final FutureStateChange> whenSplitQueueHasSpace = new FutureStateChange<>();
@GuardedBy("this")
- private boolean splitQueueHasSpace = true;
+ private final AtomicBoolean splitQueueHasSpace = new AtomicBoolean(true); // NOTE(review): written by updateSplitQueueSpace() and cleanUpTask(), neither of which is synchronized after this patch; the @GuardedBy("this") above looks stale - confirm
@GuardedBy("this")
- private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty();
+ private final AtomicReference whenSplitQueueHasSpaceThreshold = new AtomicReference<>(OptionalLong.empty()); // NOTE(review): set by whenSplitQueueHasSpace(), which is no longer synchronized; the @GuardedBy("this") above looks stale - confirm
private final boolean summarizeTaskInfo;
private final HttpClient httpClient;
- private final Executor executor;
- private final ScheduledExecutorService errorScheduledExecutor;
private final Codec taskInfoCodec;
//Json codec required for TaskUpdateRequest endpoint which uses JSON and returns a TaskInfo
@@ -234,6 +231,8 @@ public final class HttpRemoteTask
private final DecayCounter taskUpdateRequestSize;
private final SchedulerStatsTracker schedulerStatsTracker;
+ private final EventLoop taskEventLoop;
+
public HttpRemoteTask(
Session session,
TaskId taskId,
@@ -244,9 +243,6 @@ public HttpRemoteTask(
Multimap initialSplits,
OutputBuffers outputBuffers,
HttpClient httpClient,
- Executor executor,
- ScheduledExecutorService updateScheduledExecutor,
- ScheduledExecutorService errorScheduledExecutor,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoRefreshMaxWait,
@@ -271,7 +267,8 @@ public HttpRemoteTask(
DecayCounter taskUpdateRequestSize,
HandleResolver handleResolver,
ConnectorTypeSerdeManager connectorTypeSerdeManager,
- SchedulerStatsTracker schedulerStatsTracker)
+ SchedulerStatsTracker schedulerStatsTracker,
+ EventLoop taskEventLoop)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
@@ -281,7 +278,6 @@ public HttpRemoteTask(
requireNonNull(planFragment, "planFragment is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
- requireNonNull(executor, "executor is null");
requireNonNull(taskStatusCodec, "taskStatusCodec is null");
requireNonNull(taskInfoCodec, "taskInfoCodec is null");
requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
@@ -298,6 +294,7 @@ public HttpRemoteTask(
requireNonNull(connectorTypeSerdeManager, "connectorTypeSerdeManager is null");
requireNonNull(taskUpdateRequestSize, "taskUpdateRequestSize cannot be null");
requireNonNull(schedulerStatsTracker, "schedulerStatsTracker is null");
+ requireNonNull(taskEventLoop, "taskEventLoop is null");
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
@@ -308,14 +305,12 @@ public HttpRemoteTask(
this.planFragment = planFragment;
this.outputBuffers.set(outputBuffers);
this.httpClient = httpClient;
- this.executor = executor;
- this.errorScheduledExecutor = errorScheduledExecutor;
this.summarizeTaskInfo = summarizeTaskInfo;
this.taskInfoCodec = taskInfoCodec;
this.taskInfoJsonCodec = taskInfoJsonCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.planFragmentCodec = planFragmentCodec;
- this.updateErrorTracker = taskRequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
+ this.updateErrorTracker = taskRequestErrorTracker(taskId, location, maxErrorDuration, taskEventLoop, "updating task");
this.nodeStatsTracker = requireNonNull(nodeStatsTracker, "nodeStatsTracker is null");
this.maxErrorDuration = maxErrorDuration;
this.stats = stats;
@@ -367,10 +362,10 @@ public HttpRemoteTask(
initialTask.getTaskStatus(),
taskStatusRefreshMaxWait,
taskStatusCodec,
- executor,
+ taskEventLoop,
httpClient,
maxErrorDuration,
- errorScheduledExecutor,
+ taskEventLoop,
stats,
binaryTransportEnabled,
thriftTransportEnabled,
@@ -386,9 +381,9 @@ public HttpRemoteTask(
metadataUpdatesCodec,
maxErrorDuration,
summarizeTaskInfo,
- executor,
- updateScheduledExecutor,
- errorScheduledExecutor,
+ taskEventLoop,
+ taskEventLoop,
+ taskEventLoop,
stats,
binaryTransportEnabled,
taskInfoThriftTransportEnabled,
@@ -410,6 +405,7 @@ public HttpRemoteTask(
}
});
+ this.taskEventLoop = taskEventLoop;
updateTaskStats();
updateSplitQueueSpace();
}
@@ -453,18 +449,20 @@ public URI getRemoteTaskLocation()
@Override
public void start()
{
- try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
- // to start we just need to trigger an update
- started.set(true);
- scheduleUpdate();
-
- taskStatusFetcher.start();
- taskInfoFetcher.start();
- }
+ taskEventLoop.execute(() -> { // start-up work is handed to taskEventLoop instead of running inline in start()
+ try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
+ // to start we just need to trigger an update
+ started.set(true); // set before scheduleUpdate(): sendUpdate() returns early while started is false
+ scheduleUpdate();
+
+ taskStatusFetcher.start();
+ taskInfoFetcher.start();
+ }
+ });
}
@Override
- public synchronized void addSplits(Multimap splitsBySource)
+ public void addSplits(Multimap splitsBySource)
{
requireNonNull(splitsBySource, "splitsBySource is null");
@@ -473,71 +471,79 @@ public synchronized void addSplits(Multimap splitsBySource)
return;
}
- boolean needsUpdate = false;
- for (Entry> entry : splitsBySource.asMap().entrySet()) {
- PlanNodeId sourceId = entry.getKey();
- Collection splits = entry.getValue();
- boolean isTableScanSource = tableScanPlanNodeIds.contains(sourceId);
-
- checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);
- int added = 0;
- long addedWeight = 0;
- for (Split split : splits) {
- if (pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId.getAndIncrement(), sourceId, split))) {
- if (isTableScanSource) {
- added++;
- addedWeight = addExact(addedWeight, split.getSplitWeight().getRawValue());
+ taskEventLoop.execute(() -> {
+ boolean needsUpdate = false;
+ for (Entry> entry : splitsBySource.asMap().entrySet()) {
+ PlanNodeId sourceId = entry.getKey();
+ Collection splits = entry.getValue();
+ boolean isTableScanSource = tableScanPlanNodeIds.contains(sourceId);
+
+ checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);
+ int added = 0;
+ long addedWeight = 0;
+ for (Split split : splits) {
+ if (pendingSplits.put(sourceId, new ScheduledSplit(nextSplitId.getAndIncrement(), sourceId, split))) {
+ if (isTableScanSource) {
+ added++;
+ addedWeight = addExact(addedWeight, split.getSplitWeight().getRawValue());
+ }
}
}
+ if (isTableScanSource) {
+ pendingSourceSplitCount += added;
+ pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, addedWeight);
+ updateTaskStats();
+ }
+ needsUpdate = true;
}
- if (isTableScanSource) {
- pendingSourceSplitCount += added;
- pendingSourceSplitsWeight = addExact(pendingSourceSplitsWeight, addedWeight);
- updateTaskStats();
- }
- needsUpdate = true;
- }
- updateSplitQueueSpace();
+ updateSplitQueueSpace();
- if (needsUpdate) {
- this.needsUpdate.set(true);
- scheduleUpdate();
- }
+ if (needsUpdate) {
+ this.needsUpdate.set(true);
+ scheduleUpdate();
+ }
+ });
}
@Override
- public synchronized void noMoreSplits(PlanNodeId sourceId)
+ public void noMoreSplits(PlanNodeId sourceId) // records "no more splits" for sourceId and schedules a task update; the work is queued on taskEventLoop
{
- if (noMoreSplits.containsKey(sourceId)) {
- return;
- }
+ taskEventLoop.execute(() -> {
+ if (noMoreSplits.containsKey(sourceId)) { // already recorded for this source: nothing to do
+ return;
+ }
- noMoreSplits.put(sourceId, true);
- needsUpdate.set(true);
- scheduleUpdate();
+ noMoreSplits.put(sourceId, true); // TRUE is what getSource() reports as pending; processTaskUpdate() later stores false
+ needsUpdate.set(true);
+ scheduleUpdate();
+ });
}
@Override
- public synchronized void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan)
+ public void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan) // per-lifespan variant; the work is queued on taskEventLoop
{
- if (pendingNoMoreSplitsForLifespan.put(sourceId, lifespan)) {
- needsUpdate.set(true);
- scheduleUpdate();
- }
+ taskEventLoop.execute(() -> {
+ if (pendingNoMoreSplitsForLifespan.put(sourceId, lifespan)) { // presumably put() returns true only when the pair was not already pending - confirm against the collection type
+ needsUpdate.set(true);
+ scheduleUpdate();
+ }
+ });
}
@Override
- public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers)
+ public void setOutputBuffers(OutputBuffers newOutputBuffers) // installs newOutputBuffers if its version is newer and schedules a task update
{
if (getTaskStatus().getState().isDone()) {
return;
}
- if (newOutputBuffers.getVersion() > outputBuffers.get().getVersion()) {
- outputBuffers.set(newOutputBuffers);
- needsUpdate.set(true);
- scheduleUpdate();
- }
+ taskEventLoop.execute(() -> { // NOTE(review): the isDone() check above runs on the calling thread, the version comparison below runs later on taskEventLoop
+ if (newOutputBuffers.getVersion() > outputBuffers.get().getVersion()) { // older or equal versions are ignored
+ outputBuffers.set(newOutputBuffers);
+ needsUpdate.set(true);
+ scheduleUpdate();
+ }
+ });
}
@Override
@@ -555,7 +561,7 @@ public ListenableFuture> removeRemoteSource(TaskId remoteSourceTaskId)
taskId,
remoteSourceUri,
maxErrorDuration,
- errorScheduledExecutor,
+ taskEventLoop,
"Remove exchange remote source");
SettableFuture> future = SettableFuture.create();
@@ -604,7 +610,7 @@ public void onFailure(Throwable failedReason)
doRemoveRemoteSource(errorTracker, request, future);
}
else {
- errorRateLimit.addListener(() -> doRemoveRemoteSource(errorTracker, request, future), errorScheduledExecutor);
+ errorRateLimit.addListener(() -> doRemoveRemoteSource(errorTracker, request, future), taskEventLoop);
}
}
};
@@ -688,29 +694,31 @@ public void addFinalTaskInfoListener(StateChangeListener stateChangeLi
}
@Override
- public synchronized ListenableFuture> whenSplitQueueHasSpace(long weightThreshold)
+ public ListenableFuture> whenSplitQueueHasSpace(long weightThreshold) // returns a completed future when the queue currently has space, otherwise a new listener future
{
- if (whenSplitQueueHasSpaceThreshold.isPresent()) {
- checkArgument(weightThreshold == whenSplitQueueHasSpaceThreshold.getAsLong(), "Multiple split queue space notification thresholds not supported");
+ if (whenSplitQueueHasSpaceThreshold.get().isPresent()) { // NOTE(review): isPresent() here and set() below form a check-then-act that is neither synchronized nor dispatched to taskEventLoop - confirm callers cannot race
+ checkArgument(weightThreshold == whenSplitQueueHasSpaceThreshold.get().getAsLong(), "Multiple split queue space notification thresholds not supported");
}
else {
- whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold);
+ whenSplitQueueHasSpaceThreshold.set(OptionalLong.of(weightThreshold)); // first caller fixes the threshold; later calls must pass the same value
updateSplitQueueSpace();
}
- if (splitQueueHasSpace) {
+ if (splitQueueHasSpace.get()) { // NOTE(review): this read and createNewListener() below are not atomic with updateSplitQueueSpace() - TODO confirm a notification cannot be missed in between
return immediateFuture(null);
}
return whenSplitQueueHasSpace.createNewListener();
}
- private synchronized void updateSplitQueueSpace()
+ private void updateSplitQueueSpace() // recomputes splitQueueHasSpace and completes waiting listeners when space is available
{
// Must check whether the unacknowledged split count threshold is reached even without listeners registered yet
- splitQueueHasSpace = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits &&
- (!whenSplitQueueHasSpaceThreshold.isPresent() || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong());
+ boolean hasSpaceThresholdPresent = whenSplitQueueHasSpaceThreshold.get().isPresent();
+ boolean hasSpace = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits &&
+ (!hasSpaceThresholdPresent || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.get().getAsLong()); // second read of the threshold; getAsLong() is safe only because nothing visible in this diff resets it to empty once set
+ splitQueueHasSpace.set(hasSpace);
// Only trigger notifications if a listener might be registered
- if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) {
- whenSplitQueueHasSpace.complete(null, executor);
+ if (hasSpace && hasSpaceThresholdPresent) {
+ whenSplitQueueHasSpace.complete(null, taskEventLoop); // taskEventLoop is passed where the shared executor used to be
}
}
@@ -729,40 +737,42 @@ private void updateTaskStats()
}
}
- private synchronized void processTaskUpdate(TaskInfo newValue, List sources)
+ private void processTaskUpdate(TaskInfo newValue, List sources) // applies a task-update response: stores the TaskInfo and drops the pending state that was carried in sources
{
- //Setting the flag as false since TaskUpdateRequest is not on thrift yet.
- //Once it is converted to thrift we can use the isThrift enabled flag here.
- updateTaskInfo(newValue, false);
-
- // remove acknowledged splits, which frees memory
- for (TaskSource source : sources) {
- PlanNodeId planNodeId = source.getPlanNodeId();
- boolean isTableScanSource = tableScanPlanNodeIds.contains(planNodeId);
- int removed = 0;
- long removedWeight = 0;
- for (ScheduledSplit split : source.getSplits()) {
- if (pendingSplits.remove(planNodeId, split)) {
- if (isTableScanSource) {
- removed++;
- removedWeight = addExact(removedWeight, split.getSplit().getSplitWeight().getRawValue());
+ taskEventLoop.execute(() -> { // NOTE(review): the body is queued, so the caller's next statements may run before it - confirm ordering with updateErrorTracker.requestSucceeded() in UpdateResponseHandler.success
+ //Setting the flag as false since TaskUpdateRequest is not on thrift yet.
+ //Once it is converted to thrift we can use the isThrift enabled flag here.
+ updateTaskInfo(newValue, false);
+
+ // remove acknowledged splits, which frees memory
+ for (TaskSource source : sources) {
+ PlanNodeId planNodeId = source.getPlanNodeId();
+ boolean isTableScanSource = tableScanPlanNodeIds.contains(planNodeId); // only table-scan sources count toward the pending split counters
+ int removed = 0;
+ long removedWeight = 0;
+ for (ScheduledSplit split : source.getSplits()) {
+ if (pendingSplits.remove(planNodeId, split)) {
+ if (isTableScanSource) {
+ removed++;
+ removedWeight = addExact(removedWeight, split.getSplit().getSplitWeight().getRawValue());
+ }
}
}
+ if (source.isNoMoreSplits()) {
+ noMoreSplits.put(planNodeId, false); // false: getSource() only reports pending noMoreSplits while the stored value is TRUE
+ }
+ for (Lifespan lifespan : source.getNoMoreSplitsForLifespan()) {
+ pendingNoMoreSplitsForLifespan.remove(planNodeId, lifespan);
+ }
+ if (isTableScanSource) {
+ pendingSourceSplitCount -= removed;
+ pendingSourceSplitsWeight -= removedWeight; // plain subtraction, unlike the addExact used when splits are added (unchanged from the removed code)
+ }
}
- if (source.isNoMoreSplits()) {
- noMoreSplits.put(planNodeId, false);
- }
- for (Lifespan lifespan : source.getNoMoreSplitsForLifespan()) {
- pendingNoMoreSplitsForLifespan.remove(planNodeId, lifespan);
- }
- if (isTableScanSource) {
- pendingSourceSplitCount -= removed;
- pendingSourceSplitsWeight -= removedWeight;
- }
- }
- // Update stats before split queue space to ensure node stats are up to date before waking up the scheduler
- updateTaskStats();
- updateSplitQueueSpace();
+ // Update stats before split queue space to ensure node stats are up to date before waking up the scheduler
+ updateTaskStats();
+ updateSplitQueueSpace();
+ });
}
private void onSuccessTaskInfo(TaskInfo result)
@@ -832,103 +842,105 @@ private void onFailureTaskInfo(
doScheduleAsyncCleanupRequest(cleanupBackoff, request, action);
}
else {
- errorScheduledExecutor.schedule(() -> doScheduleAsyncCleanupRequest(cleanupBackoff, request, action), delayNanos, NANOSECONDS);
+ taskEventLoop.schedule(() -> doScheduleAsyncCleanupRequest(cleanupBackoff, request, action), delayNanos, NANOSECONDS);
}
}
- private synchronized void scheduleUpdate()
+ private void scheduleUpdate() // NOTE(review): taskUpdateTimeline is appended below without a lock now; this presumably relies on every caller already running on taskEventLoop - confirm
{
taskUpdateTimeline.add(System.nanoTime());
- executor.execute(this::sendUpdate);
+ taskEventLoop.execute(this::sendUpdate); // NOTE(review): sendUpdate() wraps its own body in taskEventLoop.execute(...) too, so each update is queued twice - confirm intended
}
- private synchronized void sendUpdate()
+ private void sendUpdate() // builds a TaskUpdateRequest from the pending state and posts it to the remote task; the whole body is queued on taskEventLoop
{
- TaskStatus taskStatus = getTaskStatus();
- // don't update if the task hasn't been started yet or if it is already finished
- if (!started.get() || !needsUpdate.get() || taskStatus.getState().isDone()) {
- return;
- }
+ taskEventLoop.execute(() -> {
+ TaskStatus taskStatus = getTaskStatus();
+ // don't update if the task hasn't been started yet or if it is already finished
+ if (!started.get() || !needsUpdate.get() || taskStatus.getState().isDone()) {
+ return;
+ }
- // if there is a request already running, wait for it to complete
- if (this.currentRequest != null && !this.currentRequest.isDone()) {
- return;
- }
+ // if there is a request already running, wait for it to complete
+ if (this.currentRequest != null && !this.currentRequest.isDone()) {
+ return;
+ }
- // if throttled due to error, asynchronously wait for timeout and try again
- ListenableFuture> errorRateLimit = updateErrorTracker.acquireRequestPermit();
- if (!errorRateLimit.isDone()) {
- errorRateLimit.addListener(this::sendUpdate, executor);
- return;
- }
+ // if throttled due to error, asynchronously wait for timeout and try again
+ ListenableFuture> errorRateLimit = updateErrorTracker.acquireRequestPermit();
+ if (!errorRateLimit.isDone()) {
+ errorRateLimit.addListener(this::sendUpdate, taskEventLoop);
+ return;
+ }
- List sources = getSources();
+ List sources = getSources();
- Optional fragment = Optional.empty();
- if (sendPlan.get()) {
- long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
- fragment = Optional.of(planFragment.bytesForTaskSerialization(planFragmentCodec));
- schedulerStatsTracker.recordTaskPlanSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
- }
- Optional writeInfo = sendPlan.get() ? Optional.of(tableWriteInfo) : Optional.empty();
- TaskUpdateRequest updateRequest = new TaskUpdateRequest(
- session.toSessionRepresentation(),
- session.getIdentity().getExtraCredentials(),
- fragment,
- sources,
- outputBuffers.get(),
- writeInfo);
- long serializeStartCpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime();
- byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
- schedulerStatsTracker.recordTaskUpdateSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - serializeStartCpuTimeNanos);
-
- taskUpdateRequestSize.add(taskUpdateRequestJson.length);
-
- if (taskUpdateRequestJson.length > maxTaskUpdateSizeInBytes) {
- failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestJson)));
- }
+ Optional fragment = Optional.empty();
+ if (sendPlan.get()) { // the serialized plan fragment is attached only while sendPlan is set
+ long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
+ fragment = Optional.of(planFragment.bytesForTaskSerialization(planFragmentCodec));
+ schedulerStatsTracker.recordTaskPlanSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
+ }
+ Optional writeInfo = sendPlan.get() ? Optional.of(tableWriteInfo) : Optional.empty();
+ TaskUpdateRequest updateRequest = new TaskUpdateRequest(
+ session.toSessionRepresentation(),
+ session.getIdentity().getExtraCredentials(),
+ fragment,
+ sources,
+ outputBuffers.get(),
+ writeInfo);
+ long serializeStartCpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime(); // per-thread CPU time: serialization cost is now measured on the event-loop thread
+ byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
+ schedulerStatsTracker.recordTaskUpdateSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - serializeStartCpuTimeNanos);
+
+ taskUpdateRequestSize.add(taskUpdateRequestJson.length);
+
+ if (taskUpdateRequestJson.length > maxTaskUpdateSizeInBytes) {
+ failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestJson))); // NOTE(review): no return after failTask, so the oversized request is still built and sent below (same as the removed code) - confirm intended
+ }
- if (fragment.isPresent()) {
- stats.updateWithPlanSize(taskUpdateRequestJson.length);
- }
- else {
- if (ThreadLocalRandom.current().nextDouble() < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE) {
- // This is to keep track of the task update size even when the plan fragment is NOT present
- stats.updateWithoutPlanSize(taskUpdateRequestJson.length);
+ if (fragment.isPresent()) {
+ stats.updateWithPlanSize(taskUpdateRequestJson.length);
+ }
+ else {
+ if (ThreadLocalRandom.current().nextDouble() < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE) {
+ // This is to keep track of the task update size even when the plan fragment is NOT present
+ stats.updateWithoutPlanSize(taskUpdateRequestJson.length);
+ }
}
- }
- HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
- Request request = setContentTypeHeaders(binaryTransportEnabled, preparePost())
- .setUri(uriBuilder.build())
- .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
- .build();
+ HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
+ Request request = setContentTypeHeaders(binaryTransportEnabled, preparePost())
+ .setUri(uriBuilder.build())
+ .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
+ .build();
- ResponseHandler responseHandler;
- if (binaryTransportEnabled) {
- responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoCodec);
- }
- else {
- responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoJsonCodec);
- }
+ ResponseHandler responseHandler;
+ if (binaryTransportEnabled) {
+ responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoCodec);
+ }
+ else {
+ responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoJsonCodec);
+ }
- updateErrorTracker.startRequest();
+ updateErrorTracker.startRequest();
- ListenableFuture> future = httpClient.executeAsync(request, responseHandler);
- currentRequest = future;
- currentRequestStartNanos = System.nanoTime();
- if (!taskUpdateTimeline.isEmpty()) {
- currentRequestLastTaskUpdate = taskUpdateTimeline.getLong(taskUpdateTimeline.size() - 1);
- }
+ ListenableFuture> future = httpClient.executeAsync(request, responseHandler);
+ currentRequest = future;
+ currentRequestStartNanos = System.nanoTime();
+ if (!taskUpdateTimeline.isEmpty()) {
+ currentRequestLastTaskUpdate = taskUpdateTimeline.getLong(taskUpdateTimeline.size() - 1); // newest scheduled update covered by this request; the response handler trims the timeline up to it
+ }
- // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
- // and does so without grabbing the instance lock.
- needsUpdate.set(false);
+ // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since the callback might change the flag value
+ // once it runs on taskEventLoop (this method no longer holds an instance lock).
+ needsUpdate.set(false);
- Futures.addCallback(
- future,
- new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR),
- executor);
+ Futures.addCallback(
+ future,
+ new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR),
+ taskEventLoop);
+ });
}
private String getExceededTaskUpdateSizeMessage(byte[] taskUpdateRequestJson)
@@ -937,7 +949,7 @@ private String getExceededTaskUpdateSizeMessage(byte[] taskUpdateRequestJson)
return format("TaskUpdate size of %s has exceeded the limit of %s", taskUpdateSize.toString(), this.maxTaskUpdateDataSize.toString());
}
- private synchronized List getSources()
+ private List getSources()
{
return Stream.concat(tableScanPlanNodeIds.stream(), remoteSourcePlanNodeIds.stream())
.map(this::getSource)
@@ -945,7 +957,7 @@ private synchronized List getSources()
.collect(toImmutableList());
}
- private synchronized TaskSource getSource(PlanNodeId planNodeId)
+ private TaskSource getSource(PlanNodeId planNodeId)
{
Set splits = pendingSplits.get(planNodeId);
boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
@@ -960,64 +972,68 @@ private synchronized TaskSource getSource(PlanNodeId planNodeId)
}
@Override
- public synchronized void cancel()
+ public void cancel() // sends a DELETE with abort=false to the remote task unless it is already done; the work is queued on taskEventLoop
{
- try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
- TaskStatus taskStatus = getTaskStatus();
- if (taskStatus.getState().isDone()) {
- return;
- }
+ taskEventLoop.execute(() -> {
+ try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
+ TaskStatus taskStatus = getTaskStatus();
+ if (taskStatus.getState().isDone()) {
+ return;
+ }
- // send cancel to task and ignore response
- HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus).addParameter("abort", "false");
- Request.Builder builder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
- if (taskInfoThriftTransportEnabled) {
- builder = ThriftRequestUtils.prepareThriftDelete(thriftProtocol);
+ // send cancel to task and ignore response
+ HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus).addParameter("abort", "false");
+ Request.Builder builder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
+ if (taskInfoThriftTransportEnabled) {
+ builder = ThriftRequestUtils.prepareThriftDelete(thriftProtocol); // replaces the builder created on the previous line rather than extending it
+ }
+ Request request = builder.setUri(uriBuilder.build())
+ .build();
+ scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "cancel");
}
- Request request = builder.setUri(uriBuilder.build())
- .build();
- scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "cancel");
- }
+ });
}
- private synchronized void cleanUpTask()
+ private void cleanUpTask() // releases pending split state, cancels the in-flight update and sends a final DELETE; the work is queued on taskEventLoop
{
- checkState(getTaskStatus().getState().isDone(), "attempt to clean up a task that is not done yet");
-
- // clear pending splits to free memory
- pendingSplits.clear();
- pendingSourceSplitCount = 0;
- pendingSourceSplitsWeight = 0;
- updateTaskStats();
- splitQueueHasSpace = true;
- whenSplitQueueHasSpace.complete(null, executor);
-
- // cancel pending request
- if (currentRequest != null) {
- // do not terminate if the request is already running to avoid closing pooled connections
- currentRequest.cancel(false);
- currentRequest = null;
- currentRequestStartNanos = 0;
- }
+ taskEventLoop.execute(() -> {
+ checkState(getTaskStatus().getState().isDone(), "attempt to clean up a task that is not done yet"); // NOTE(review): now evaluated inside the queued task, so a failure is raised on the event loop rather than in the caller - confirm it is still observed
- taskStatusFetcher.stop();
+ // clear pending splits to free memory
+ pendingSplits.clear();
+ pendingSourceSplitCount = 0;
+ pendingSourceSplitsWeight = 0;
+ updateTaskStats();
+ splitQueueHasSpace.set(true); // set unconditionally, without going through updateSplitQueueSpace()
+ whenSplitQueueHasSpace.complete(null, taskEventLoop);
+
+ // cancel pending request
+ if (currentRequest != null) {
+ // do not terminate if the request is already running to avoid closing pooled connections
+ currentRequest.cancel(false);
+ currentRequest = null;
+ currentRequestStartNanos = 0;
+ }
- // The remote task is likely to get a delete from the PageBufferClient first.
- // We send an additional delete anyway to get the final TaskInfo
- HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus());
- Request.Builder requestBuilder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
- if (taskInfoThriftTransportEnabled) {
- requestBuilder = ThriftRequestUtils.prepareThriftDelete(Protocol.BINARY);
- }
- Request request = requestBuilder
- .setUri(uriBuilder.build())
- .build();
+ taskStatusFetcher.stop();
+
+ // The remote task is likely to get a delete from the PageBufferClient first.
+ // We send an additional delete anyway to get the final TaskInfo
+ HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus());
+ Request.Builder requestBuilder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
+ if (taskInfoThriftTransportEnabled) {
+ requestBuilder = ThriftRequestUtils.prepareThriftDelete(Protocol.BINARY); // NOTE(review): cancel() and abort(TaskStatus) pass thriftProtocol here; this path hard-codes Protocol.BINARY (unchanged from the removed code) - confirm intended
+ }
+ Request request = requestBuilder
+ .setUri(uriBuilder.build())
+ .build();
- scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "cleanup");
+ scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "cleanup");
+ });
}
@Override
- public synchronized void abort()
+ public void abort()
{
if (getTaskStatus().getState().isDone()) {
return;
@@ -1026,24 +1042,26 @@ public synchronized void abort()
abort(failWith(getTaskStatus(), ABORTED, ImmutableList.of()));
}
- private synchronized void abort(TaskStatus status)
+ private void abort(TaskStatus status) // records the given terminal status locally and sends a DELETE to the remote task; the work is queued on taskEventLoop
{
- checkState(status.getState().isDone(), "cannot abort task with an incomplete status");
+ taskEventLoop.execute(() -> {
+ checkState(status.getState().isDone(), "cannot abort task with an incomplete status"); // NOTE(review): now evaluated inside the queued task, so a failure is raised on the event loop rather than in the caller - confirm it is still observed
- try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
- taskStatusFetcher.updateTaskStatus(status);
+ try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
+ taskStatusFetcher.updateTaskStatus(status); // status is updated before the URI is built from getTaskStatus() below
- // send abort to task
- HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus());
- Request.Builder builder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
- if (taskInfoThriftTransportEnabled) {
- builder = ThriftRequestUtils.prepareThriftDelete(thriftProtocol);
- }
+ // send abort to task
+ HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus());
+ Request.Builder builder = setContentTypeHeaders(binaryTransportEnabled, prepareDelete());
+ if (taskInfoThriftTransportEnabled) {
+ builder = ThriftRequestUtils.prepareThriftDelete(thriftProtocol); // replaces the builder created on the previous line rather than extending it
+ }
- Request request = builder.setUri(uriBuilder.build())
- .build();
- scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "abort");
- }
+ Request request = builder.setUri(uriBuilder.build())
+ .build();
+ scheduleAsyncCleanupRequest(createCleanupBackoff(), request, "abort");
+ }
+ });
}
private void scheduleAsyncCleanupRequest(Backoff cleanupBackoff, Request request, String action)
@@ -1063,19 +1081,19 @@ private void doScheduleAsyncCleanupRequest(Backoff cleanupBackoff, Request reque
responseHandler = new ThriftResponseHandler(unwrapThriftCodec(taskInfoCodec));
Futures.addCallback(httpClient.executeAsync(request, responseHandler),
new ThriftResponseFutureCallback(action, request, cleanupBackoff),
- executor);
+ taskEventLoop);
}
else if (binaryTransportEnabled) {
responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoCodec);
Futures.addCallback(httpClient.executeAsync(request, responseHandler),
new BaseResponseFutureCallback(action, request, cleanupBackoff),
- executor);
+ taskEventLoop);
}
else {
responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoCodec);
Futures.addCallback(httpClient.executeAsync(request, responseHandler),
new BaseResponseFutureCallback(action, request, cleanupBackoff),
- executor);
+ taskEventLoop);
}
}
@@ -1144,20 +1162,17 @@ public void success(TaskInfo value)
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
long oldestTaskUpdateTime = 0;
- long currentRequestStartNanos;
- synchronized (HttpRemoteTask.this) {
- currentRequest = null;
- sendPlan.set(value.isNeedsPlan());
- currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
- if (!taskUpdateTimeline.isEmpty()) {
- oldestTaskUpdateTime = taskUpdateTimeline.getLong(0);
- }
- int deliveredUpdates = taskUpdateTimeline.size();
- while (deliveredUpdates > 0 && taskUpdateTimeline.getLong(deliveredUpdates - 1) > currentRequestLastTaskUpdate) {
- deliveredUpdates--;
- }
- taskUpdateTimeline.removeElements(0, deliveredUpdates);
+ currentRequest = null;
+ sendPlan.set(value.isNeedsPlan());
+ if (!taskUpdateTimeline.isEmpty()) {
+ oldestTaskUpdateTime = taskUpdateTimeline.getLong(0);
+ }
+ int deliveredUpdates = taskUpdateTimeline.size();
+ while (deliveredUpdates > 0 && taskUpdateTimeline.getLong(deliveredUpdates - 1) > currentRequestLastTaskUpdate) {
+ deliveredUpdates--;
}
+ taskUpdateTimeline.removeElements(0, deliveredUpdates);
+
updateStats(currentRequestStartNanos);
processTaskUpdate(value, sources);
updateErrorTracker.requestSucceeded();
diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java
index c2b0e6b3dd4c4..fb64550d0b147 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java
@@ -13,8 +13,6 @@
*/
package com.facebook.presto.server.remotetask;
-import com.facebook.airlift.concurrent.BoundedExecutor;
-import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
@@ -49,24 +47,18 @@
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airlift.units.Duration;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
import org.weakref.jmx.Managed;
-import org.weakref.jmx.Nested;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.server.thrift.ThriftCodecWrapper.wrapThriftCodec;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
public class HttpRemoteTaskFactory
implements RemoteTaskFactory
@@ -87,11 +79,6 @@ public class HttpRemoteTaskFactory
private final ConnectorTypeSerdeManager connectorTypeSerdeManager;
private final Duration taskInfoUpdateInterval;
- private final ExecutorService coreExecutor;
- private final Executor executor;
- private final ThreadPoolExecutorMBean executorMBean;
- private final ScheduledExecutorService updateScheduledExecutor;
- private final ScheduledExecutorService errorScheduledExecutor;
private final RemoteTaskStats stats;
private final boolean binaryTransportEnabled;
private final boolean thriftTransportEnabled;
@@ -101,6 +88,8 @@ public class HttpRemoteTaskFactory
private final MetadataManager metadataManager;
private final QueryManager queryManager;
private final DecayCounter taskUpdateRequestSize;
+ private final EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors(),
+ new ThreadFactoryBuilder().setNameFormat("task-event-loop-%s").setDaemon(true).build());
@Inject
public HttpRemoteTaskFactory(
@@ -135,10 +124,6 @@ public HttpRemoteTaskFactory(
this.taskInfoRefreshMaxWait = taskConfig.getInfoRefreshMaxWait();
this.handleResolver = handleResolver;
this.connectorTypeSerdeManager = connectorTypeSerdeManager;
-
- this.coreExecutor = newCachedThreadPool(daemonThreadsNamed("remote-task-callback-%s"));
- this.executor = new BoundedExecutor(coreExecutor, config.getRemoteTaskMaxCallbackThreads());
- this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor);
this.stats = requireNonNull(stats, "stats is null");
requireNonNull(communicationConfig, "communicationConfig is null");
binaryTransportEnabled = communicationConfig.isBinaryTransportEnabled();
@@ -181,18 +166,9 @@ else if (binaryTransportEnabled) {
this.metadataManager = metadataManager;
this.queryManager = queryManager;
- this.updateScheduledExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("task-info-update-scheduler-%s"));
- this.errorScheduledExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("remote-task-error-delay-%s"));
this.taskUpdateRequestSize = new DecayCounter(ExponentialDecay.oneMinute());
}
- @Managed
- @Nested
- public ThreadPoolExecutorMBean getExecutor()
- {
- return executorMBean;
- }
-
@Managed
public double getTaskUpdateRequestSize()
{
@@ -202,9 +178,7 @@ public double getTaskUpdateRequestSize()
@PreDestroy
public void stop()
{
- coreExecutor.shutdownNow();
- updateScheduledExecutor.shutdownNow();
- errorScheduledExecutor.shutdownNow();
+ eventLoopGroup.shutdownGracefully();
}
@Override
@@ -230,9 +204,6 @@ public RemoteTask createRemoteTask(
initialSplits,
outputBuffers,
httpClient,
- executor,
- updateScheduledExecutor,
- errorScheduledExecutor,
maxErrorDuration,
taskStatusRefreshMaxWait,
taskInfoRefreshMaxWait,
@@ -257,6 +228,7 @@ public RemoteTask createRemoteTask(
taskUpdateRequestSize,
handleResolver,
connectorTypeSerdeManager,
- schedulerStatsTracker);
+ schedulerStatsTracker,
+ eventLoopGroup.next());
}
}