
feat: Build hash table while adding input rows for left semi and anti join #7066

Open · liujiayi771 wants to merge 2 commits into main from left-semi

Conversation

liujiayi771 (Contributor):

As discussed in the issue, during the hash build process, left semi join and anti join can deduplicate the input rows based on the join key. However, Velox's hash build addInput process adds all inputs to the RowContainer, which can waste significant memory in certain scenarios, such as TPCDS Q14 and Q95. To address this, we can construct the hash table directly while input is being added and use the HashTable's existing allowDuplicates parameter to drop duplicate rows without storing them in the RowContainer. This is similar to how a hash table is constructed during hash aggregation.

Because Velox's hash build may run with multiple drivers, each driver can only deduplicate its own input in this scenario. In single-driver execution mode, however, all duplicates can be removed.
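A minimal sketch of the idea (illustrative only, not the actual Velox API): during addInput(), each row is probed into a distinct-keys table, and only rows whose key has not been seen before are stored.

#include <string>
#include <unordered_set>
#include <vector>

struct Row {
  std::string key;
  std::string payload;
};

// Rows whose join key was already seen are dropped instead of being
// appended, mirroring what a distinct-mode (allowDuplicates = false)
// hash table does.
void addInputDedup(
    const std::vector<Row>& input,
    std::unordered_set<std::string>& seenKeys,
    std::vector<Row>& rowContainer) {
  for (const auto& row : input) {
    // insert() returns {iterator, false} when the key already exists.
    if (seenKeys.insert(row.key).second) {
      rowContainer.push_back(row);
    }
  }
}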


@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Oct 16, 2023
@liujiayi771 liujiayi771 marked this pull request as draft October 16, 2023 06:17
liujiayi771 (Contributor, Author) commented Oct 16, 2023:

I used Gluten for testing and observed significant reductions in peak memory usage and execution time for the hash build on a 1TB TPCDS dataset, Q95.

Creating the hash table after all inputs have been added to the RowContainer: [screenshot omitted]

Streaming input rows to the hash table and deduplicating them: [screenshot omitted]

liujiayi771 (Contributor, Author):

Hi @mbasmanova. Do you have any suggestions for this optimization?

@liujiayi771 liujiayi771 force-pushed the left-semi branch 3 times, most recently from 893ad56 to efb2eea Compare October 16, 2023 10:07
@@ -751,7 +751,7 @@ class HashTable : public BaseHashTable {
   // or distinct mode VectorHashers in a group by hash table. 0 for
   // join build sides.
   int32_t reservePct() const {
-    return isJoinBuild_ ? 0 : 50;
+    return (isJoinBuild_ && rows_->nextOffset()) ? 0 : 50;
Contributor:

Is this an unrelated fix? Maybe extract it into a separate PR and explain what it does.

Contributor Author:

Currently, there is no variable in HashTable that indicates whether duplicate data is allowed, but rows_->nextOffset() can serve that purpose: for example, in the hash table of a left semi join, rows_->nextOffset() is 0. Introducing a new allowDuplicates variable in HashTable might make this clearer.
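A simplified illustration of that convention (not the real RowContainer layout): when duplicates are allowed, each stored row reserves space for a pointer to the next row with the same key, and nextOffset() is the byte offset of that field; a value of 0 means no such field exists, i.e. duplicates are not kept.

#include <cstdint>

struct RowContainerLayout {
  // Byte offset of the next-duplicate pointer within a stored row;
  // 0 when rows do not chain duplicates at all.
  int32_t nextOffset{0};

  bool allowsDuplicates() const {
    return nextOffset != 0;
  }
};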

I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some headroom for new keys, just like aggregation does. I'm not sure my understanding of reservePct is correct; please correct me if I'm wrong.
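A hedged sketch of what that reserve percentage buys (the name and formula below are illustrative, not Velox's exact sizing code): a table built while input is still streaming in leaves extra capacity for unseen keys so it rehashes less often, while a join build that rehashes exactly once at the end needs no headroom.

#include <cstdint>

int64_t entriesToAllocate(int64_t numDistinct, int32_t reservePct) {
  // Grow the requested size by reservePct percent; rounding up to a
  // power of two is omitted here for brevity.
  return numDistinct + (numDistinct * reservePct) / 100;
}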

Contributor:

> I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some headroom for new keys, just like aggregation does.

Makes sense. Let's make sure this code path is not activated when we are building the hash table after seeing all the keys, though.

mbasmanova (Contributor) left a comment:

@liujiayi771 Thank you for the optimization. Overall this makes sense to me. Just to clarify, my understanding is the following.

Before this change, the HashBuild operator first added all input data into the RowContainer and then built a hash table over it. Now, the HashBuild operator builds the hash table while adding input rows. When the HashBuild operator runs single-threaded, there is no need to build the hash table again after processing all input. When it runs multi-threaded, the hash table must still be rebuilt after all input has been received and combined.
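A toy sketch of that distinction (hypothetical names, with a plain set standing in for the real hash table): per-driver tables only deduplicate locally, so the multi-driver path still needs one final merge to collapse duplicates across drivers.

#include <cstdint>
#include <unordered_set>
#include <vector>

using KeySet = std::unordered_set<int64_t>;

KeySet finishHashBuild(std::vector<KeySet> perDriverTables) {
  if (perDriverTables.size() == 1) {
    // Single-driver mode: the streaming table is already complete.
    return std::move(perDriverTables[0]);
  }
  KeySet merged;
  for (auto& table : perDriverTables) {
    // Cross-driver duplicates collapse here, during the final rebuild.
    merged.insert(table.begin(), table.end());
  }
  return merged;
}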

@mbasmanova mbasmanova requested a review from xiaoxmeng October 16, 2023 13:20
liujiayi771 (Contributor, Author):

@mbasmanova Yes. For single-threaded scenarios, further optimization can be done to avoid rebuilding the hash table.

@liujiayi771 liujiayi771 changed the title Stream input rows to hash table when addInput for left semi and anti join Build hash table while adding input rows for left semi and anti join Oct 16, 2023
liujiayi771 (Contributor, Author) commented Oct 19, 2023:

Hi @mbasmanova. While fixing the UTs, I discovered that regardless of whether the hash build is single-threaded, it is still necessary to call table_->prepareJoinTable in finishHashBuild to perform a rehash. Although the hash table is already correct and usable at that point, some rows may not yet have been added to uniqueValues_ in VectorHasher, so the generated dynamic filter might not include all the values. The final rehash therefore executes table->analyze to add all rows to uniqueValues_.
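A minimal sketch of why that analyze pass matters (the names below are stand-ins, not Velox's types): a dynamic filter derived from an incomplete distinct-value set would wrongly drop probe-side rows.

#include <cstdint>
#include <set>
#include <vector>

// Stand-in for VectorHasher::uniqueValues_.
std::set<int64_t> uniqueValues;

// The final rehash re-scans every stored row so that no build-side key
// is missing from the distinct-value set.
void analyzeAllRows(const std::vector<int64_t>& buildKeys) {
  for (auto key : buildKeys) {
    uniqueValues.insert(key);
  }
}

// The dynamic filter pushed to the probe side only passes keys present
// in the set, so completeness is required for correctness.
bool dynamicFilterPasses(int64_t probeKey) {
  return uniqueValues.count(probeKey) > 0;
}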

liujiayi771 (Contributor, Author) commented Oct 19, 2023:

Hi @xiaoxmeng. I'm not very familiar with memory arbitration; could you give me some advice? Are the current modifications to the hash table build process compatible with the existing arbitration for HashBuild?

In addition, I made some modifications to SharedArbitrationTest to make the tests pass. For example, the anti join now deduplicates and does not store non-join-key columns, which reduces memory usage. We may therefore need to add a seed so that newVector in SharedArbitrationTest generates a different vector each time it is called.

Additionally, it may be necessary to reduce joinMemoryUsage in the tests to trigger the execution of "fakeAllocation". I have also changed the join key to the varchar type, allowing the fuzzer to generate more distinct join keys and occupy more memory. I'm not sure whether this approach is reasonable.

@liujiayi771 liujiayi771 marked this pull request as ready for review October 20, 2023 01:52
liujiayi771 (Contributor, Author):

cc @Yohahaha

liujiayi771 (Contributor, Author):

@xiaoxmeng Could you help review this change?

xiaodouchen (Contributor):

@liujiayi771 @mbasmanova Is there any recent progress on this? I am very interested in it.

I've been exploring the implementation details and have a question about the efficiency of building the hash table inside the addInput method. When the input data has a very low duplication rate, building the hash table at this stage is inefficient, given that the table will ultimately be rebuilt anyway.

Additionally, for left semi join and anti join, is there an opportunity to optimize by storing only the join keys in the RowContainer?

mbasmanova (Contributor):

@xiaodouchen

> For left semi join and anti join, is there an opportunity to optimize by storing only the join keys in the RowContainer?

I believe this should be done in the optimizer. The query plan passed to Velox should not have non-join keys on the build side.

mbasmanova (Contributor):

> When the input data has a very low duplication rate, building the hash table at this stage is inefficient, given that the table will ultimately be rebuilt anyway.

@xiaodouchen That's a good point. Perhaps this logic can be adaptive: if, after processing some data, it sees a low duplication rate, it stops building the hash table.

liujiayi771 (Contributor, Author) commented Dec 13, 2023:

@xiaodouchen @mbasmanova If the data has a very low duplication rate, the rehash at the end will indeed be time-consuming. An approach similar to the early abandonment of partial aggregation could be adopted; a sketch follows.
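A hedged sketch of such an abandon-early heuristic, modeled on partial aggregation's behavior; the threshold names and values below are illustrative, not the ones used in this PR.

#include <cstdint>

constexpr int64_t kMinInputRows = 100'000;  // sample size before deciding
constexpr int32_t kMinDupPct = 20;          // required duplication rate

// Returns true when deduplication is not paying off: after a reasonable
// sample, nearly every key is distinct, so stop building the hash table
// during addInput and fall back to the original build-at-the-end path.
bool abandonBuildNoDupHashEarly(int64_t numInputRows, int64_t numDistinct) {
  return numInputRows >= kMinInputRows &&
      numDistinct * 100 >= numInputRows * (100 - kMinDupPct);
}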

Regarding not storing non-join keys in the row container: I understand this has already been implemented. You can see it here, but in practice these columns will not be passed down; the optimizer passes only the join key columns through a project.

xiaodouchen (Contributor):

@liujiayi771 @mbasmanova Thank you for your reply!

> An approach similar to the early abandonment of partial aggregation could be adopted.

Is there a planned roadmap to support this? Also, what is the plan for this PR?

@liujiayi771 I noticed that TPCH includes some semi-join cases. Have you conducted any performance tests with TPCH to evaluate the impact of your changes? I'm particularly interested in the performance improvements there.

liujiayi771 (Contributor, Author):

@xiaodouchen The main purpose of this modification is to reduce the memory consumed by duplicate data; the performance improvement likewise comes primarily from the reduction in data volume. I have tested 10TB TPCDS q14 and q95 under Spark + Velox, and there is a very significant improvement: memory usage of the join operator dropped by more than a hundredfold, and execution speed increased by more than 20%.

In Velox's own TPCH benchmark, due to the smaller data volume, the speed improvement may be less noticeable. I can run a TPCH test on my cluster using Spark in the next couple of days to measure the memory usage and speed improvement of the semi join, and I will reply with the results. I will also look into optimizations based on the data duplication rate in the next two weeks.

liujiayi771 (Contributor, Author) commented Dec 21, 2023:

Hi @xiaodouchen, I conducted a simple test on TPCH 1TB q20, q21, and q22. The optimizations made in this PR did not have a significant impact on these three queries, likely due to the duplication rate of the build-side data. Execution time remained mostly the same, and memory usage for q20 and q21 was also similar; only q22 showed a noticeable decrease in memory usage for the left semi join.


aditi-pandit (Collaborator):

@liujiayi771: Thanks for this code. Would you be able to write a micro-benchmark that we can measure and review? It would help us tune the config parameters as well.

liujiayi771 (Contributor, Author):

@aditi-pandit Sure, I will add some benchmarks related to performance and memory usage.

@aditi-pandit aditi-pandit self-requested a review October 30, 2024 17:57
czentgr (Collaborator) commented Oct 31, 2024:

@liujiayi771 FYI, there are test failures in the Ubuntu build in HashJoinTest, which could indicate problems.

E.g.

 [ RUN      ] HashJoinTest.joinBuildSpillError
/home/runner/work/velox/velox/velox/velox/exec/tests/HashJoinTest.cpp:7659: Failure
Failed
Expected an exception
...

liujiayi771 (Contributor, Author):

> @liujiayi771 FYI, there are test failures in the Ubuntu build in HashJoinTest, which could indicate problems.

I will try to run the tests in an Ubuntu Docker env, thanks.

czentgr (Collaborator) commented Nov 1, 2024:

@liujiayi771 Thank you! Can you please also rebase and squash the commits? A single commit helps with tracking.

aditi-pandit (Collaborator):

@liujiayi771: Could you also add documentation for the new properties in https://github.com/facebookincubator/velox/blob/main/velox/docs/configs.rst?

liujiayi771 (Contributor, Author) commented Nov 4, 2024:

> Could you also add documentation for the new properties in https://github.com/facebookincubator/velox/blob/main/velox/docs/configs.rst?

@aditi-pandit Sure, I've been quite busy recently. I expect to complete the benchmark and other modifications in about two weeks.

@liujiayi771: No worries. Take your time.

liujiayi771 (Contributor, Author):

@czentgr I was able to reproduce the issue you mentioned in the Ubuntu Docker env. I suspect it is related to the fact that, after I modified the left semi/anti join, memory consumption decreased, so spills are triggered differently than before and the test results no longer meet expectations.
For the joinBuildSpillError test case, I changed kMemoryCapacity to 27 << 20, and it passed.

@liujiayi771 liujiayi771 changed the title Build hash table while adding input rows for left semi and anti join feat: Build hash table while adding input rows for left semi and anti join Nov 24, 2024
@@ -377,6 +381,15 @@ bool isLeftNullAwareJoinWithFilter(
       joinNode->isNullAware() && (joinNode->filter() != nullptr);
 }
 
+bool canDropDuplicates(
Collaborator:

Can this method be moved to the HashJoinNode PlanNode itself?
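A toy sketch of what that suggestion could look like (the types and members below are stand-ins, not Velox's actual PlanNode API): the eligibility check depends only on the join type and the absence of an extra filter, both of which live on the join node.

enum class JoinType { kInner, kLeft, kLeftSemiFilter, kAnti };

struct JoinNode {
  JoinType joinType;
  bool hasFilter;

  // Duplicates on the build side can be dropped only when the join
  // merely tests key existence and no extra filter inspects payloads.
  bool canDropDuplicates() const {
    return (joinType == JoinType::kLeftSemiFilter ||
            joinType == JoinType::kAnti) &&
        !hasFilter;
  }
};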

if (keyChannelSet.find(i) == keyChannelSet.end()) {
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
if (!canDropDuplicates(joinNode)) {
Collaborator:

I would write this a slightly different way to make it more readable:

if (canDropDuplicates(joinNode)) {
  // For left semi and anti join with no extra filter, hash table does not
  // store dependent columns.
  return ROW(std::move(names), std::move(types));
}

for (auto i = 0; i < inputType->size(); ++i) {
  if (keyChannelSet.find(i) == keyChannelSet.end()) {
    names.emplace_back(inputType->nameOf(i));
    types.emplace_back(inputType->childAt(i));
  }
}
return ROW(std::move(names), std::move(types));

 // Identify the non-key build side columns and make a decoder for each.
 const int32_t numDependents = inputType->size() - numKeys;
-if (numDependents > 0) {
+if (!dropDuplicates_ && numDependents > 0) {
Collaborator:

Both the if condition and the for loop for the keys can be put inside a single if (!dropDuplicates_) condition instead of adding the check in both places.

@@ -373,6 +380,31 @@ void HashBuild::addInput(RowVectorPtr input) {
     return;
   }
 
+  if (dropDuplicates_ && !abandonBuildNoDupHash_) {
+    const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
+    numInputRows_ += activeRows_.countSelected();
Collaborator:

It seems numInputRows_ is only tracked correctly in the dropDuplicates_ path. Maybe rename it to numHashInputRows_ instead?

@liujiayi771 liujiayi771 force-pushed the left-semi branch 2 times, most recently from ae02038 to 17e1303 Compare December 1, 2024 14:02
czentgr (Collaborator) commented Jan 22, 2025:

@liujiayi771 Please rebase.
@aditi-pandit Can we close this out eventually?

@liujiayi771 liujiayi771 force-pushed the left-semi branch 2 times, most recently from 0f254d8 to 2ea490d Compare January 24, 2025 02:21
aditi-pandit (Collaborator):

@xiaoxmeng, @Yuhta: Could you please take a look at this code? It is very helpful for TPC-DS queries.

aditi-pandit (Collaborator) left a comment:

@liujiayi771: Thanks for this code. It is shaping up well. Do you have sample benchmark output to share?

@@ -755,7 +755,7 @@ DEBUG_ONLY_TEST_P(
   folly::EventCount taskPauseWait;
   auto taskPauseWaitKey = taskPauseWait.prepareWait();
 
-  const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
+  const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
Collaborator:

Why did you make this change?

Contributor Author:

This test uses an anti join, and after this optimization the memory usage for this test case decreased significantly. If fakeAllocationSize is too large, spilling will not be triggered.

.run();
}

TEST_F(HashJoinTest, antiJoinAbandomBuildNoDupHashEarly) {
Collaborator:

Nit: spelling "abandon".

#include "velox/exec/tests/utils/VectorTestUtil.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

#include <folly/Benchmark.h>
Collaborator:

Move these includes before the velox ones.

liujiayi771 (Contributor, Author):

> @liujiayi771: Thanks for this code. It is shaping up well. Do you have sample benchmark output to share?

You can check the benchmark output in #11212.
