Skip to content

Commit

Permalink
source-mixpanel-native: Remove state checkpoint property for export
Browse files Browse the repository at this point in the history
… stream.

During the connector import, the `state_checkpoint_interval` was added to the `export` stream, which checkpoints state after every X documents. This assumes that documents are in ascending order of the cursor field. However, `export` does not return documents in ascending order, so this strategy does not work. I also suspect it interfers with `get_updated_state` - the other older way to checkpoint state. This could explain why the `export` stream for an existing task is missing documents.

Also, sometimes the API returns an `export` document with a `insert_id�` property instead of `insert_id`. Logic to strip off the extra bytes was added.
  • Loading branch information
Alex-Bair committed Sep 3, 2024
1 parent 5021445 commit 75c33a2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
7 changes: 0 additions & 7 deletions source-mixpanel-native/source_mixpanel_native/streams/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ class MixpanelStream(HttpStream, ABC):

DEFAULT_REQS_PER_HOUR_LIMIT = 60

@property
def state_checkpoint_interval(self) -> int:
# to meet the requirement of emitting state at least once per 15 minutes,
# we assume there's at least 1 record per request returned. Given that each request is followed by a 60 seconds sleep
# we'll have to emit state every 15 records
return 10000

@property
def url_base(self):
prefix = "eu." if self.region == "EU" else ""
Expand Down
11 changes: 10 additions & 1 deletion source-mixpanel-native/source_mixpanel_native/streams/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
import re
from functools import cache
from typing import Any, Iterable, Mapping, MutableMapping

Expand All @@ -18,6 +19,8 @@
from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream


INSERT_ID_PATTERN = r'^insert_id.+'

class ExportSchema(MixpanelStream):
"""
Export helper stream for dynamic schema extraction.
Expand Down Expand Up @@ -194,7 +197,13 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
for result in transform_property_names(properties.keys()):
# Convert all values to string (this is default property type)
# because API does not provide properties type information
item[result.transformed_name] = str(properties[result.source_name])

# Sometimes, the API returns a document that has a `insert_id�` property instead of `insert_id`.
# When this happens, we have to remove the extra byte(s).
if re.match(INSERT_ID_PATTERN, result.transformed_name):
item["insert_id"] = str(properties[result.source_name])
else:
item[result.transformed_name] = str(properties[result.source_name])

# convert timestamp to datetime string
item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string()
Expand Down

0 comments on commit 75c33a2

Please sign in to comment.