Skip to content

Commit

Permalink
```
Browse files Browse the repository at this point in the history
refactor(tracker): remove field_change_logger parameter

Remove the field_change_logger parameter from com_tracker and os_tracker
functions. Instantiate FieldChangeLogger inside these functions instead.

feat(tracker_payload): add queue_required method

Add a new method queue_required to TrackerPayload class to check if
queueing is required based on tracker options.

refactor(tracker): implement deferred execution for com_tracker

Add deferred execution for com_tracker in case queueing is required.
Introduce process_com_tracker to handle the tracking process and log
queueing time.

feat(event_server_endpoint): add patch endpoint for tracking

Introduce a new PATCH endpoint /track with queueing enabled. Update
tracker payload with request headers and set profile_less flag based on
query parameter.
```
  • Loading branch information
atompie committed Sep 26, 2024
1 parent 79a0150 commit 412b41b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
3 changes: 3 additions & 0 deletions tracardi/domain/payload/tracker_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def tracardi_referer(self):
def scheduled_event_config(self) -> ScheduledEventConfig:
return ScheduledEventConfig(flow_id=self._scheduled_flow_id, node_id=self._scheduled_node_id)

def queue_required(self) -> bool:
return self.options.get('queue', False)

def _set_user_agent(self):
if self._user_agent is None:
try:
Expand Down
53 changes: 36 additions & 17 deletions tracardi/service/tracker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from time import time
from typing import Optional

from tracardi.domain.bridges.configurable_bridges import WebHookBridge, RestApiBridge, ConfigurableBridge
from tracardi.exceptions.exception import BlockedException
from tracardi.service.change_monitoring.field_change_logger import FieldChangeLogger
from tracardi.service.license import License
from tracardi.domain.payload.tracker_payload import TrackerPayload
from tracardi.service.tracking.source_validation import validate_source
Expand All @@ -13,12 +13,29 @@

if License.has_license():
from com_tracardi.service.tracking.tracker import com_tracker
from com_tracardi.decorator.deffer_decorator import deferred_execution
else:
from tracardi.service.tracking.tracker import os_tracker

logger = get_logger(__name__)


async def process_com_tracker(tracker_config, tracker_payload: TrackerPayload, source, tracking_start: float):

result = await com_tracker(
source,
tracker_payload,
tracker_config,
tracking_start
)

# if result and tracardi.enable_errors_on_response:
# result['errors'] += self.console_log.get_errors()
# result['warnings'] += self.console_log.get_warnings()

return result


class Tracker:

def __init__(self, tracker_config: TrackerConfig):
Expand Down Expand Up @@ -81,27 +98,29 @@ async def track_event(self, tracker_payload: TrackerPayload, tracking_start: flo
if tracker_payload.source.transitional is True:
tracker_payload.set_ephemeral()

field_change_logger = FieldChangeLogger()

if License.has_license():
result = await com_tracker(
field_change_logger,
if not License.has_license():
return await os_tracker(
source,
tracker_payload,
self.tracker_config,
tracking_start
)
else:
result = await os_tracker(
field_change_logger,
source,
tracker_payload,

# Only commercial

if not tracker_payload.queue_required():
# Process without queue
return await process_com_tracker(self.tracker_config, tracker_payload, source, tracking_start)

# Queue
t = time()
with deferred_execution() as defer:
await defer(process_com_tracker)(
self.tracker_config,
tracker_payload,
source,
tracking_start
)

# if result and tracardi.enable_errors_on_response:
# result['errors'] += self.console_log.get_errors()
# result['warnings'] += self.console_log.get_warnings()
).push('queue_track')

return result
logger.info(f"Queued in {time()-t}")
return {}
3 changes: 2 additions & 1 deletion tracardi/service/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@


async def os_tracker(
field_change_logger: FieldChangeLogger,
source: EventSource,
tracker_payload: TrackerPayload,
tracker_config: TrackerConfig,
tracking_start: float
):
try:

field_change_logger = FieldChangeLogger()

if not tracker_payload.events:
logger.warning(f"No events have been sent in tracker payload.")
return None
Expand Down

0 comments on commit 412b41b

Please sign in to comment.