Assign event loop to tasks and run a task's methods in the same loop #24288
base: master
Nice!
@@ -101,6 +104,9 @@ public class HttpRemoteTaskFactory
  private final MetadataManager metadataManager;
  private final QueryManager queryManager;
  private final DecayCounter taskUpdateRequestSize;
+ // TODO: use config file to set this value
+ private final EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(2000,
Probably not worth setting it to a value higher than the number of CPUs available on the system.
Good point!
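To illustrate the sizing suggestion above, here is a minimal sketch that uses only JDK classes. `LoopGroupSketch` is a hypothetical stand-in for the `EventLoopGroup` in the diff, not the PR's code: it creates one single-threaded executor per available CPU and hands them out round-robin, which is roughly what `eventLoopGroup.next()` is used for when a task is created.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event loop group: a fixed set of
// single-threaded executors that are handed out round-robin.
final class LoopGroupSketch
{
    private final ExecutorService[] loops;
    private final AtomicInteger nextIndex = new AtomicInteger();

    LoopGroupSketch(int loopCount)
    {
        loops = new ExecutorService[loopCount];
        for (int i = 0; i < loopCount; i++) {
            // Each executor creates its single worker thread lazily, on first use
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Assumption: one loop per CPU, following the review suggestion
    static LoopGroupSketch sizedToCpus()
    {
        return new LoopGroupSketch(Runtime.getRuntime().availableProcessors());
    }

    // Every task would call this once and keep the returned loop for its lifetime
    ExecutorService next()
    {
        return loops[Math.floorMod(nextIndex.getAndIncrement(), loops.length)];
    }

    int size()
    {
        return loops.length;
    }

    void shutdown()
    {
        for (ExecutorService loop : loops) {
            loop.shutdown();
        }
    }
}
```

With this shape, many tasks share a small number of threads, and all work for a given task still runs on one thread.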
@@ -257,6 +264,7 @@ public RemoteTask createRemoteTask(
  taskUpdateRequestSize,
  handleResolver,
  connectorTypeSerdeManager,
- schedulerStatsTracker);
+ schedulerStatsTracker,
+ eventLoopGroup.next());
Does the task still need an `executor`? Also worth checking usages of `updateScheduledExecutor`/`errorScheduledExecutor` to see if all are proper.
The `executor` is used to run the attached `StateChangeListener`s. This is another thing that seems to be broken. A `StateChangeListener` has to have an executor attached. It shouldn't be the responsibility of the event producer to provide a thread on which to run a foreign callback. Instead, the creator of a callback should provide the executor that the callback runs on.
Totally makes sense. Let me create a separate PR (given that this may require big changes) to let the creator of the listener handle its own execution. Learned a lot from your valuable comment. Really appreciate it.
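A minimal sketch of the idea discussed above, where the listener's creator supplies the executor at registration time. `StateHolderSketch` and its `addListener(listener, executor)` signature are hypothetical and are not the existing `StateChangeListener` API; only JDK types are used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical state holder: every listener is registered together with
// the executor its creator wants it to run on.
final class StateHolderSketch<T>
{
    private static final class Registration<T>
    {
        final Consumer<T> listener;
        final Executor executor;

        Registration(Consumer<T> listener, Executor executor)
        {
            this.listener = listener;
            this.executor = executor;
        }
    }

    private final List<Registration<T>> registrations = new CopyOnWriteArrayList<>();

    void addListener(Consumer<T> listener, Executor executor)
    {
        registrations.add(new Registration<>(listener, executor));
    }

    // The producer only dispatches; it never lends its own thread to foreign code
    void set(T newState)
    {
        for (Registration<T> registration : registrations) {
            registration.executor.execute(() -> registration.listener.accept(newState));
        }
    }
}
```

A caller that is happy to run inline could pass a direct executor such as `Runnable::run`, while a caller with its own event loop would pass that loop.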
taskStatusFetcher.start();
taskInfoFetcher.start();
}
taskEventLoop.execute(() -> {
Does this change remove all synchronized sections?
@arhimondr yes, all synchronized sections are now async. The only thing I am not certain about is this change: https://github.com/prestodb/presto/pull/24288/files#r1894673304. The original synchronized implementation returns a future, but now we need to notify the listener in a sort of async way. I wonder what your thoughts are on this. So far, I am testing the code on verifier and things look good. We might also get a minimal performance gain if we use regular variables in place of the atomic ones, since they don't need to be atomic any more.
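For readers following along, a minimal sketch of why plain fields are enough once every access goes through one single-threaded loop. `LoopConfinedTaskSketch` is hypothetical; the `nextSplitId` and `taskEventLoop` names are borrowed from the PR for readability, and a JDK single-thread executor stands in for the event loop.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical task whose mutable state is touched only on its own loop thread.
final class LoopConfinedTaskSketch
{
    private final ExecutorService taskEventLoop = Executors.newSingleThreadExecutor();

    // Plain field, no AtomicLong and no synchronized: it is read and written
    // exclusively by tasks running on taskEventLoop.
    private long nextSplitId;

    // Safe to call from any thread; the mutation itself happens on the loop
    void addSplit()
    {
        taskEventLoop.execute(() -> nextSplitId++);
    }

    // Reads also go through the loop, so they see every earlier update
    long readNextSplitId()
    {
        try {
            return taskEventLoop.submit(() -> nextSplitId).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown()
    {
        taskEventLoop.shutdown();
    }

    // Several caller threads enqueue updates; the loop applies them one at a time
    static long demo(int callers, int updatesPerCaller)
    {
        LoopConfinedTaskSketch task = new LoopConfinedTaskSketch();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < updatesPerCaller; j++) {
                    task.addSplit();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
            return task.readNextSplitId();
        }
        catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        finally {
            task.shutdown();
        }
    }
}
```

Because the executor's queue is FIFO and has a single worker, no update is lost even though the counter is an ordinary `long`.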
7bb112b to 9e5e4a7
Very nice change. Just so I understand: is the effect of this to reduce the number of IO threads related to task management from # of tasks to # of queries?
@tdcmeehan Yeah, the idea is to reduce the number of event processing threads and remove synchronization, with the eventual goal of the coordinator supporting more tasks running concurrently. Currently we are seeing scalability issues when the number of tasks grows beyond 30-50K. Dispatching events on standard Java thread pools becomes a problem.
9e5e4a7 to a9961b8
a9961b8 to 6eaec15
if (splitQueueHasSpace) {
future.set(null);
}
whenSplitQueueHasSpace.createNewListener().addListener(() -> future.set(null), directExecutor());
[Placeholder]
@shangm2 This is a great change. Please add details in the testing section if you have performed any shadow tests. It would be great if we could run it through problematic traffic and show before/after with this change.
Yep. I am making screenshots of the results of running verifier against this change. One issue is that this PR changes the way we use thread pools, and I am not sure which metrics we can use to compare before and after the change. What do you think, @arhimondr? Can you suggest some metrics I can use to confirm the change? Thanks. I did run some strobelight and did not see anything suspicious: https://fburl.com/scuba/strobelight_java_asyncprofiler/on_demand/3qo6j1h9
  {
  taskUpdateTimeline.add(System.nanoTime());
- executor.execute(this::sendUpdate);
+ taskEventLoop.execute(this::sendUpdate);
Can call sendUpdate directly here, since sendUpdate itself sends a callback to an event loop
nice catch. thanks!
whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold);
updateSplitQueueSpace();
}
if (splitQueueHasSpace) {
Let's make `splitQueueHasSpace` `volatile` and keep the shortcut. There is a chance it may be important.
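A rough sketch of what keeping the shortcut could look like, assuming the flag is written only on the event loop but read from caller threads. `SplitQueueSketch`, its `waiters` list, and `setSplitQueueHasSpace` are hypothetical; `CompletableFuture` stands in for the future type used in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical split queue: the flag is written on the loop and read anywhere.
final class SplitQueueSketch
{
    private final Executor taskEventLoop;

    // volatile so that callers on other threads see the latest value
    private volatile boolean splitQueueHasSpace = true;

    // Loop-confined: only touched from tasks running on taskEventLoop
    private final List<CompletableFuture<Void>> waiters = new ArrayList<>();

    SplitQueueSketch(Executor taskEventLoop)
    {
        this.taskEventLoop = taskEventLoop;
    }

    CompletableFuture<Void> whenSplitQueueHasSpace()
    {
        // Shortcut: answer immediately without a hop onto the event loop
        if (splitQueueHasSpace) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        taskEventLoop.execute(() -> {
            // Re-check on the loop, since space may have freed up in the meantime
            if (splitQueueHasSpace) {
                future.complete(null);
            }
            else {
                waiters.add(future);
            }
        });
        return future;
    }

    void setSplitQueueHasSpace(boolean hasSpace)
    {
        taskEventLoop.execute(() -> {
            splitQueueHasSpace = hasSpace;
            if (hasSpace) {
                waiters.forEach(waiter -> waiter.complete(null));
                waiters.clear();
            }
        });
    }
}
```

The fast path matters when the scheduler asks this question frequently and the queue usually has space, since it avoids enqueueing a task on the loop for every call.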
@@ -234,6 +230,8 @@ public final class HttpRemoteTask
  private final DecayCounter taskUpdateRequestSize;
  private final SchedulerStatsTracker schedulerStatsTracker;

+ private final EventLoop taskEventLoop;
- Atomic fields no longer need to be atomic (`nextSplitId`, `needsUpdate`, `sendPlan`, `started`, `aborted`, `outputBuffers`). It is better to remove the atomics, as they may suggest that the variable is accessed from multiple threads concurrently.
- Remove the `GuardedBy` annotations, as the accesses are no longer protected by a mutex. Instead, you may want to add a comment at the beginning of the class explaining what is going on in this class (e.g., that all interactions with the TaskState have to be done via an event loop).
Good point. Updating the code.
@@ -367,10 +361,10 @@ public HttpRemoteTask(
  initialTask.getTaskStatus(),
  taskStatusRefreshMaxWait,
  taskStatusCodec,
- executor,
+ taskEventLoop,
Maybe just pass it once and call it taskEventLoop
Sure. I was trying not to modify the code in other files and keep this PR straightforward, but it looks like TaskInfoFetcher and ContinuousTaskStatusFetcher are only used here. Let me update them as well.
executor,
updateScheduledExecutor,
errorScheduledExecutor,
taskEventLoop,
ditto
@@ -604,7 +609,7 @@ public void onFailure(Throwable failedReason)
  doRemoveRemoteSource(errorTracker, request, future);
  }
  else {
- errorRateLimit.addListener(() -> doRemoveRemoteSource(errorTracker, request, future), errorScheduledExecutor);
+ errorRateLimit.addListener(() -> doRemoveRemoteSource(errorTracker, request, future), taskEventLoop);
Run the callback itself in the event loop as well
Good point. Thanks for catching this!
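One way to picture the suggestion above: a response callback that is completed by some other thread (standing in for an HTTP client thread) but whose handler is explicitly scheduled on the task's loop. This is only a sketch with JDK types; `CallbackHopSketch` is hypothetical, and `CompletableFuture.handleAsync` stands in for whatever callback mechanism the PR uses.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackHopSketch
{
    // Returns true if the failure handler ran on the loop thread
    static boolean callbackRanOnLoop()
    {
        ExecutorService taskEventLoop = Executors.newSingleThreadExecutor();
        try {
            // Learn which thread backs the loop
            Callable<Thread> currentThread = Thread::currentThread;
            Thread loopThread = taskEventLoop.submit(currentThread).get();

            CompletableFuture<String> response = new CompletableFuture<>();
            // The handler is bound to the loop, whoever completes the future
            CompletableFuture<Boolean> ranOnLoop = response.handleAsync(
                    (value, failure) -> Thread.currentThread() == loopThread,
                    taskEventLoop);

            // Simulated failure delivered from a foreign thread
            new Thread(() -> response.completeExceptionally(
                    new RuntimeException("simulated request failure"))).start();

            return ranOnLoop.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            taskEventLoop.shutdown();
        }
    }
}
```

With the handler pinned to the loop, code inside it can touch loop-confined task state without any additional locking.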
taskInfoFetcher.start();
}
taskEventLoop.execute(() -> {
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
I would recommend not overriding the thread name (here and in other places). It is easier to diagnose threading-related issues when the threads are named consistently (e.g., allowing you to find all threads belonging to a certain thread pool and all related events for it).
httpClient,
maxErrorDuration,
errorScheduledExecutor,
taskEventLoop,
Let's do the same exercise (removing synchronization) for the `ContinuousTaskStatusFetcher` and the `TaskInfoFetcher`.
It would be great if we could stress test this change (run many queries with many stages concurrently) and see if there are any major bottlenecks by profiling the event loop executor for both on-cpu and off-cpu activity.
Love the change. Thank you for working on it
Noted. Let me resolve the comments and run tableau queries against the change.
Description
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
If release note is NOT required, use: