Skip to content

Commit

Permalink
enabled pagination (#3)
Browse files Browse the repository at this point in the history
Co-authored-by: Josh Lloyd <jlloyd@jlloyd-mbp19.local>
  • Loading branch information
jlloyd-widen and Josh Lloyd authored Apr 13, 2022
1 parent 84679eb commit d2e0ef6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 101 deletions.
16 changes: 7 additions & 9 deletions tap_workato/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class WorkatoStream(RESTStream):
url_base = "https://www.workato.com"

records_jsonpath = "$[*]"
next_page_token_jsonpath = "$.next_page" # Or override `get_next_page_token`.
current_page = None

@property
def http_headers(self) -> dict:
Expand All @@ -30,14 +30,11 @@ def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.next_page_token_jsonpath:
all_matches = extract_jsonpath(
self.next_page_token_jsonpath, response.json()
)
first_match = next(iter(all_matches), None)
next_page_token = first_match
else:
next_page_token = response.headers.get("X-Next-Page", None)
next_page_token: Optional[int] = None
if self.current_page:
all_matches = extract_jsonpath(self.records_jsonpath, response.json())
records_cnt = sum([1 for m in all_matches])
next_page_token = self.current_page + 1 if records_cnt == 100 else None

return next_page_token

Expand All @@ -48,6 +45,7 @@ def get_url_params(
params: dict = {"per_page": 100}
if next_page_token:
params["page"] = next_page_token
self.current_page = next_page_token
if self.replication_key:
params["order"] = "default"
return params
Expand Down
122 changes: 30 additions & 92 deletions tap_workato/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class ApiCollectionsStream(WorkatoStream):
path = "/api/api_collections"
primary_keys = ["id"]
replication_key = None
current_page = 1
schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("name", th.StringType),
Expand Down Expand Up @@ -70,6 +71,7 @@ class FoldersStream(WorkatoStream):
path = "/api/folders"
primary_keys = ["id"]
replication_key = None
current_page = 1
schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("name", th.StringType),
Expand All @@ -87,6 +89,7 @@ class RecipesStream(WorkatoStream):
primary_keys = ["id"]
replication_key = None
records_jsonpath = "$.items[*]"
current_page = 1
schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("user_id", th.IntegerType),
Expand Down Expand Up @@ -238,6 +241,7 @@ class CustomerAccountsStream(WorkatoStream):
primary_keys = ["id"]
replication_key = None
records_jsonpath = "$.result[*]"
current_page = 1
schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("external_id", th.StringType),
Expand Down Expand Up @@ -268,15 +272,26 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
return {"customer_account_id": record["id"]}


class CustomerMembersStream(WorkatoStream):
class CustomerChildStreams(WorkatoStream):
"""Parent Stream for all children to the CustomerAccountsStream for DRY code."""

parent_stream_type = CustomerAccountsStream
primary_keys = ["customer_account_id", "id"]
replication_key = None

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerMembersStream(CustomerChildStreams):
"""Stream for extracting customers' connections."""

name = "customer_members"
path = "/api/managed_users/{customer_account_id}/members"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -288,23 +303,13 @@ class CustomerMembersStream(WorkatoStream):
th.Property("time_zone", th.StringType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerConnectionsStream(WorkatoStream):
class CustomerConnectionsStream(CustomerChildStreams):
"""Stream for extracting customers' connections."""

name = "customer_connections"
path = "/api/managed_users/{customer_account_id}/connections"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -320,23 +325,14 @@ class CustomerConnectionsStream(WorkatoStream):
th.Property("parent_id", th.IntegerType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerFoldersStream(WorkatoStream):
class CustomerFoldersStream(CustomerChildStreams):
"""Stream for extracting customers' folders."""

name = "customer_folders"
path = "/api/managed_users/{customer_account_id}/folders"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
current_page = 1
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -346,23 +342,14 @@ class CustomerFoldersStream(WorkatoStream):
th.Property("updated_at", th.DateTimeType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerRecipesStream(WorkatoStream):
class CustomerRecipesStream(CustomerChildStreams):
"""Stream for extracting customers' recipes."""

name = "customer_recipes"
path = "/api/managed_users/{customer_account_id}/recipes"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
current_page = 1
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand Down Expand Up @@ -402,13 +389,6 @@ class CustomerRecipesStream(WorkatoStream):
),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {
Expand All @@ -418,7 +398,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:


# not currently possible to hit this endpoint for managed users' recipes
# class CustomerJobsStream(WorkatoStream):
# class CustomerJobsStream(CustomerChildStreams):
# """Stream for extracting Jobs."""
#
# name = "customer_jobs"
Expand Down Expand Up @@ -468,15 +448,12 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
# return row


class CustomerApiCollectionsStream(WorkatoStream):
class CustomerApiCollectionsStream(CustomerChildStreams):
"""Stream for extracting customers' folders."""

name = "customer_api_collections"
path = "/api/managed_users/{customer_account_id}/api_collections"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -488,23 +465,13 @@ class CustomerApiCollectionsStream(WorkatoStream):
th.Property("api_spec_url", th.StringType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerApiEndpointsStream(WorkatoStream):
class CustomerApiEndpointsStream(CustomerChildStreams):
"""Stream for extracting customers' folders."""

name = "customer_api_endpoints"
path = "/api/managed_users/{customer_account_id}/api_endpoints"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -522,23 +489,13 @@ class CustomerApiEndpointsStream(WorkatoStream):
th.Property("updated_at", th.DateTimeType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row


class CustomerApiClientsStream(WorkatoStream):
class CustomerApiClientsStream(CustomerChildStreams):
"""Stream for extracting customers' folders."""

name = "customer_api_clients"
path = "/api/managed_users/{customer_account_id}/api_clients"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$.result[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -547,13 +504,6 @@ class CustomerApiClientsStream(WorkatoStream):
th.Property("updated_at", th.DateTimeType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {
Expand All @@ -569,7 +519,6 @@ class CustomerApiAccessProfilesStream(WorkatoStream):
path = "/api/managed_users/{customer_account_id}/api_access_profiles"
primary_keys = ["customer_account_id", "api_client_id", "id"]
replication_key = None
records_jsonpath = "$[*]"
parent_stream_type = CustomerApiClientsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
Expand All @@ -593,15 +542,11 @@ def post_process( # type: ignore[override]
return row


class CustomerRolesStream(WorkatoStream):
class CustomerRolesStream(CustomerChildStreams):
"""Stream for extracting customers' folders."""

name = "customer_roles"
path = "/api/managed_users/{customer_account_id}/roles"
primary_keys = ["customer_account_id", "id"]
replication_key = None
records_jsonpath = "$[*]"
parent_stream_type = CustomerAccountsStream
schema = th.PropertiesList(
th.Property("customer_account_id", th.IntegerType),
th.Property("id", th.IntegerType),
Expand All @@ -611,10 +556,3 @@ class CustomerRolesStream(WorkatoStream):
th.Property("created_at", th.DateTimeType),
th.Property("updated_at", th.DateTimeType),
).to_dict()

def post_process( # type: ignore[override]
self, row: dict, context: dict
) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
row["customer_account_id"] = context["customer_account_id"]
return row

0 comments on commit d2e0ef6

Please sign in to comment.