From 86ec0af03a51e18ac1298d6436c0e5381c6f34c4 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 9 Oct 2024 16:22:41 -0400 Subject: [PATCH 1/2] source-klaviyo: fix incremental pagination Incremental streams were using `>` instead of `>=` when filtering results with a specific datetime. Although subsequent requests use cursors to paginate through responses, it's possible to miss results within the same second on connector restarts (like for schema inference updates). --- source-klaviyo/source_klaviyo/streams.py | 2 +- source-klaviyo/tests/test_streams.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source-klaviyo/source_klaviyo/streams.py b/source-klaviyo/source_klaviyo/streams.py index a6bcb3412d..733d873030 100644 --- a/source-klaviyo/source_klaviyo/streams.py +++ b/source-klaviyo/source_klaviyo/streams.py @@ -176,7 +176,7 @@ def request_params( # Setting a minimum value of at least 3 seconds from the current time ensures this will never happen, # and allows our 'abnormal_state' acceptance test to pass. latest_cursor = min(latest_cursor, pendulum.now().subtract(seconds=3)) - params["filter"] = f"greater-than({self.cursor_field},{latest_cursor.isoformat()})" + params["filter"] = f"greater-or-equal({self.cursor_field},{latest_cursor.isoformat()})" params["sort"] = self.cursor_field return params diff --git a/source-klaviyo/tests/test_streams.py b/source-klaviyo/tests/test_streams.py index cbd6f4bc8b..625609d305 100644 --- a/source-klaviyo/tests/test_streams.py +++ b/source-klaviyo/tests/test_streams.py @@ -168,7 +168,7 @@ def test_cursor_field_is_required(self): {"updated": "2023-01-01T00:00:00+00:00"}, {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, { - "filter": "greater-than(updated,2023-01-01T00:00:00+00:00)", + "filter": "greater-or-equal(updated,2023-01-01T00:00:00+00:00)", "page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa", "sort": "updated", }, @@ -178,7 +178,7 @@ def test_cursor_field_is_required(self): None, {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, { - "filter": "greater-than(updated,2020-10-10T00:00:00+00:00)", + "filter": "greater-or-equal(updated,2020-10-10T00:00:00+00:00)", "page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa", "sort": "updated", }, @@ -203,7 +203,7 @@ def test_cursor_field_is_required(self): {"updated": "2023-01-01T00:00:00+00:00"}, {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, { - "filter": "greater-than(updated,2023-01-01T00:00:00+00:00)", + "filter": "greater-or-equal(updated,2023-01-01T00:00:00+00:00)", "page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa", "sort": "updated", }, @@ -504,17 +504,17 @@ class TestArchivedRecordsStream: ( {"archived": {"updated_at": "2023-10-10 00:00:00"}}, None, - {"filter": "and(greater-than(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at"}, + {"filter": "and(greater-or-equal(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at"}, ), ( {"archived": {"updated_at": "2023-10-10 00:00:00"}}, { - "filter": "and(greater-than(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", + "filter": "and(greater-or-equal(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at", "page[cursor]": "next_page_cursor", }, { - "filter": "and(greater-than(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", + "filter": "and(greater-or-equal(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at", "page[cursor]": "next_page_cursor", }, @@ -522,12 +522,12 @@ class TestArchivedRecordsStream: ( {}, { - "filter": "and(greater-than(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", + "filter": "and(greater-or-equal(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at", "page[cursor]": "next_page_cursor", }, { - "filter": "and(greater-than(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", + "filter": "and(greater-or-equal(updated_at,2023-10-10T00:00:00+00:00),equals(archived,true))", "sort": "updated_at", "page[cursor]": "next_page_cursor", }, From d95d51d54d4ea09a13e8e3dd7319174a2ff7453a Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 9 Oct 2024 18:37:53 -0400 Subject: [PATCH 2/2] source-klaviyo: update snapshot file names --- source-klaviyo/tests/__init__.py | 0 .../snapshots__discover__capture.stdout.json | 307 ++++++++++++++++++ .../snapshots__spec__capture.stdout.json | 77 +++++ ...t_snapshots__discover__capture.stdout.json | 306 ----------------- ..._test_snapshots__spec__capture.stdout.json | 76 ----- 5 files changed, 384 insertions(+), 382 deletions(-) create mode 100644 source-klaviyo/tests/__init__.py create mode 100644 source-klaviyo/tests/snapshots/snapshots__discover__capture.stdout.json create mode 100644 source-klaviyo/tests/snapshots/snapshots__spec__capture.stdout.json delete mode 100644 source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__discover__capture.stdout.json delete mode 100644 source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__spec__capture.stdout.json diff --git a/source-klaviyo/tests/__init__.py b/source-klaviyo/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/source-klaviyo/tests/snapshots/snapshots__discover__capture.stdout.json b/source-klaviyo/tests/snapshots/snapshots__discover__capture.stdout.json new file mode 100644 index 0000000000..3600319c70 --- /dev/null +++ b/source-klaviyo/tests/snapshots/snapshots__discover__capture.stdout.json @@ -0,0 +1,307 @@ +[ + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "campaigns", + "resourceConfig": { + "cursorField": [ + "updated_at" + ], + "stream": "campaigns", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "events", + "resourceConfig": { + "cursorField": [ + "datetime" + ], + "stream": "events", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "global_exclusions", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "global_exclusions", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "lists", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "lists", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "metrics", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "metrics", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "flows", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "flows", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "email_templates", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "email_templates", + "syncMode": "incremental" + } + }, + { + "documentSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "_meta": { + "properties": { + "row_id": { + "type": "integer" + } + }, + "required": [ + "row_id" + ], + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "recommendedName": "profiles", + "resourceConfig": { + "cursorField": [ + "updated" + ], + "stream": "profiles", + "syncMode": "incremental" + } + } + ] + \ No newline at end of file diff --git a/source-klaviyo/tests/snapshots/snapshots__spec__capture.stdout.json b/source-klaviyo/tests/snapshots/snapshots__spec__capture.stdout.json new file mode 100644 index 0000000000..04638e9b2e --- /dev/null +++ b/source-klaviyo/tests/snapshots/snapshots__spec__capture.stdout.json @@ -0,0 +1,77 @@ +[ + { + "configSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "api_key": { + "airbyte_secret": true, + "description": "Klaviyo API Key. See our docs if you need help finding this key: https://go.estuary.dev/JqH0gQ", + "order": 0, + "title": "Api Key", + "type": "string" + }, + "start_date": { + "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field is optional - if not provided, all data will be replicated.", + "examples": [ + "2017-01-25T00:00:00Z" + ], + "format": "date-time", + "order": 1, + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + "title": "Start Date", + "type": "string" + } + }, + "required": [ + "api_key" + ], + "title": "Klaviyo Spec", + "type": "object" + }, + "documentationUrl": "https://go.estuary.dev/JqH0gQ", + "protocol": 3032023, + "resourceConfigSchema": { + "additionalProperties": false, + "description": "ResourceConfig encodes a configured resource stream", + "properties": { + "cursorField": { + "items": { + "type": "string" + }, + "title": "Cursor Field", + "type": "array" + }, + "namespace": { + "description": "Enclosing schema namespace of this resource", + "title": "Namespace", + "type": "string" + }, + "stream": { + "description": "Name of this stream", + "title": "Stream", + "type": "string" + }, + "syncMode": { + "description": "Sync this resource incrementally, or fully refresh it every run", + "enum": [ + "full_refresh", + "incremental" + ], + "title": "Sync Mode", + "type": "string" + } + }, + "required": [ + "stream", + "syncMode" + ], + "title": "ResourceConfig", + "type": "object" + }, + "resourcePathPointers": [ + "/namespace", + "/stream" + ] + } + ] + \ No newline at end of file diff --git a/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__discover__capture.stdout.json b/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__discover__capture.stdout.json deleted file mode 100644 index 7b35002914..0000000000 --- a/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__discover__capture.stdout.json +++ /dev/null @@ -1,306 +0,0 @@ -[ - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "campaigns", - "resourceConfig": { - "cursorField": [ - "updated_at" - ], - "stream": "campaigns", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "events", - "resourceConfig": { - "cursorField": [ - "datetime" - ], - "stream": "events", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "global_exclusions", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "global_exclusions", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "lists", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "lists", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "metrics", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "metrics", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "flows", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "flows", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "email_templates", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "email_templates", - "syncMode": "incremental" - } - }, - { - "documentSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "properties": { - "_meta": { - "properties": { - "row_id": { - "type": "integer" - } - }, - "required": [ - "row_id" - ], - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - }, - "key": [ - "/id" - ], - "recommendedName": "profiles", - "resourceConfig": { - "cursorField": [ - "updated" - ], - "stream": "profiles", - "syncMode": "incremental" - } - } -] diff --git a/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__spec__capture.stdout.json b/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__spec__capture.stdout.json deleted file mode 100644 index 69065662ca..0000000000 --- a/source-klaviyo/tests/snapshots/source_klaviyo_tests_test_snapshots__spec__capture.stdout.json +++ /dev/null @@ -1,76 +0,0 @@ -[ - { - "configSchema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "api_key": { - "airbyte_secret": true, - "description": "Klaviyo API Key. See our docs if you need help finding this key: https://go.estuary.dev/JqH0gQ", - "order": 0, - "title": "Api Key", - "type": "string" - }, - "start_date": { - "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field is optional - if not provided, all data will be replicated.", - "examples": [ - "2017-01-25T00:00:00Z" - ], - "format": "date-time", - "order": 1, - "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", - "title": "Start Date", - "type": "string" - } - }, - "required": [ - "api_key" - ], - "title": "Klaviyo Spec", - "type": "object" - }, - "documentationUrl": "https://go.estuary.dev/JqH0gQ", - "protocol": 3032023, - "resourceConfigSchema": { - "additionalProperties": false, - "description": "ResourceConfig encodes a configured resource stream", - "properties": { - "cursorField": { - "items": { - "type": "string" - }, - "title": "Cursor Field", - "type": "array" - }, - "namespace": { - "description": "Enclosing schema namespace of this resource", - "title": "Namespace", - "type": "string" - }, - "stream": { - "description": "Name of this stream", - "title": "Stream", - "type": "string" - }, - "syncMode": { - "description": "Sync this resource incrementally, or fully refresh it every run", - "enum": [ - "full_refresh", - "incremental" - ], - "title": "Sync Mode", - "type": "string" - } - }, - "required": [ - "stream", - "syncMode" - ], - "title": "ResourceConfig", - "type": "object" - }, - "resourcePathPointers": [ - "/namespace", - "/stream" - ] - } -]