From 75c33a20708a00fd4dcdc77a8cd4abc8df1c2347 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Tue, 3 Sep 2024 15:02:09 -0400 Subject: [PATCH] source-mixpanel-native: Remove state checkpoint property for `export` stream. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During the connector import, the `state_checkpoint_interval` was added to the `export` stream, which checkpoints state after every X documents. This assumes that documents are in ascending order of the cursor field. However, `export` does not return documents in ascending order, so this strategy does not work. I also suspect it interferes with `get_updated_state` - the other, older way to checkpoint state. This could explain why the `export` stream for an existing task is missing documents. Also, sometimes the API returns an `export` document with an `insert_id�` property instead of `insert_id`. Logic to strip off the extra bytes was added. --- .../source_mixpanel_native/streams/base.py | 7 ------- .../source_mixpanel_native/streams/export.py | 11 ++++++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/source-mixpanel-native/source_mixpanel_native/streams/base.py b/source-mixpanel-native/source_mixpanel_native/streams/base.py index cd5da4005a..809766430b 100644 --- a/source-mixpanel-native/source_mixpanel_native/streams/base.py +++ b/source-mixpanel-native/source_mixpanel_native/streams/base.py @@ -26,13 +26,6 @@ class MixpanelStream(HttpStream, ABC): DEFAULT_REQS_PER_HOUR_LIMIT = 60 - @property - def state_checkpoint_interval(self) -> int: - # to meet the requirement of emitting state at least once per 15 minutes, - # we assume there's at least 1 record per request returned. Given that each request is followed by a 60 seconds sleep - # we'll have to emit state every 15 records - return 10000 - @property def url_base(self): prefix = "eu." 
if self.region == "EU" else "" diff --git a/source-mixpanel-native/source_mixpanel_native/streams/export.py b/source-mixpanel-native/source_mixpanel_native/streams/export.py index eb4aae9cbe..901739fa6d 100644 --- a/source-mixpanel-native/source_mixpanel_native/streams/export.py +++ b/source-mixpanel-native/source_mixpanel_native/streams/export.py @@ -3,6 +3,7 @@ # import json +import re from functools import cache from typing import Any, Iterable, Mapping, MutableMapping @@ -18,6 +19,8 @@ from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream +INSERT_ID_PATTERN = r'^insert_id.+' + class ExportSchema(MixpanelStream): """ Export helper stream for dynamic schema extraction. @@ -194,7 +197,13 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma for result in transform_property_names(properties.keys()): # Convert all values to string (this is default property type) # because API does not provide properties type information - item[result.transformed_name] = str(properties[result.source_name]) + + # Sometimes, the API returns a document that has a `insert_id�` property instead of `insert_id`. + # When this happens, we have to remove the extra byte(s). + if re.match(INSERT_ID_PATTERN, result.transformed_name): + item["insert_id"] = str(properties[result.source_name]) + else: + item[result.transformed_name] = str(properties[result.source_name]) # convert timestamp to datetime string item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string()