source-hubspot-native: search API breaks out of cycles with more than 10k records

There is an undocumented failure of the search API when trying to page beyond
10,000 results. It's also possible that there are more than 10,000 records
modified at the same time.

Previously the only way out of this was to re-backfill the binding. This adds a
cycle-breaker routine that walks, in ascending id order, the records that were
all modified at that exact same instant, so the search can never loop forever.
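
A minimal sketch of the loop and cycle-breaker described above, assuming
hypothetical helpers search_page and ids_modified_at in place of the
connector's real functions:

from datetime import datetime, timedelta

# Hypothetical stand-ins for the connector's real search calls.
def search_page(since: datetime) -> list[tuple[datetime, str]]:
    return []  # (modified_at, id) pairs modified at or after `since`

def ids_modified_at(instant: datetime) -> set[tuple[datetime, str]]:
    return set()  # every record modified at exactly `instant`

def fetch_all(since: datetime) -> set[tuple[datetime, str]]:
    seen: set[tuple[datetime, str]] = set()
    while True:
        page = search_page(since)
        if not page:
            return seen
        seen.update(page)
        max_updated = max(ts for ts, _ in page)
        if max_updated == since:
            # A full pass failed to move the cursor forward: too many records
            # share this modification time, so walk their ids directly...
            seen.update(ids_modified_at(max_updated))
            # ...then step past the instant by the API's millisecond resolution.
            max_updated += timedelta(milliseconds=1)
        since = max_updated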
williamhbaker committed Nov 4, 2024
1 parent eef01ef commit 604cec8
Showing 1 changed file with 100 additions and 4 deletions:
source-hubspot-native/source_hubspot_native/api.py
@@ -2,6 +2,7 @@
import functools
import itertools
from datetime import UTC, datetime, timedelta
import json
from logging import Logger
from typing import (
Any,
@@ -485,10 +486,8 @@ async def fetch_search_objects(
Multiple records can have the same "last modified" property value, and
indeed large runs of these may have the same value. So a "new" search will
-   inevitably grab some records again, and deduplication is handled here.
-   Technically it is possible for this strategy to not work at all if there are
-   more than 10,000 records with the same modification time, but we'll cross
-   our fingers and hope that is impossible in practice.
+   inevitably grab some records again, and deduplication is handled here via
+   the set.
'''
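    # `output_items` is a set of (modified_at, id) tuples, so records that an
    # overlapping search returns again simply collapse on insertion.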

url = f"{HUB}/crm/v3/objects/{object_name}/search"
@@ -560,6 +559,27 @@ async def fetch_search_objects(
# the search API, so a new search must be initiated to complete the
# request.
cursor = None

if since == max_updated:
log.info(
"cycle detected for lastmodifieddate, fetching all ids for records modified at that instant",
{"object_name": object_name, "instant": since},
)
output_items.update(await fetch_search_objects_modified_at(
object_name, log, http, max_updated, last_modified_property_name
))

# HubSpot APIs use millisecond resolution, so move the time
# cursor forward by that minimum amount now that we know we have
# all of the records modified at the common `max_updated` time.
max_updated = max_updated + timedelta(milliseconds=1)

if until and max_updated > until:
# Unlikely edge case, but this would otherwise result in an
# error from the API.
break


since = max_updated
log.info(
"initiating a new search to satisfy requested time range",
@@ -579,6 +599,82 @@ async def fetch_search_objects(
return sorted(list(output_items), reverse=True), None


async def fetch_search_objects_modified_at(
object_name: str,
log: Logger,
http: HTTPSession,
modified: datetime,
last_modified_property_name: str = "hs_lastmodifieddate"
) -> set[tuple[datetime, str]]:
'''
Fetch all of the IDs of the given object that were modified at the given
time. Used exclusively for breaking out of cycles in the search API
resulting from more than 10,000 records being modified at the same time,
which is a thing that can happen.
To simplify the pagination strategy, the actual `paging` result isn't used
at all other than to see when we have reached the end, and the search query
just always asks for ids larger than what it had previously seen.
'''

url = f"{HUB}/crm/v3/objects/{object_name}/search"
limit = 200
output_items: set[tuple[datetime, str]] = set()
id_cursor: int | None = None
round = 0

while True:
filters: list[dict[str, Any]] = [{
"propertyName": last_modified_property_name,
"operator": "EQ",
"value": modified.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
}]

if id_cursor:
filters.append({
"propertyName": "hs_object_id",
"operator": "GT",
"value": id_cursor,
})

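        # Keyset pagination: request only ids strictly greater than the last
        # id seen, sorted ascending, instead of consuming the paging cursor.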
input = {
"filters": filters,
"sorts": [
{
"propertyName": "hs_object_id",
"direction": "ASCENDING"
}
],
"limit": limit,
}

result: SearchPageResult[CustomObjectSearchResult] = SearchPageResult[CustomObjectSearchResult].model_validate_json(
await http.request(log, url, method="POST", json=input)
)

if round % 50 == 0:
log.info(
"fetching ids for records modified at instant",
{
"object_name": object_name,
"instant": modified,
"count": len(output_items),
"remaining": result.total,
}
)

for r in result.results:
id_cursor = r.id
output_items.add((r.properties.hs_lastmodifieddate, str(r.id)))

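        # The paging token itself is never consumed; its presence only signals
        # that more pages remain.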
if not result.paging:
break

round += 1

return output_items


def fetch_recent_custom_objects(
object_name: str, log: Logger, http: HTTPSession, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, CustomObject], None]:
