
test speed improvements on agg functions by modifying scan_parquet() arguments and using collect_async() #4

Open
claireboyd opened this issue Feb 19, 2024 · 1 comment


@claireboyd
Collaborator

In thinking through opportunities to improve the speed of the aggregation functions (e.g. rank) on large parquet files, two real opportunities emerged: modifying the scan_parquet() arguments and switching from collect() to collect_async().

Tests on 1.1MB and 1.2GB parquet files produced a few key takeaways (see all data collected from the tests in the summary table):

  • In general, collect_async() saw larger performance gains on the larger file (notably when using parallel="auto")
  • collect_async() appears to work best when one or both of use_statistics and hive_partitioning are turned on

The current implementation uses collect() with parallel='auto', use_statistics=True, hive_partitioning=True. This corresponds most closely to the first row of the summary table (speed 762), which serves as the benchmark for the recommendations below.

Here are 3 recommended tests to try based on the 1.2GB run:

  • collect(), parallel='row_groups', use_statistics=False, hive_partitioning=False (~15% improvement over benchmark)
  • collect_async(), parallel='row_groups', use_statistics=True, hive_partitioning=True (~15% improvement over benchmark)
  • collect(), parallel='columns', use_statistics=False, hive_partitioning=True (~13% improvement over benchmark)
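For convenience, the three combinations above can be written as keyword dicts and unpacked into scan_parquet() with **kwargs. This is just a sketch: the PARAMS_test1/PARAMS_test3 names are illustrative (only PARAMS_test2 appears in the code later in this thread).

```python
# Illustrative dicts mirroring the three recommended tests above;
# each can be unpacked into pl.scan_parquet(..., **params).
PARAMS_test1 = {"parallel": "row_groups", "use_statistics": False, "hive_partitioning": False}
PARAMS_test2 = {"parallel": "row_groups", "use_statistics": True, "hive_partitioning": True}
PARAMS_test3 = {"parallel": "columns", "use_statistics": False, "hive_partitioning": True}
```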
@claireboyd
Collaborator Author

@nmarchio Here's an example of how collect_async works (it needs to be wrapped in a coroutine and run with asyncio instead of piped into the process). The code chunk below uses the params for the second recommended test above:

import asyncio
from pathlib import Path

import polars as pl

INPUT_FILENAME = "<filepath to parquet file>"
PARAMS_test2 = {'parallel': "row_groups", 'use_statistics': True, 'hive_partitioning': True}

async def test_collect_async(input_dir, **kwargs):
    # Build the lazy query, then await the async collection.
    return await (
        pl.scan_parquet(Path(input_dir), low_memory=True, **kwargs)
        .with_columns([
            # REPLACE COL NAMES HERE FOR THE RELEVANT OPERATION
            pl.col("AssdTotalValue")
            .rank(method="random", descending=True, seed=1)
            .over(['SaleRecordingYear', "county_code"])
            .alias("highestvalbycountybyyear"),
        ])
        .collect_async(streaming=True, simplify_expression=True)
    )

# Returns a DataFrame (not a LazyFrame)
df = asyncio.run(test_collect_async(INPUT_FILENAME, **PARAMS_test2))
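To compare the parameter combinations, the coroutine can be wrapped in a small timing helper. This is a standard-library sketch: the _fake_collect coroutine is a hypothetical stand-in so the example runs without a parquet file; in practice you would pass test_collect_async(INPUT_FILENAME, **PARAMS_test2) instead.

```python
import asyncio
import time

async def timed(coro):
    """Await a coroutine and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start

# Hypothetical stand-in coroutine so the sketch is runnable;
# substitute the real test_collect_async(...) call in practice.
async def _fake_collect():
    await asyncio.sleep(0.01)
    return "df"

result, elapsed = asyncio.run(timed(_fake_collect()))
print(f"collected in {elapsed:.3f}s")
```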
