Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add auto scale writer support #11702

Closed
wants to merge 1 commit into from

Conversation

xiaoxmeng
Copy link
Contributor

Summary:
This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage, it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Dec 1, 2024
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

Copy link

netlify bot commented Dec 1, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 30706ee
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/67513f32569aff0008a0d07a

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 1, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 1, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 1, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 1, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 2, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 2, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 2, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 2, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 2, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

@xiaoxmeng xiaoxmeng changed the title Add auto scale writer support feat: Add auto scale writer support Dec 3, 2024
xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 3, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: oerling

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 4, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 4, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

xiaoxmeng added a commit to xiaoxmeng/velox that referenced this pull request Dec 4, 2024
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
@facebook-github-bot
Copy link
Contributor

This pull request was exported from Phabricator. Differential Revision: D66380785

@facebook-github-bot
Copy link
Contributor

This pull request has been merged in e40259f.

LocalPartitionNode(
const PlanNodeId& id,
Type type,
bool scaleWriter,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiaoxmeng how about putting this arg in the end, w/ a default value false?

It seems this feature is disabled by default.

athmaja-n pushed a commit to athmaja-n/velox that referenced this pull request Jan 10, 2025
Summary:
Pull Request resolved: facebookincubator#11702

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785

fbshipit-source-id: 87a414516cc5011a26309db104c6d133c179e959
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. fb-exported Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants