Skip to content

Commit

Permalink
source-braintree-native: refactor and make window_size configurable
Browse files Browse the repository at this point in the history
The window size used for incremental streams is now a config setting,
and hitting search limits will cause the connector to error out & state
that a smaller window size should be used.

The various incremental functions in `resources.py` have been refactored.
I did not refactor the various `fetch_foobar` functions in `api.py`
since we may need to customize the behavior based on user feedback & I
anticipate we'll need to add more streams in the near future. I think
it'll be easier/more efficient to refactor these functions when we're
done adding additional streams.
  • Loading branch information
Alex-Bair committed Dec 4, 2024
1 parent 6062f69 commit f98f7be
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 261 deletions.
7 changes: 4 additions & 3 deletions source-braintree-native/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ credentials:
private_key_sops: ENC[AES256_GCM,data:x2pTFZqlycBlRYO+TrguTvJ+LeB+91iIO8Q0oNLzrBY=,iv:x/2wq0MPdw88DonfNoiA/TfSrbMw5Qov5Aw6+jnnOTk=,tag:+Gt/KPG0janD/cOBKQLfHw==,type:str]
advanced:
is_sandbox: true
window_size: 300
sops:
kms: []
gcp_kms:
Expand All @@ -15,8 +16,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-12-02T22:50:14Z"
mac: ENC[AES256_GCM,data:19i1FR1mdq9JdWdGxQNbwVo3zujo2Fdf+6IBJ0MLMAv9zP4upcVLmEOcbaW9X4e/TUDUG2Bm+df294Oo9IqVN1S4YOirC6KKvpvFF2b9Hke5j7noxrxdovtzpNdcaLqPZjR4e7BzQU3Ove6Rv2FHDZgWuTyj46fyyBOnI/8QTOY=,iv:uNli96+Kb19jdE1GovvCYxgWkNxYyRUj6BBH3Ohz8c4=,tag:TXCnEWzPssJ2zXeZ20IgcQ==,type:str]
lastmodified: "2024-12-04T16:27:37Z"
mac: ENC[AES256_GCM,data:lzvH1MDnxT8E6d7aJRnpxOtHC8a6r0qxShSY8iyyngppBa4fm47/WbHyk29zgPXoR372D3SeQLuNNT0RGScAlovx4pfXvsO9BHhNWE5JiVs2IN14Ss/uCgHjMA9cA5TEu466GE9DWtlcyPabk+GI2TGNtORRKia1pGZxlQYyebo=,iv:eQSVXXGhT9gMHmrOVr7NuwWAST61UYAcKDRCqZXHWw4=,tag:9G2lM3l5ymFIxRTPXv14Vw==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.7.3
version: 3.7.3
61 changes: 34 additions & 27 deletions source-braintree-native/source_braintree_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,28 @@

from .models import (
FullRefreshResource,
Customer,
Dispute,
Subscription,
Transaction,
CreditCardVerification,
IncrementalResource,
)


# Searches return at most 10,000 results (50,000 for transaction searches). If we hit this limit,
# the connector could have missed data and we'll need to use smaller date windows.
SEARCH_LIMIT = 10_000
TRANSACTION_SEARCH_LIMIT = 50_000
# Incremental streams use sliding date windows to try & avoid hitting the above search limits.
WINDOW_SIZE = 15

CONVENIENCE_OBJECTS = [
'gateway'
]


def _search_limit_error_message(count: int, name: str) -> str:
msg = f"{count} {name} returned in a single search which is "
f"greater than or equal to Braintree's documented maximum for a single {name} search. "
"Reduce the window size and backfill this stream."

return msg


def _braintree_object_to_dict(braintree_object):
"""
Recursively convert a Braintree object and its nested objects to a dictionary.
Expand Down Expand Up @@ -87,12 +89,13 @@ async def snapshot_resources(

async def fetch_transactions(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Transaction | LogCursor, None]:
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(days=WINDOW_SIZE)
window_end = log_cursor + timedelta(days=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.transaction.search(
Expand All @@ -103,14 +106,14 @@ async def fetch_transactions(

for object in collection.items:
count += 1
doc = Transaction.model_validate(_braintree_object_to_dict(object))
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at > log_cursor:
yield doc
most_recent_created_at = doc.created_at

if count >= TRANSACTION_SEARCH_LIMIT:
log.warning(f"{count} transactions returned in a single search which is greater than or equal to Braintree's documented maximum for a single transaction search.")
raise RuntimeError(_search_limit_error_message(count, "transactions"))

if end == window_end:
yield window_end
Expand All @@ -120,12 +123,13 @@ async def fetch_transactions(

async def fetch_customers(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Customer | LogCursor, None]:
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(days=WINDOW_SIZE)
window_end = log_cursor + timedelta(days=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.customer.search(
Expand All @@ -136,14 +140,14 @@ async def fetch_customers(

for object in collection.items:
count += 1
doc = Customer.model_validate(_braintree_object_to_dict(object))
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at > log_cursor:
yield doc
most_recent_created_at = doc.created_at

if count >= SEARCH_LIMIT:
log.warning(f"{count} customers returned in a single search which is greater than or equal to Braintree's documented maximum for a single customer search.")
raise RuntimeError(_search_limit_error_message(count, "customers"))

if end == window_end:
yield window_end
Expand All @@ -153,12 +157,13 @@ async def fetch_customers(

async def fetch_credit_card_verifications(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[CreditCardVerification | LogCursor, None]:
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(days=WINDOW_SIZE)
window_end = log_cursor + timedelta(days=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.verification.search(
Expand All @@ -169,14 +174,14 @@ async def fetch_credit_card_verifications(

for object in collection.items:
count += 1
doc = CreditCardVerification.model_validate(_braintree_object_to_dict(object))
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at > log_cursor:
yield doc
most_recent_created_at = doc.created_at

if count >= SEARCH_LIMIT:
log.warning(f"{count} credit card verifications returned in a single search which is greater than or equal to Braintree's documented maximum for a single credit card verifications search.")
raise RuntimeError(_search_limit_error_message(count, "credit card verifications"))

if end == window_end:
yield window_end
Expand All @@ -186,12 +191,13 @@ async def fetch_credit_card_verifications(

async def fetch_subscriptions(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Subscription | LogCursor, None]:
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
window_end = log_cursor + timedelta(days=WINDOW_SIZE)
window_end = log_cursor + timedelta(days=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.subscription.search(
Expand All @@ -202,14 +208,14 @@ async def fetch_subscriptions(

for object in collection.items:
count += 1
doc = Subscription.model_validate(_braintree_object_to_dict(object))
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at > log_cursor:
yield doc
most_recent_created_at = doc.created_at

if count >= SEARCH_LIMIT:
log.warning(f"{count} subscriptions returned in a single search which is greater than or equal to Braintree's documented maximum for a single subscriptions search.")
raise RuntimeError(_search_limit_error_message(count, "subscriptions"))

if end == window_end:
yield window_end
Expand All @@ -219,15 +225,16 @@ async def fetch_subscriptions(

async def fetch_disputes(
braintree_gateway: BraintreeGateway,
window_size: int,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Dispute | LogCursor, None]:
) -> AsyncGenerator[IncrementalResource | LogCursor, None]:
assert isinstance(log_cursor, datetime)
most_recent_created_at = log_cursor
# The start date must be shifted back 1 day since we have to query Braintree using the received_date field,
# which is less granular than the created_at cursor field (date vs. datetime).
start = log_cursor - timedelta(days=1)
window_end = log_cursor + timedelta(days=WINDOW_SIZE)
window_end = log_cursor + timedelta(days=window_size)
end = min(window_end, datetime.now(tz=UTC))

collection = braintree_gateway.dispute.search(
Expand All @@ -238,14 +245,14 @@ async def fetch_disputes(

for object in collection.disputes:
count += 1
doc = Dispute.model_validate(_braintree_object_to_dict(object))
doc = IncrementalResource.model_validate(_braintree_object_to_dict(object))

if doc.created_at > log_cursor:
yield doc
most_recent_created_at = doc.created_at

if count >= SEARCH_LIMIT:
log.warning(f"{count} disputes returned in a single search which is greater than or equal to Braintree's documented maximum for a single disputes search.")
raise RuntimeError(_search_limit_error_message(count, "disputes"))

if end == window_end:
yield window_end
Expand Down
47 changes: 16 additions & 31 deletions source-braintree-native/source_braintree_native/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from braintree import BraintreeGateway
from datetime import datetime, timezone, timedelta
from logging import Logger
from pydantic import AwareDatetime, BaseModel, Field
from typing import Literal

from typing import Annotated, AsyncGenerator, Callable, Literal

from estuary_cdk.capture.common import (
BaseDocument,
ConnectorState as GenericConnectorState,
LogCursor,
ResourceConfig,
ResourceState,
)
Expand Down Expand Up @@ -50,9 +52,15 @@ class Advanced(BaseModel):
title="Is a Sandbox Environment",
default=False,
)
window_size: Annotated[int, Field(
description="Window size in days for incremental streams. This should be left as the default value unless connector errors indicate a smaller window size is required.",
title="Window Size",
default=15,
gt=0,
)]

advanced: Advanced = Field(
default_factory=Advanced,
default_factory=Advanced, #type: ignore
title="Advanced Config",
description="Advanced settings for the connector.",
json_schema_extra={"advanced": True},
Expand All @@ -65,35 +73,12 @@ class FullRefreshResource(BaseDocument, extra="allow"):
id: str | None


# Supported full refresh resources. Each entry is a tuple of:
#   (stream name, attribute on the BraintreeGateway used to list the objects,
#    attribute on the gateway response holding the results — or None when the
#    response is itself the iterable of results).
FULL_REFRESH_RESOURCES: list[tuple[str, str, str | None]] = [
    ("merchant_accounts", "merchant_account", "merchant_accounts"),
    ("discounts", "discount", None),
    ("add_ons", "add_on", None),
    ("plans", "plan", None),
]


class Customer(BaseDocument, extra="allow"):
    """Braintree customer document.

    `created_at` is the field used as the incremental cursor by
    `fetch_customers`; extra fields from the API are kept as-is.
    """
    id: str
    created_at: AwareDatetime


class Dispute(BaseDocument, extra="allow"):
class IncrementalResource(BaseDocument, extra="allow"):
    """Shared document model for all incremental Braintree streams.

    Replaces the per-stream models (Customer, Dispute, Subscription,
    Transaction, CreditCardVerification), which all declared the same two
    fields. `created_at` is the incremental cursor; any other fields returned
    by the API are retained via `extra="allow"`.
    """
    id: str
    created_at: AwareDatetime


class Subscription(BaseDocument, extra="allow"):
    """Braintree subscription document; `created_at` is the incremental cursor."""
    id: str
    created_at: AwareDatetime


class Transaction(BaseDocument, extra="allow"):
    """Braintree transaction document; `created_at` is the incremental cursor."""
    id: str
    created_at: AwareDatetime


class CreditCardVerification(BaseDocument, extra="allow"):
    """Braintree credit card verification document; `created_at` is the incremental cursor."""
    id: str
    created_at: AwareDatetime
# Common signature of the incremental fetch_* functions in api.py:
# (gateway, window_size_in_days, logger, cursor) -> async stream that yields
# IncrementalResource documents interleaved with updated LogCursor values.
IncrementalResourceFetchChangesFn = Callable[
    [BraintreeGateway, int, Logger, LogCursor],
    AsyncGenerator[IncrementalResource | LogCursor, None],
]
Loading

0 comments on commit f98f7be

Please sign in to comment.