diff --git a/tracardi/domain/payload/tracker_payload.py b/tracardi/domain/payload/tracker_payload.py index 3c8a45ef..30764631 100644 --- a/tracardi/domain/payload/tracker_payload.py +++ b/tracardi/domain/payload/tracker_payload.py @@ -111,6 +111,9 @@ def tracardi_referer(self): def scheduled_event_config(self) -> ScheduledEventConfig: return ScheduledEventConfig(flow_id=self._scheduled_flow_id, node_id=self._scheduled_node_id) + def queue_required(self) -> bool: + return self.options.get('queue', False) + def _set_user_agent(self): if self._user_agent is None: try: diff --git a/tracardi/service/tracker.py b/tracardi/service/tracker.py index e329fa4d..a9d28649 100644 --- a/tracardi/service/tracker.py +++ b/tracardi/service/tracker.py @@ -1,8 +1,8 @@ +from time import time from typing import Optional from tracardi.domain.bridges.configurable_bridges import WebHookBridge, RestApiBridge, ConfigurableBridge from tracardi.exceptions.exception import BlockedException -from tracardi.service.change_monitoring.field_change_logger import FieldChangeLogger from tracardi.service.license import License from tracardi.domain.payload.tracker_payload import TrackerPayload from tracardi.service.tracking.source_validation import validate_source @@ -13,12 +13,29 @@ if License.has_license(): from com_tracardi.service.tracking.tracker import com_tracker + from com_tracardi.decorator.deffer_decorator import deferred_execution else: from tracardi.service.tracking.tracker import os_tracker logger = get_logger(__name__) +async def process_com_tracker(tracker_config, tracker_payload: TrackerPayload, source, tracking_start: float): + + result = await com_tracker( + source, + tracker_payload, + tracker_config, + tracking_start + ) + + # if result and tracardi.enable_errors_on_response: + # result['errors'] += self.console_log.get_errors() + # result['warnings'] += self.console_log.get_warnings() + + return result + + class Tracker: def __init__(self, tracker_config: TrackerConfig): @@ -81,27 +98,29 @@ async def track_event(self, tracker_payload: TrackerPayload, tracking_start: flo if tracker_payload.source.transitional is True: tracker_payload.set_ephemeral() - field_change_logger = FieldChangeLogger() - - if License.has_license(): - result = await com_tracker( - field_change_logger, + if not License.has_license(): + return await os_tracker( source, tracker_payload, self.tracker_config, tracking_start ) - else: - result = await os_tracker( - field_change_logger, - source, - tracker_payload, + + # Only commercial + + if not tracker_payload.queue_required(): + # Process without queue + return await process_com_tracker(self.tracker_config, tracker_payload, source, tracking_start) + + # Queue + t = time() + with deferred_execution() as defer: + await defer(process_com_tracker)( self.tracker_config, + tracker_payload, + source, tracking_start - ) - - # if result and tracardi.enable_errors_on_response: - # result['errors'] += self.console_log.get_errors() - # result['warnings'] += self.console_log.get_warnings() + ).push('queue_track') - return result + logger.info(f"Queued in {time()-t}") + return {} diff --git a/tracardi/service/tracking/tracker.py b/tracardi/service/tracking/tracker.py index 6a25de09..6602889b 100644 --- a/tracardi/service/tracking/tracker.py +++ b/tracardi/service/tracking/tracker.py @@ -22,7 +22,6 @@ async def os_tracker( - field_change_logger: FieldChangeLogger, source: EventSource, tracker_payload: TrackerPayload, tracker_config: TrackerConfig, @@ -30,6 +29,8 @@ async def os_tracker( ): try: + field_change_logger = FieldChangeLogger() + if not tracker_payload.events: logger.warning(f"No events have been sent in tracker payload.") return None