Aling/test pandas perf #1990

Draft: wants to merge 9 commits into base: main
DESCRIPTION.md (4 changes: 4 additions & 0 deletions)

@@ -8,6 +8,10 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
 
 # Release Notes
 
+- v3.12.0(TBD)
+  - Optimized `to_pandas()` performance by fully parallelizing the download logic.
+
+
 - v3.11.0(June 17,2024)
   - Added support for `token_file_path` connection parameter to read an OAuth token from a file when connecting to Snowflake.
   - Added support for `debug_arrow_chunk` connection parameter to allow debugging raw arrow data in case of arrow data parsing failure.
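For orientation, the call path this release note affects is the connector's pandas fetch API. A minimal usage sketch (the connection parameters and `my_table` are placeholders; `fetch_pandas_all()` is the standard snowflake-connector-python call that consumes the downloaded result batches):

```python
import snowflake.connector

# Placeholder connection parameters; substitute real account details.
with snowflake.connector.connect(
    account="<account>",
    user="<user>",
    password="<password>",
) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM my_table")  # my_table is a placeholder
        # fetch_pandas_all() consumes the downloaded result batches and
        # assembles them into a single pandas DataFrame; this is the path
        # whose batch downloads the PR parallelizes.
        df = cur.fetch_pandas_all()
```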
src/snowflake/connector/result_set.py (35 changes: 7 additions & 28 deletions)

@@ -6,7 +6,7 @@
 
 import inspect
 from collections import deque
-from concurrent.futures import Future
+from concurrent.futures import ALL_COMPLETED, Future, wait
 from concurrent.futures.thread import ThreadPoolExecutor
 from logging import getLogger
 from typing import (
@@ -63,41 +63,20 @@ def result_set_iterator(
     """
 
     with ThreadPoolExecutor(prefetch_thread_num) as pool:
-        # Fill up window
-
         logger.debug("beginning to schedule result batch downloads")
 
-        for _ in range(min(prefetch_thread_num, len(unfetched_batches))):
+        while unfetched_batches:
             logger.debug(
                 f"queuing download of result batch id: {unfetched_batches[0].id}"
             )
-            unconsumed_batches.append(
-                pool.submit(unfetched_batches.popleft().create_iter, **kw)
-            )
+            future = pool.submit(unfetched_batches.popleft().create_iter, **kw)
+            unconsumed_batches.append(future)
 
         yield from first_batch_iter
 
+        _, _ = wait(unconsumed_batches, return_when=ALL_COMPLETED)
         i = 1
         while unconsumed_batches:
             logger.debug(f"user requesting to consume result batch {i}")
-
-            # Submit the next un-fetched batch to the pool
-            if unfetched_batches:
-                logger.debug(
-                    f"queuing download of result batch id: {unfetched_batches[0].id}"
-                )
-                future = pool.submit(unfetched_batches.popleft().create_iter, **kw)
-                unconsumed_batches.append(future)
-
-            future = unconsumed_batches.popleft()
-
-            # this will raise an exception if one has occurred
-            batch_iterator = future.result()
-
-            logger.debug(f"user began consuming result batch {i}")
-            yield from batch_iterator
-            logger.debug(f"user finished consuming result batch {i}")
-
+            # .result() re-raises any exception from the download thread
+            yield from unconsumed_batches.popleft().result()
+            logger.debug(f"user finished consuming result batch {i}")
             i += 1
         final()
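To make the new control flow easy to follow outside the connector, here is a minimal self-contained sketch of the pattern the rewritten iterator uses: submit every batch to the pool up front, wait for all downloads to complete, then yield results in submission order. `download` and the integer batch ids are toy stand-ins, not connector APIs:

```python
from collections import deque
from concurrent.futures import ALL_COMPLETED, wait
from concurrent.futures.thread import ThreadPoolExecutor
import time


def download(batch_id: int) -> list[int]:
    """Toy stand-in for ResultBatch.create_iter: pretend to fetch one chunk."""
    time.sleep(0.05)
    return [batch_id] * 3


def result_iterator(batch_ids, prefetch_thread_num: int = 4):
    unfetched = deque(batch_ids)
    unconsumed: deque = deque()
    with ThreadPoolExecutor(prefetch_thread_num) as pool:
        # Schedule every batch immediately; the old code kept only a
        # sliding window of prefetch_thread_num downloads in flight.
        while unfetched:
            unconsumed.append(pool.submit(download, unfetched.popleft()))

        # Block until every download has finished before consuming any.
        wait(unconsumed, return_when=ALL_COMPLETED)

        # Yield in submission order; .result() re-raises download errors.
        while unconsumed:
            yield from unconsumed.popleft().result()


if __name__ == "__main__":
    print(list(result_iterator(range(5))))  # [0, 0, 0, 1, 1, 1, ...]
```

Note the trade-off the PR makes: downloads now overlap fully, but every batch is buffered before anything past `first_batch_iter` is yielded, so peak memory grows with result size.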