Skip to content

Commit

Permalink
source-braintree-native: run synchronous API calls in separate threads
Browse files Browse the repository at this point in the history
The Braintree SDK make synchronous HTTP requests, which was blocking the
main thread and preventing separate streams from sending concurrent API
calls.

Any HTTP requests made by the Braintree SDK are now wrapped in
`asyncio.to_thread`, which runs the function in a separate thread. This
unblocks the main thread and lets separate streams make concurrent API
calls.
  • Loading branch information
Alex-Bair committed Dec 11, 2024
1 parent c611944 commit 82556d6
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions source-braintree-native/source_braintree_native/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from braintree import (
BraintreeGateway,
AddOnGateway,
Expand All @@ -7,8 +8,10 @@
CreditCardVerificationSearch,
DisputeSearch,
SubscriptionSearch,
ResourceCollection,
)
from braintree.attribute_getter import AttributeGetter
from braintree.paginated_collection import PaginatedCollection
from datetime import datetime, timedelta, UTC
from logging import Logger
from typing import AsyncGenerator
Expand Down Expand Up @@ -75,14 +78,30 @@ def _braintree_object_to_dict(braintree_object):
data.pop('_setattrs', None)
return data

# TODO(bair): Refactor snapshot_ and fetch_ functions to make asynchronous API requests instead of synchronous requests.

# Braintree's SDK makes synchronous API requests, which prevents multiple streams from
# sending concurrent API requests. asyncio.to_thread is used as a wrapper to run these
# synchronous API calls in a separate thread and avoid blocking the main thread's event loop.
async def _async_iterator_wrapper(collection: ResourceCollection | PaginatedCollection):
def _braintree_iterator(collection: ResourceCollection | PaginatedCollection):
for object in collection.items:
yield object

it = await asyncio.to_thread(_braintree_iterator, collection)

for object in it:
yield object


async def snapshot_resources(
braintree_gateway: BraintreeGateway,
gateway_property: str,
gateway_response_field: str | None,
log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
resources = getattr(braintree_gateway, gateway_property).all()
resources = await asyncio.to_thread(
getattr(braintree_gateway, gateway_property).all
)

iterator = getattr(resources, gateway_response_field) if gateway_response_field else resources
for object in iterator:
Expand All @@ -100,13 +119,14 @@ async def fetch_transactions(
window_end = log_cursor + timedelta(hours=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.transaction.search(
TransactionSearch.created_at.between(log_cursor, end)
collection: ResourceCollection = await asyncio.to_thread(
braintree_gateway.transaction.search,
TransactionSearch.created_at.between(log_cursor, end),
)

count = 0

for object in collection.items:
async for object in _async_iterator_wrapper(collection):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

Expand Down Expand Up @@ -134,13 +154,14 @@ async def fetch_customers(
window_end = log_cursor + timedelta(hours=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.customer.search(
CustomerSearch.created_at.between(log_cursor, end)
collection: ResourceCollection = await asyncio.to_thread(
braintree_gateway.customer.search,
CustomerSearch.created_at.between(log_cursor, end),
)

count = 0

for object in collection.items:
async for object in _async_iterator_wrapper(collection):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

Expand Down Expand Up @@ -168,13 +189,14 @@ async def fetch_credit_card_verifications(
window_end = log_cursor + timedelta(hours=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.verification.search(
CreditCardVerificationSearch.created_at.between(log_cursor, end)
collection: ResourceCollection = await asyncio.to_thread(
braintree_gateway.verification.search,
CreditCardVerificationSearch.created_at.between(log_cursor, end),
)

count = 0

for object in collection.items:
async for object in _async_iterator_wrapper(collection):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

Expand Down Expand Up @@ -202,13 +224,14 @@ async def fetch_subscriptions(
window_end = log_cursor + timedelta(hours=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.subscription.search(
SubscriptionSearch.created_at.between(log_cursor, end)
collection: ResourceCollection = await asyncio.to_thread(
braintree_gateway.subscription.search,
SubscriptionSearch.created_at.between(log_cursor, end),
)

count = 0

for object in collection.items:
async for object in _async_iterator_wrapper(collection):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

Expand Down Expand Up @@ -239,13 +262,14 @@ async def fetch_disputes(
window_end = log_cursor + timedelta(hours=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.dispute.search(
DisputeSearch.received_date.between(start, end)
search_result = await asyncio.to_thread(
braintree_gateway.dispute.search,
DisputeSearch.received_date.between(start, end),
)

count = 0

for object in collection.disputes:
async for object in _async_iterator_wrapper(search_result.disputes):
count += 1
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

Expand Down

0 comments on commit 82556d6

Please sign in to comment.