Skip to content

Commit

Permalink
MRG: Merge pull request #669 from octue/speed-up-event-replaying
Browse files Browse the repository at this point in the history
Speed up event replaying
  • Loading branch information
cortadocodes committed Jul 17, 2024
2 parents b3cfa00 + a0b68d4 commit 84b418a
Show file tree
Hide file tree
Showing 19 changed files with 413 additions and 217 deletions.
164 changes: 83 additions & 81 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._subscriber = MockSubscriber()

@property
def subscriber(self):
return self._subscriber

def create(self, allow_existing=False):
"""Register the subscription in the global subscriptions dictionary.
Expand All @@ -78,11 +82,14 @@ def create(self, allow_existing=False):
self._created = True

def delete(self):
"""Do nothing.
"""Delete the subscription from the global subscriptions dictionary.
:return None:
"""
pass
try:
SUBSCRIPTIONS.remove(self.name)
except KeyError:
pass

def exists(self, timeout=5):
"""Check if the subscription exists in the global subscriptions dictionary.
Expand Down Expand Up @@ -310,7 +317,6 @@ def __init__(self, backend, service_id=None, run_function=None, children=None, *
super().__init__(backend, service_id, run_function, *args, **kwargs)
self.children = children or {}
self._publisher = MockPublisher()
self.subscriber = MockSubscriber()

@property
def publisher(self):
Expand Down
11 changes: 7 additions & 4 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class AbstractEventHandler:
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
:param dict schema: the JSON schema to validate events against
:param bool include_service_metadata_in_logs: if `True`, add the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
:param bool only_handle_result: if `True`, skip handling non-result events and only handle the "result" event when received
:param bool only_handle_result: if `True`, skip handling non-result events and only handle the "result" event when received (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
:return None:
"""

Expand All @@ -51,12 +52,14 @@ def __init__(
schema=SERVICE_COMMUNICATION_SCHEMA,
include_service_metadata_in_logs=True,
only_handle_result=False,
validate_events=True,
):
self.handle_monitor_message = handle_monitor_message
self.record_events = record_events
self.schema = schema
self.include_service_metadata_in_logs = include_service_metadata_in_logs
self.only_handle_result = only_handle_result
self.validate_events = validate_events

self.handled_events = []
self._start_time = None
Expand Down Expand Up @@ -113,7 +116,7 @@ def _extract_and_validate_event(self, container):
recipient = attributes.get("recipient")
child_sdk_version = attributes.get("sender_sdk_version")

if not is_event_valid(
if self.validate_events and not is_event_valid(
event=event,
attributes=attributes,
recipient=recipient,
Expand All @@ -125,8 +128,8 @@ def _extract_and_validate_event(self, container):

logger.debug(
"%r: Received an event related to question %r.",
attributes["recipient"],
attributes["question_uuid"],
attributes.get("recipient"),
attributes.get("question_uuid"),
)

return (event, attributes)
Expand Down
9 changes: 8 additions & 1 deletion octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class EventReplayer(AbstractEventHandler):
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
:param dict|str schema: the JSON schema to validate events against
:param bool include_service_metadata_in_logs: if `True`, add the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
:return None:
"""

Expand All @@ -27,6 +28,7 @@ def __init__(
schema=SERVICE_COMMUNICATION_SCHEMA,
include_service_metadata_in_logs=True,
only_handle_result=False,
validate_events=True,
):
event_handlers = event_handlers or {
"question": self._handle_question,
Expand All @@ -45,6 +47,7 @@ def __init__(
schema=schema,
include_service_metadata_in_logs=include_service_metadata_in_logs,
only_handle_result=only_handle_result,
validate_events=validate_events,
)

def handle_events(self, events):
Expand All @@ -56,6 +59,10 @@ def handle_events(self, events):
super().handle_events()

for event in events:
# Skip validation and handling of other event kinds if only the result event is wanted.
if self.only_handle_result and event.get("event", {}).get("kind") != "result":
continue

event, attributes = self._extract_and_validate_event(event)

# Skip the event if it fails validation.
Expand Down
18 changes: 14 additions & 4 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import time
from datetime import datetime, timedelta
from functools import cached_property

from google.api_core import retry
from google.cloud.pubsub_v1 import SubscriberClient
Expand Down Expand Up @@ -91,11 +92,20 @@ def __init__(
include_service_metadata_in_logs=include_service_metadata_in_logs,
)

self._subscriber = SubscriberClient()
self._heartbeat_checker = None
self._last_heartbeat = None
self._alive = True

@cached_property
def subscriber(self):
"""Get or instantiate the subscriber client. The client isn't instantiated until this property is called for the
first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to be put off
until it's needed.
:return google.cloud.pubsub_v1.SubscriberClient:
"""
return SubscriberClient()

@property
def total_run_time(self):
"""The amount of time elapsed since `self.handle_events` was called. If it hasn't been called yet, this is
Expand Down Expand Up @@ -156,7 +166,7 @@ def handle_events(self, timeout=60, maximum_heartbeat_interval=300):

finally:
self._heartbeat_checker.cancel()
self._subscriber.close()
self.subscriber.close()

if self.handled_events:
last_event = self.handled_events[-1]
Expand Down Expand Up @@ -221,7 +231,7 @@ def _pull_available_events(self, timeout):
while self._alive:
logger.debug("Pulling events from Google Pub/Sub: attempt %d.", attempt)

pull_response = self._subscriber.pull(
pull_response = self.subscriber.pull(
request={"subscription": self.subscription.path, "max_messages": MAX_SIMULTANEOUS_MESSAGES_PULL},
retry=retry.Retry(),
)
Expand All @@ -240,7 +250,7 @@ def _pull_available_events(self, timeout):
if not pull_response.received_messages:
return []

self._subscriber.acknowledge(
self.subscriber.acknowledge(
request={
"subscription": self.subscription.path,
"ack_ids": [message.ack_id for message in pull_response.received_messages],
Expand Down
30 changes: 11 additions & 19 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ def __init__(self, backend, service_id=None, run_function=None, service_registri

self._pub_sub_id = convert_service_id_to_pub_sub_form(self.id)
self._local_sdk_version = importlib.metadata.version("octue")
self._publisher = None
self._services_topic = None
self._event_handler = None

def __repr__(self):
Expand All @@ -95,23 +93,20 @@ def __repr__(self):
"""
return f"<{type(self).__name__}({self.id!r})>"

@property
@functools.cached_property
def publisher(self):
"""Get or instantiate the publisher client for the service. No publisher is instantiated until this property is
called for the first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to
be put off until it's needed.
:return google.cloud.pubsub_v1.PublisherClient:
"""
if not self._publisher:
self._publisher = pubsub_v1.PublisherClient(
batch_settings=BATCH_SETTINGS,
publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True),
)

return self._publisher
return pubsub_v1.PublisherClient(
batch_settings=BATCH_SETTINGS,
publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True),
)

@property
@functools.cached_property
def services_topic(self):
"""Get the Octue services topic that all events in the project are published to. No topic is instantiated until
this property is called for the first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS`
Expand All @@ -120,15 +115,12 @@ def services_topic(self):
:raise octue.exceptions.ServiceNotFound: if the topic doesn't exist in the project
:return octue.cloud.pub_sub.topic.Topic: the Octue services topic for the project
"""
if not self._services_topic:
topic = Topic(name=OCTUE_SERVICES_PREFIX, project_name=self.backend.project_name)

if not topic.exists():
raise octue.exceptions.ServiceNotFound(f"{topic!r} cannot be found.")
topic = Topic(name=OCTUE_SERVICES_PREFIX, project_name=self.backend.project_name)

self._services_topic = topic
if not topic.exists():
raise octue.exceptions.ServiceNotFound(f"{topic!r} cannot be found.")

return self._services_topic
return topic

@property
def received_events(self):
Expand Down Expand Up @@ -533,7 +525,7 @@ def _emit_event(
attributes.update(
{
"uuid": str(uuid.uuid4()),
"datetime": datetime.datetime.utcnow().isoformat(),
"datetime": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
"question_uuid": question_uuid,
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
Expand Down
22 changes: 16 additions & 6 deletions octue/cloud/pub_sub/subscription.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from functools import cached_property

import google.api_core.exceptions
from google.cloud.pubsub_v1 import SubscriberClient
Expand Down Expand Up @@ -72,9 +73,18 @@ def __init__(

self.push_endpoint = push_endpoint
self.enable_message_ordering = enable_message_ordering
self._subscriber = SubscriberClient()
self._created = False

@cached_property
def subscriber(self):
"""Get or instantiate the subscriber client. The client isn't instantiated until this property is called for the
first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to be put off
until it's needed.
:return google.cloud.pubsub_v1.SubscriberClient:
"""
return SubscriberClient()

@property
def creation_triggered_locally(self):
"""Was the subscription successfully created by calling `self.create` locally? This is `False` if its creation
Expand Down Expand Up @@ -116,13 +126,13 @@ def create(self, allow_existing=False):
subscription = self._create_proto_message_subscription()

if not allow_existing:
subscription = self._subscriber.create_subscription(request=subscription)
subscription = self.subscriber.create_subscription(request=subscription)
self._created = True
self._log_creation()
return subscription

try:
subscription = self._subscriber.create_subscription(request=subscription)
subscription = self.subscriber.create_subscription(request=subscription)
self._created = True
except google.api_core.exceptions.AlreadyExists:
pass
Expand All @@ -135,7 +145,7 @@ def update(self):
:return None:
"""
self._subscriber.update_subscription(
self.subscriber.update_subscription(
request=UpdateSubscriptionRequest(
mapping=None,
subscription=self._create_proto_message_subscription(), # noqa
Expand All @@ -156,7 +166,7 @@ def delete(self):
:return None:
"""
self._subscriber.delete_subscription(subscription=self.path)
self.subscriber.delete_subscription(subscription=self.path)
logger.info("Subscription %r deleted.", self.path)

def exists(self, timeout=5):
Expand All @@ -166,7 +176,7 @@ def exists(self, timeout=5):
:return bool:
"""
try:
self._subscriber.get_subscription(subscription=self.path, timeout=timeout)
self.subscriber.get_subscription(subscription=self.path, timeout=timeout)
return True
except google.api_core.exceptions.NotFound:
return False
Expand Down
22 changes: 16 additions & 6 deletions octue/cloud/pub_sub/topic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import time
from datetime import datetime
from functools import cached_property

import google.api_core.exceptions
from google.cloud.pubsub_v1 import PublisherClient
Expand All @@ -24,9 +25,18 @@ def __init__(self, name, project_name):
self.name = name
self.project_name = project_name
self.path = self.generate_topic_path(self.project_name, self.name)
self._publisher = PublisherClient()
self._created = False

@cached_property
def publisher(self):
"""Get or instantiate the publisher client. The client isn't instantiated until this property is called for the
first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to be put off
until it's needed.
:return google.cloud.pubsub_v1.PublisherClient:
"""
return PublisherClient()

@property
def creation_triggered_locally(self):
"""Was the topic successfully created by calling `self.create` locally? This is `False` if its creation was
Expand All @@ -52,15 +62,15 @@ def create(self, allow_existing=False):
posix_timestamp_with_no_decimals = str(datetime.now().timestamp()).split(".")[0]

if not allow_existing:
self._publisher.create_topic(
self.publisher.create_topic(
request=Topic_(name=self.path, labels={"created": posix_timestamp_with_no_decimals})
)
self._created = True
self._log_creation()
return

try:
self._publisher.create_topic(
self.publisher.create_topic(
request=Topic_(name=self.path, labels={"created": posix_timestamp_with_no_decimals})
)
self._created = True
Expand All @@ -74,14 +84,14 @@ def get_subscriptions(self):
:return list(str):
"""
return list(self._publisher.list_topic_subscriptions(topic=self.path))
return list(self.publisher.list_topic_subscriptions(topic=self.path))

def delete(self):
"""Delete the topic from Google Pub/Sub.
:return None:
"""
self._publisher.delete_topic(topic=self.path)
self.publisher.delete_topic(topic=self.path)
logger.info("Topic %r deleted.", self.path)

def exists(self, timeout=10):
Expand All @@ -94,7 +104,7 @@ def exists(self, timeout=10):

while time.time() - start_time <= timeout:
try:
self._publisher.get_topic(topic=self.path)
self.publisher.get_topic(topic=self.path)
return True
except google.api_core.exceptions.NotFound:
time.sleep(1)
Expand Down
Loading

0 comments on commit 84b418a

Please sign in to comment.