Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-impact-native: fix Actions backfill cursor and other misc. fixes #2186

Merged
merged 5 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source-impact-native/source_impact_native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def request_class(self):

async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
return ConnectorSpec(
documentationUrl="https://docs.estuary.dev",
documentationUrl="https://go.estuary.dev/source-impact-native",
configSchema=EndpointConfig.model_json_schema(),
oauth2=None,
resourceConfigSchema=ResourceConfig.model_json_schema(),
Expand Down
30 changes: 15 additions & 15 deletions source-impact-native/source_impact_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def fetch_incremental(

async def fetch_backfill(
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand All @@ -80,16 +80,16 @@ async def fetch_backfill(
else:
url = f"{API}/{API_CATALOG}/{account_sid}/{cls.NAME}"
if _cls.START_DATE:
parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, stop_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}
parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, config_start_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}

result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == stop_date:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc
return
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < stop_date:
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < config_start_date:
return
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < cutoff:
doc = _cls.model_validate_json(json.dumps(results))
Expand Down Expand Up @@ -156,7 +156,7 @@ async def fetch_incremental_actions(
async def fetch_backfill_actions(
cls_parent,
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand All @@ -176,8 +176,8 @@ async def fetch_backfill_actions(
parameters = {}


if cutoff - timedelta(days=1095) <= stop_date <= cutoff:
start_date = stop_date
if cutoff - timedelta(days=1095) <= config_start_date <= cutoff:
start_date = config_start_date
end_date = cutoff
else:
start_date = cutoff - timedelta(days=1095)
Expand All @@ -202,21 +202,21 @@ async def fetch_backfill_actions(
for start, end in dates:
iterating = True

parameters["LockingDateStart"] = _cursor_dt(cls.NAME, start)
parameters["LockingDateEnd"] = _cursor_dt(cls.NAME, end)
parameters["ActionDateStart"] = _cursor_dt(cls.NAME, start)
parameters["ActionDateEnd"] = _cursor_dt(cls.NAME, end)


while iterating:

result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"CreationDate"]) == stop_date:
if _s_to_dt(results[f"CreationDate"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc
iterating = False
break
elif _s_to_dt(results[f"CreationDate"]) < stop_date:
elif _s_to_dt(results[f"CreationDate"]) < config_start_date:
break
elif _s_to_dt(results[f"CreationDate"]) < cutoff:
doc = _cls.model_validate_json(json.dumps(results))
Expand Down Expand Up @@ -316,7 +316,7 @@ async def fetch_incremental_child(
async def fetch_backfill_child(
cls_parent,
cls,
stop_date,
config_start_date: datetime,
account_sid,
http: HTTPSession,
log: Logger,
Expand Down Expand Up @@ -349,17 +349,17 @@ async def fetch_backfill_child(
url = f"{API}/{API_CATALOG}/{account_sid}/Programs/{campaign}/{cls.NAME}"
headers = {'Accept': 'application/json'}
# if _cls.START_DATE:
# parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, stop_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}
# parameters = {f"{_cls.START_DATE}": _cursor_dt(cls.NAME, config_start_date), f"{_cls.END_DATE}": _cursor_dt(cls.NAME, cutoff)}

while iterating:
result = json.loads(await http.request(log, url, method="GET", params=parameters, headers=headers))

for results in result[f"{_cls.NAME}"]:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == stop_date:
if _s_to_dt(results[f"{_cls.REP_KEY}"]) == config_start_date:
doc = _cls.model_validate_json(json.dumps(results))
yield doc

elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < stop_date:
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < config_start_date:
iterating = False
break
elif _s_to_dt(results[f"{_cls.REP_KEY}"]) < cutoff:
Expand Down
14 changes: 9 additions & 5 deletions source-impact-native/source_impact_native/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, UTC, timedelta
from enum import StrEnum, auto
from pydantic import BaseModel, Field, AwareDatetime
from typing import Literal, Generic, TypeVar, Annotated, ClassVar, TYPE_CHECKING, Dict, List
Expand All @@ -17,6 +17,10 @@

ConnectorState = GenericConnectorState[ResourceState]

def default_start_date():
dt = datetime.now(tz=UTC) - timedelta(days=30)
return dt


class CatalogEnum(StrEnum):
brand = "Brand"
Expand All @@ -31,10 +35,10 @@ class EndpointConfig(BaseModel, extra="allow"):
description="As of now, only BRAND catalogs are allowed"

)
stop_date: AwareDatetime = Field(
description="Replication Stop Date. Records will only be considered for backfilling "\
"before the stop_date, similar to a start date",
default=datetime.fromisoformat("2010-01-01T00:00:00Z".replace('Z', '+00:00'))
start_date: AwareDatetime = Field(
description="UTC date and time in the format YYYY-MM-DDTHH:MM:SSZ. Data generated before this date will not be replicated. If left blank, the start date will be set to 30 days before the present date.",
title="Start Date",
default_factory=default_start_date
)


Expand Down
48 changes: 24 additions & 24 deletions source-impact-native/source_impact_native/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,25 @@ async def all_resources(
if config.api_catalog == "Brand":
# We pass along the credentials.username value since its required by every endpoint
all_streams = (
action_object(Campaigns, Actions, http, config.stop_date, config.credentials.username),
action_object(Campaigns, ActionInquiries, http, config.stop_date, config.credentials.username),
base_object(Invoices, http, config.stop_date, config.credentials.username),
base_object(Jobs, http, config.stop_date, config.credentials.username),
base_object(PromoCodes, http, config.stop_date, config.credentials.username),
base_object(Deals, http, config.stop_date, config.credentials.username),
base_object(ExceptionLists, http, config.stop_date, config.credentials.username),
base_object(UniqueUrls, http, config.stop_date, config.credentials.username),
base_object(TrackingValueRequests, http, config.stop_date, config.credentials.username),
snapshot_object(Campaigns, http, config.stop_date, config.credentials.username),
snapshot_object(Ads, http, config.stop_date, config.credentials.username),
snapshot_object(Catalogs, http, config.stop_date, config.credentials.username),
snapshot_object(Reports, http, config.stop_date, config.credentials.username),
snapshot_object(PhoneNumbers, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Contracts, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Tasks, http, config.stop_date, config.credentials.username),
child_object(Campaigns, Notes, http, config.stop_date, config.credentials.username),
child_object(Campaigns, BlockRedirectRules, http, config.stop_date, config.credentials.username),
media_groups_object(Campaigns, MediaPartnerGroups, http, config.stop_date, config.credentials.username),
action_object(Campaigns, Actions, http, config.start_date, config.credentials.username),
action_object(Campaigns, ActionInquiries, http, config.start_date, config.credentials.username),
base_object(Invoices, http, config.start_date, config.credentials.username),
base_object(Jobs, http, config.start_date, config.credentials.username),
base_object(PromoCodes, http, config.start_date, config.credentials.username),
base_object(Deals, http, config.start_date, config.credentials.username),
base_object(ExceptionLists, http, config.start_date, config.credentials.username),
base_object(UniqueUrls, http, config.start_date, config.credentials.username),
base_object(TrackingValueRequests, http, config.start_date, config.credentials.username),
snapshot_object(Campaigns, http, config.start_date, config.credentials.username),
snapshot_object(Ads, http, config.start_date, config.credentials.username),
snapshot_object(Catalogs, http, config.start_date, config.credentials.username),
snapshot_object(Reports, http, config.start_date, config.credentials.username),
snapshot_object(PhoneNumbers, http, config.start_date, config.credentials.username),
child_object(Campaigns, Contracts, http, config.start_date, config.credentials.username),
child_object(Campaigns, Tasks, http, config.start_date, config.credentials.username),
child_object(Campaigns, Notes, http, config.start_date, config.credentials.username),
child_object(Campaigns, BlockRedirectRules, http, config.start_date, config.credentials.username),
media_groups_object(Campaigns, MediaPartnerGroups, http, config.start_date, config.credentials.username),

)

Expand Down Expand Up @@ -114,7 +114,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -154,7 +154,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -187,7 +187,7 @@ def open(
model=cls,
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -225,7 +225,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at)
),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)

Expand Down Expand Up @@ -259,6 +259,6 @@ def open(
model=cls,
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(name=cls.NAME),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=5)),
schema_inference=True,
)
Empty file.
Loading
Loading