Skip to content

Commit

Permalink
source-braintree-native: make fetch functions non-blocking with `awai…
Browse files Browse the repository at this point in the history
…t asyncio.sleep(0)`

The Braintree SDK's API calls are all synchronous, so they block the
event loop, meaning only a subset of streams can make progress at a time.
Adding `await asyncio.sleep(0)` to the top of the fetch functions allows
other streams to make progress since these functions now yield to the
event loop.
  • Loading branch information
Alex-Bair committed Dec 10, 2024
1 parent 1c80e71 commit e33cc52
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions source-braintree-native/source_braintree_native/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from braintree import (
BraintreeGateway,
AddOnGateway,
Expand Down Expand Up @@ -31,9 +33,11 @@


def _search_limit_error_message(count: int, name: str) -> str:
msg = f"{count} {name} returned in a single search which is "
f"greater than or equal to Braintree's documented maximum for a single {name} search. "
"Reduce the window size and backfill this stream."
msg = (
f"{count} {name} returned in a single search which is "
f"greater than or equal to Braintree's documented maximum for a single {name} search. "
"Reduce the window size and backfill this stream."
)

return msg

Expand Down Expand Up @@ -73,13 +77,15 @@ def _braintree_object_to_dict(braintree_object):
data.pop('_setattrs', None)
return data


# TODO(bair): Refactor snapshot_ and fetch_ functions to make asynchronous API requests instead of synchronous requests.
async def snapshot_resources(
braintree_gateway: BraintreeGateway,
gateway_property: str,
gateway_response_field: str | None,
log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
resources = getattr(braintree_gateway, gateway_property).all()

iterator = getattr(resources, gateway_response_field) if gateway_response_field else resources
Expand All @@ -93,6 +99,8 @@ async def fetch_transactions(
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(hours=window_size)
Expand Down Expand Up @@ -127,6 +135,8 @@ async def fetch_customers(
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(hours=window_size)
Expand Down Expand Up @@ -161,6 +171,8 @@ async def fetch_credit_card_verifications(
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(hours=window_size)
Expand Down Expand Up @@ -195,6 +207,8 @@ async def fetch_subscriptions(
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(hours=window_size)
Expand Down Expand Up @@ -229,6 +243,8 @@ async def fetch_disputes(
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
# The start date must be shifted back 1 day since we have to query Braintree using the received_date field,
Expand Down

0 comments on commit e33cc52

Please sign in to comment.