Skip to content

Commit

Permalink
Merge pull request #2186 from estuary/bair/source-impact-actions-cursors
Browse files Browse the repository at this point in the history
source-impact-native: fix `Actions` backfill cursor and other misc. fixes
  • Loading branch information
jonwihl authored Dec 6, 2024
2 parents bd2f944 + 0097a69 commit e227729
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 69 deletions.
2 changes: 1 addition & 1 deletion source-impact-native/source_impact_native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def request_class(self):

async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
return ConnectorSpec(
documentationUrl="https://docs.estuary.dev",
documentationUrl="https://go.estuary.dev/source-impact-native",
configSchema=EndpointConfig.model_json_schema(),
oauth2=None,
resourceConfigSchema=ResourceConfig.model_json_schema(),
Expand Down
30 changes: 15 additions & 15 deletions source-impact-native/source_impact_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def fetch_incremental(

async def fetch_backfill(
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand All @@ -80,16 +80,16 @@ async def fetch_backfill(
else:
url = f"{API}/{API_CATALOG}/{account_sid}/{cls.NAME}"
if _cls.START_DATE:
parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, stop_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}
parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, config_start_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}

result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == stop_date:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc
return
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < stop_date:
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < config_start_date:
return
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < cutoff:
doc = _cls.model_validate_json(json.dumps(results))
Expand Down Expand Up @@ -156,7 +156,7 @@ async def fetch_incremental_actions(
async def fetch_backfill_actions(
cls_parent,
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand All @@ -176,8 +176,8 @@ async def fetch_backfill_actions(
parameters = {}


if cutoff - timedelta(days=1095) <= stop_date <= cutoff:
start_date = stop_date
if cutoff - timedelta(days=1095) <= config_start_date <= cutoff:
start_date = config_start_date
end_date = cutoff
else:
start_date = cutoff - timedelta(days=1095)
Expand All @@ -202,21 +202,21 @@ async def fetch_backfill_actions(
for start, end in dates:
iterating = True

parameters["LockingDateStart"] = _cursor_dt(cls.NAME, start)
parameters["LockingDateEnd"] = _cursor_dt(cls.NAME, end)
parameters["ActionDateStart"] = _cursor_dt(cls.NAME, start)
parameters["ActionDateEnd"] = _cursor_dt(cls.NAME, end)


while iterating:

result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"CreationDate"]) == stop_date:
if _s_to_dt(results[f"CreationDate"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc
iterating = False
break
elif _s_to_dt(results[f"CreationDate"]) < stop_date:
elif _s_to_dt(results[f"CreationDate"]) < config_start_date:
break
elif _s_to_dt(results[f"CreationDate"]) < cutoff:
doc = _cls.model_validate_json(json.dumps(results))
Expand Down Expand Up @@ -316,7 +316,7 @@ async def fetch_incremental_child(
async def fetch_backfill_child(
cls_parent,
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand Down Expand Up @@ -349,17 +349,17 @@ async def fetch_backfill_child(
url = f"{API}/{API_CATALOG}/{account_sid}/Programs/{campaign}/{cls.NAME}"
headers = {'Accept': 'application/json'}
# if _cls.START_DATE:
# parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, stop_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}
# parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, config_start_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}

while iterating:
result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == stop_date:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc

elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < stop_date:
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < config_start_date:
iterating = False
break
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < cutoff:
Expand Down
14 changes: 9 additions & 5 deletions source-impact-native/source_impact_native/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, UTC, timedelta
from enum import StrEnum, auto
from pydantic import BaseModel, Field, AwareDatetime
from typing import Literal, Generic, TypeVar, Annotated, ClassVar, TYPE_CHECKING, Dict, List
Expand All @@ -17,6 +17,10 @@

ConnectorState = GenericConnectorState[ResourceState]

def default_start_date():
dt = datetime.now(tz=UTC) - timedelta(days=30)
return dt


class CatalogEnum(StrEnum):
brand = "Brand"
Expand All @@ -31,10 +35,10 @@ class EndpointConfig(BaseModel, extra="allow"):
description="As of now, only BRAND catalogs are allowed"

)
stop_date: AwareDatetime = Field(
description="Replication Stop Date. Records will only be considered for backfilling "\
"before the stop_date, similar to a start date",
default=datetime.fromisoformat("2010-01-01T00:00:00Z".replace('Z', '+00:00'))
start_date: AwareDatetime = Field(
description="UTC date and time in the format YYYY-MM-DDTHH:MM:SSZ. Data generated before this date will not be replicated. If left blank, the start date will be set to 30 days before the present date.",
title="Start Date",
default_factory=default_start_date
)


Expand Down
48 changes: 24 additions & 24 deletions source-impact-native/source_impact_native/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,25 @@ async def all_resources(
if config.api_catalog == "Brand":
# We pass along the credentials.username value since its required by every endpoint
all_streams = (
action_object(Campaigns, Actions, http, config.stop_date, config.credentials.username),
action_object(Campaigns, ActionInquiries, http, config.stop_date, config.credentials.username),
base_object(Invoices, http, config.stop_date, config.credentials.username),
base_object(Jobs, http, config.stop_date, config.credentials.username),
base_object(PromoCodes, http, config.stop_date, config.credentials.username),
base_object(Deals, http, config.stop_date, config.credentials.username),
base_object(ExceptionLists, http, config.stop_date, config.credentials.username),
base_object(UniqueUrls, http, config.stop_date, config.credentials.username),
base_object(TrackingValueRequests, http, config.stop_date, config.credentials.username),
snapshot_object(Campaigns, http, config.stop_date, config.credentials.username),
snapshot_object(Ads, http, config.stop_date, config.credentials.username),
snapshot_object(Catalogs, http, config.stop_date, config.credentials.username),
snapshot_object(Reports, http, config.stop_date, config.credentials.username),
snapshot_object(PhoneNumbers, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Contracts, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Tasks, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Notes, http, config.stop_date, config.credentials.username),
child_object(Campaigns, BlockRedirectRules, http, config.stop_date, config.credentials.username),
media_groups_object(Campaigns, MediaPartnerGroups, http, config.stop_date, config.credentials.username),
action_object(Campaigns, Actions, http, config.start_date, config.credentials.username),
action_object(Campaigns, ActionInquiries, http, config.start_date, config.credentials.username),
base_object(Invoices, http, config.start_date, config.credentials.username),
base_object(Jobs, http, config.start_date, config.credentials.username),
base_object(PromoCodes, http, config.start_date, config.credentials.username),
base_object(Deals, http, config.start_date, config.credentials.username),
base_object(ExceptionLists, http, config.start_date, config.credentials.username),
base_object(UniqueUrls, http, config.start_date, config.credentials.username),
base_object(TrackingValueRequests, http, config.start_date, config.credentials.username),
snapshot_object(Campaigns, http, config.start_date, config.credentials.username),
snapshot_object(Ads, http, config.start_date, config.credentials.username),
snapshot_object(Catalogs, http, config.start_date, config.credentials.username),
snapshot_object(Reports, http, config.start_date, config.credentials.username),
snapshot_object(PhoneNumbers, http, config.start_date, config.credentials.username),
child_object(Campaigns, Contracts, http, config.start_date, config.credentials.username),
child_object(Campaigns, Tasks, http, config.start_date, config.credentials.username),
child_object(Campaigns, Notes, http, config.start_date, config.credentials.username),
child_object(Campaigns, BlockRedirectRules, http, config.start_date, config.credentials.username),
media_groups_object(Campaigns, MediaPartnerGroups, http, config.start_date, config.credentials.username),

)

Expand Down Expand Up @@ -114,7 +114,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -154,7 +154,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -187,7 +187,7 @@ def open(
model=cls,
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -225,7 +225,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -259,6 +259,6 @@ def open(
model=cls,
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)
Empty file.
Loading

0 comments on commit e227729

Please sign in to comment.