Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add enhanced cte scheduling mode #24108

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

jaystarshot
Copy link
Member

@jaystarshot jaystarshot commented Nov 21, 2024

Description

Fixes #22205

Enhance the scheduler to selectively block only the TableScan stages that depend on incomplete CTE TableWriter stages, rather than blocking all dependent sections.

Previously, all sections relying on a CTE writer were blocked until the writer completed.
With this change, only the specific TableScans referencing the CTE are delayed, allowing other stages to proceed. This optimization can significantly improve query latency.

For example for the materialized CTE T:

WITH T AS (SELECT * FROM tpch.orders) 
SELECT * 
FROM T 
JOIN (SELECT * FROM customer) b 
ON t.uuid = b.uuid;

In this query, the right side of the join can be executed and kept in memory concurrently while the CTE write operation completes.

2 commits

  1. Add the CTE used info to tablescans and table finish
  2. Use this info in scheduling by adding a central manager per scheduler and then using this to unblock/wake up tasks and to maintain completed ctes.

Can cause more resource utilization in some cases where intermediate results get blocked due to written ctes but that may also happen without any materialization

Motivation and Context

Impact

Test Plan

Existing UTs + prod queries

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Improve scheduling for CTE materialization: Now, only the stages containing CTE table scans that reference CTE table write stages are blocked till the write is complete, instead of the entire query being blocked as was the case previously. This is controlled by the session property ``enhanced_cte_scheduling_enabled`` (on by default) :pr:`24108`

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 3 times, most recently from 5a5165e to 64a3b17 Compare November 21, 2024 12:08
@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 8 times, most recently from c4d1cab to 0cb4582 Compare December 3, 2024 00:43
@jaystarshot jaystarshot changed the title NA Add scheduling mode to selectively block only the TableScan stages dependent on incomplete CTE TableWriter stages instead of blocking all dependent sections Dec 3, 2024
@jaystarshot jaystarshot changed the title Add scheduling mode to selectively block only the TableScan stages dependent on incomplete CTE TableWriter stages instead of blocking all dependent sections Add enhanced cte scheduling mode Dec 3, 2024
List<ListenableFuture<?>> blocked = new ArrayList<>();

// CTE Materialization Check
if (stage.requiresMaterializedCTE()) {
Copy link
Member Author

@jaystarshot jaystarshot Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main change blocking the tablescans

@@ -278,6 +280,17 @@ else if (state == CANCELED) {

for (StageExecutionAndScheduler stageExecutionInfo : stageExecutions.values()) {
SqlStageExecution stageExecution = stageExecutionInfo.getStageExecution();
// Add a listener for state changes
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a listener to stages with tablefinish which updates our tracker

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch from 0cb4582 to f730ef3 Compare December 3, 2024 04:47
@jaystarshot jaystarshot marked this pull request as ready for review December 3, 2024 04:47
@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 2 times, most recently from a786c7a to 8cc3af6 Compare December 3, 2024 07:20
Comment on lines 609 to 610
.map(planNode -> ((TableFinishNode) planNode).getTemporaryTableInfo().orElseThrow(
() -> new IllegalStateException("TableFinishNode has no TemporaryTableInfo")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map(planNode -> ((TableFinishNode) planNode).getTemporaryTableInfo().orElseThrow(
() -> new IllegalStateException("TableFinishNode has no TemporaryTableInfo")))
.flatMap(planNode -> ((TableFinishNode) planNode).getTemporaryTableInfo())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to throw exp

{
return PlanNodeSearcher.searchFrom(planFragment.getRoot())
.where(planNode -> (planNode instanceof TableFinishNode))
.findAll().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we should verify there is only one TableFinishNode using something like getOnlyElement?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use findSingle() for this. Also, instead of streaming again and then anyMatch, you can move all of that into the where condition above.

Comment on lines +198 to +203
// If any CTE is not materialized, return a blocked ScheduleResult
if (!blocked.isEmpty()) {
return ScheduleResult.blocked(
false,
newTasks,
whenAnyComplete(blocked),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to wrap my head around the logic here:

If any CTE is not materialized, return a blocked ScheduleResult

So the logic should be, in order to return a result which isn't blocked, then the blocked list should be empty. This makes sense based on the blocked.isEmpty() condition.

However, the blocked future argument is whenAnyComplete(blocked). Shouldn't this be something like whenAllComplete? Or is there some nuance to what the blocked argument should be to the ScheduleResult.blocked function? I am not very familiar with this portion of the codebase.

Copy link
Member Author

@jaystarshot jaystarshot Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think any or all don't matter since blocked list just contains one future here (since one tablescan will be blocked for just one CTE), however this code generalizes this and doesn't make this assumption, in that case when any one is complete, the future needs to complete so that the scheduling happens again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for tow CTE temporary tables to be in the same stage if their partitions are compatible? For example in a join stage.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so FixedSourcePartitionedScheduler is just for the stages reading from bucketed tables. As far as I know presto doesn't allow multiple tablescans in one stage. But this code is still resilient

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, for hive connector, it's the bucketed tables that will expose their partitioning property into HivePartitioningHandle when considering colocated joins. So when left and right tables in a join have compatible bucket definition exactly on the join keys, they will be left into the same stage.

Currently, the tableScanNode for the temporary table caused by a CTE does not inherit the bucket feature of its source table, so seems that it won't stay in the same stage with other tableScanNodes. Do we have any plan to support inheriting the bucket feature of the source table in future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a bit confused here. A stage can have multiple CTE dependencies if they have the same bucketing, so seems like we shouldn wait for all blocked futures to complete, right? (this isn't broken now, as if we unblock when some complete, we'll just do this check again, just want to make sure i'm understanding the flow properly)

Copy link
Member Author

@jaystarshot jaystarshot Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes correct we cancel all blocked futures and shdule again (do the check again) when any/some complete and we just have to wait for any one to complete.

@steveburnett
Copy link
Contributor

Please add the PR number to the release note entry.

:pr:`12345`

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch from 3602e0a to 2af6a32 Compare December 5, 2024 07:39
@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 3 times, most recently from 29a9a2f to 0b0e947 Compare December 5, 2024 09:53
steveburnett
steveburnett previously approved these changes Dec 5, 2024
Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! (docs)

Pull branch, local doc build, looks good. Thanks!

@kaikalur
Copy link
Contributor

kaikalur commented Dec 5, 2024

In this query, the right side of the join can be executed and kept in memory concurrently while the CTE write operation completes.

Won't that increase memory for the query?

@jaystarshot
Copy link
Member Author

jaystarshot commented Dec 5, 2024

Peak memory won ‘t but total should unfortunately no way to avoid that till we read concurrently while we write maybe with a new exchange

@tdcmeehan tdcmeehan self-assigned this Dec 6, 2024
@jaystarshot
Copy link
Member Author

@rschlussel Can you please take a look

}
}

public synchronized boolean hasBeenMaterialized(String cteName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this synchronized? don't think it should be necessary with the ConcurrentHashMap, but if it is necessary, you'd need to synchronize the writes too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes correct this is not needed

public ListenableFuture<Void> getFutureForCTE(String cteName)
{
return materializationFutures.compute(cteName, (key, existingFuture) -> {
if (existingFuture == null || existingFuture.isCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we change isCancelled? Seems like that could be a race condition anyway (gets cancelled by another thread after we check the condition).

Copy link
Member Author

@jaystarshot jaystarshot Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we know, the Scheduler’s schedule() method is single-threaded. When the CTE writer stage (or another blocked stage) becomes unblocked here, the schedule() method resumes and cancels all pending futures here. Then it will try to schedle all blocked stages again and we will call FixedSourcePartitionedScheduler’s schedule() method, at which point a cancelled future will appear in the CTEMaterializationMap.

I think you are correct when you say that a race condition can appear in some extremely unlikely case, will need to take a deeper dive on the fix here because it doesn't seem trivial. We could check and not cancel the cte futures in the schedule() method but then it would need the info passed to the future.

Copy link
Member Author

@jaystarshot jaystarshot Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can add make all methods of CTEMaterializationTracker synchronized and remove all futures from the materializationFutures map when the shedule() method resumes and cancels all pending futures here. Then it should be threadsafe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we ignore the fact that it's been canceled?

Copy link
Member Author

@jaystarshot jaystarshot Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since our map in CTEMaterializationTracker is storing the cancelled future, the FixedSourceCountScheduler schedule method will return this cancelled future (which was cancelled by a previous loop of the scheduling process).
I think that we need to make sure that the FixedSourcePartitionedScheduler returns a fresh non cancelled future and that can be only done if we remove the cancelled futures from the CTEMaterializationTracker's map

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the code with these ideas!

Copy link
Member Author

@jaystarshot jaystarshot Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(main change clearing all futues)

{
if (materializedCtes.putIfAbsent(cteName, true) == null) {
SettableFuture<Void> future = materializationFutures.get(cteName);
if (future != null && !future.isCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question about checking isCancelled().

/*
* Tracks whether tablefinish nodes writing temporary tables for CTE Materialization are complete
*/
@ThreadSafe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in Presto we expect that everything is thread safe unless it's specifically noted as not thread safe.

@@ -179,6 +183,29 @@ public ScheduleResult schedule()
{
// schedule a task on every node in the distribution
List<RemoteTask> newTasks = ImmutableList.of();
List<ListenableFuture<?>> blocked = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this inside the if(stage.requiresMaterializedCTE()) condition.

Comment on lines +198 to +203
// If any CTE is not materialized, return a blocked ScheduleResult
if (!blocked.isEmpty()) {
return ScheduleResult.blocked(
false,
newTasks,
whenAnyComplete(blocked),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a bit confused here. A stage can have multiple CTE dependencies if they have the same bucketing, so seems like we shouldn wait for all blocked futures to complete, right? (this isn't broken now, as if we unblock when some complete, we'll just do this check again, just want to make sure i'm understanding the flow properly)

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 3 times, most recently from d7a577d to cde0c24 Compare December 18, 2024 09:16
if (existingFuture == null) {
return SettableFuture.create();
}
checkArgument(!existingFuture.isCancelled(), "CTE future was found in a cancelled state");
Copy link
Contributor

@rschlussel rschlussel Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still not thread safe, as the cancellation isn't atomic with clearing the futures, so it's possible to hit this condition (future cancelled, something tries to get the future before clearAllFutures is called). It's also possible for the future to be canceled after it's returned, but before the consumer checks it or even before it's returned, but after this argument check. I think it would be better to have the code consuming the futures gracefully handle receiving a cancelled future and not have to worry about synchronization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part 1 (future canceled, something tries to get the future before clearAllFutures is called) can be fixed by clearing all before canceling.

For part 2 It's also possible for the future to be canceled after it's returned, but before the consumer checks it or even before it's returned, but after this argument check
I think this will never happen as blocked.cancel() is invoked in SqlQueryScheduler only after all stages are scheduled. At that point, all futures have already been returned to the SqlQueryScheduler, so the consumer will have completed scheduling (and return the blocked result) before this happens.

I think it would be better to have the code consuming the futures gracefully handle receiving a canceled future and not have to worry about synchronization.

I think the code consuming the future can be other Schedulers and making them handle canceled futures might be challenging (not 100% sure if that is even needed yet)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's how it works today, but it's not really thread safe, as there's no guarantee that some other thread holding the future won't cancel it or that someone won't reorder the calling code without realizing the dependency and break the assumptions that clearAllFutures is always called before cancelling any future in the map. At the very least this needs some very clear code comments about the assumptions this class is making and requirements for anything using it.

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 2 times, most recently from 4ae3bb9 to f24338f Compare December 19, 2024 04:39
Copy link

github-actions bot commented Dec 19, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff 8136f7c...28de7bc.

No notifications.

@@ -460,6 +474,7 @@ else if (!result.getBlocked().isDone()) {
ScheduleResult.BlockedReason blockedReason = result.getBlockedReason().get();
switch (blockedReason) {
case WRITER_SCALING:
case WAITING_FOR_CTE_MATERIALIZATION:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some schedulerStats for this so you can track how often this state happens.

@@ -513,6 +528,10 @@ else if (!result.getBlocked().isDone()) {
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
}
if (isEnhancedCTESchedulingEnabled(session)) {
// clear all cte materialization futures in the current schedule loop
cteMaterializationTracker.clearAllFutures();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could there be materialization futures for stages that are not blocked that are now cancelled? I guess that's fine because we finished the scheduling loop, so nothing is waiting on them.


private final Map<String, Boolean> materializedCtes = new HashMap<>();

public synchronized ListenableFuture<Void> getFutureForCTE(String cteName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to avoid all this locking. it seems like right now it's only needed to make the materializationFutures and materializedCTE calls atomic (because aside from that we still make assumptions about when cancel is or is not called, so it's not providing any thread safety with regard to the cancellation calls).

I'm thinking it would be better to use a single ConcurrentHashMap<>, and then not clear the futures before cancellation and instead check in getFutureForCTE() that it hasn't been cancelled, and if it has create a new one (like the code you had before). something like the following.

private final Map<String, SettableFuture<Void>> materializationFutures = new ConcurrentHashMap<>();

public ListenableFuture<Void> getFutureForCTE(String cteName)
    {
         return materializationFutures.compute(cteName, (key, existingFuture) -> {
            if (existingFuture == null || existingFuture.iscancelled()) {
                return SettableFuture.create();
            }
            return existingFuture;
        });

   public void markCTEAsMaterialized(String cteName)
    {
         materializationFutures.compute(cteName, (key, existingFuture) -> {
            if (existingFuture == null || existingFuture.iscancelled()) {
                SettableFuture completedFuture = SettableFuture.create();
                completedFuture.set(null);
                return completedFuture;
            }
            existingFuture.set(null); // notify all listeners
            return existingFuture;
        });


Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thats better than the current code and will fix the race condition 1 which you described.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not happy with the assumptions that we are making, will brainstorm for a better way but it won't be trivial

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public void markCTEAsMaterialized(String cteName)
    {
        materializationFutures.compute(cteName, (key, existingFuture) -> {
            if (existingFuture == null || existingFuture.isCancelled()) {
                SettableFuture completedFuture = SettableFuture.create();
                completedFuture.set(null);
                return completedFuture;
            }
            existingFuture.set(null); // notify all listeners
            return existingFuture;
        });
    }

Also here completing the future when its cancelled

  if (existingFuture == null || existingFuture.isCancelled()) {
                SettableFuture completedFuture = SettableFuture.create();
                completedFuture.set(null);
                return completedFuture;
            }

is not really useful since there will be no listeners on the new future. the entire goal of the markCTEAsMaterialized() is to unblock the stages which were waiting on this future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the point of setting a new completed future if the original one was canceled is in order to add it to the map. that way when someone calls getFutureForCTE() they will get a future that is done.

Copy link
Member Author

@jaystarshot jaystarshot Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I think I found a good fix. If we check the FixedSourcePatitionScheduler code, we can see that it actually returns a wrapped future in whenAnyComplete. This wrapped future is the one which can be cancelled. I also debugged and verified that if this is cancelled the inner ones are also cancelled (maybe because we cancelled with mayInterruptIfRunning?)
However if we make sure that the ctematerialization tracker always returns a Futures.nonCancellationPropagating here then the future in the tracker can never be cancelled so in a way we have handled cancellations.

if (!blocked.isEmpty()) {
                return ScheduleResult.blocked(
                        false,
                        newTasks,
                        whenAnyComplete(blocked),
                        BlockedReason.WAITING_FOR_CTE_MATERIALIZATION,
                        0);
            }

@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 2 times, most recently from b358c47 to 00d5f06 Compare December 20, 2024 01:23
@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch 2 times, most recently from 3b5fc10 to 6f45ff7 Compare December 20, 2024 05:56
@jaystarshot jaystarshot force-pushed the jay-improv-schedule-oss branch from 6f45ff7 to 28de7bc Compare December 20, 2024 06:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants