Skip to content

Commit

Permalink
REF: Remove order event attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Jul 11, 2024
1 parent 5a1452b commit a21d295
Show file tree
Hide file tree
Showing 14 changed files with 34 additions and 130 deletions.
6 changes: 1 addition & 5 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

from octue.cloud.events.counter import EventCounter
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
Expand Down Expand Up @@ -37,8 +36,6 @@ def answer_question(question, project_name):
originator = get_nested_attribute(question, "attributes.originator")
retry_count = get_nested_attribute(question, "attributes.retry_count")

order = EventCounter()

try:
runner = Runner.from_configuration(
service_configuration=service_configuration,
Expand All @@ -48,7 +45,7 @@ def answer_question(question, project_name):
)

service.run_function = runner.run
service.answer(question, order)
service.answer(question)
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

except BaseException as error: # noqa
Expand All @@ -58,7 +55,6 @@ def answer_question(question, project_name):
originator_question_uuid=originator_question_uuid,
parent=parent,
originator=originator,
order=order,
retry_count=retry_count,
)

Expand Down
1 change: 0 additions & 1 deletion octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ def ask(
"originator_question_uuid": originator_question_uuid,
"forward_logs": subscribe_to_logs,
"save_diagnostics": save_diagnostics,
"order": 0,
"parent": self.id,
"originator": originator,
"sender": self.id,
Expand Down
35 changes: 0 additions & 35 deletions octue/cloud/events/counter.py

This file was deleted.

1 change: 0 additions & 1 deletion octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def _extract_event_and_attributes(self, container):
:param dict container: the container of the event
:return (any, dict): the event and its attributes
"""
container["attributes"]["order"] = int(container["attributes"]["order"])
return container["event"], container["attributes"]

def _handle_question(self, event, attributes):
Expand Down
2 changes: 1 addition & 1 deletion octue/cloud/events/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"result",
}

SERVICE_COMMUNICATION_SCHEMA_VERSION = "0.14.0"
SERVICE_COMMUNICATION_SCHEMA_VERSION = "0.14.1"
SERVICE_COMMUNICATION_SCHEMA_INFO_URL = "https://strands.octue.com/octue/service-communication"

SERVICE_COMMUNICATION_SCHEMA = {
Expand Down
1 change: 0 additions & 1 deletion octue/cloud/pub_sub/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"`sender_type`",
"`sender_sdk_version`",
"`recipient`",
"`order`",
"`other_attributes`",
)

Expand Down
5 changes: 1 addition & 4 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ def extract_event_and_attributes_from_pub_sub_message(message):
# Cast attributes to a dictionary to avoid defaultdict-like behaviour from Pub/Sub message attributes container.
attributes = dict(getattr_or_subscribe(message, "attributes"))

# Deserialise the `order`, `parent_question_uuid`, `forward_logs`, and `retry_count`, fields if they're present
# Deserialise the `parent_question_uuid`, `forward_logs`, and `retry_count`, fields if they're present
# (don't assume they are before validation).
if attributes.get("order"):
attributes["order"] = int(attributes["order"])

if attributes.get("parent_question_uuid") == "null":
attributes["parent_question_uuid"] = None

Expand Down
4 changes: 0 additions & 4 deletions octue/cloud/pub_sub/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class GoogleCloudPubSubHandler(logging.Handler):
:param str parent: the SRUID of the parent that asked the question these log records are related to
:param str originator: the SRUID of the service revision that triggered the tree of questions these log records are related to
:param str recipient: the SRUID of the service to send these log records to
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param float timeout: timeout in seconds for attempting to publish each log record
:return None:
Expand All @@ -30,7 +29,6 @@ def __init__(
parent,
originator,
recipient,
order,
retry_count,
timeout=60,
*args,
Expand All @@ -43,7 +41,6 @@ def __init__(
self.parent = parent
self.originator = originator
self.recipient = recipient
self.order = order
self.retry_count = retry_count
self.timeout = timeout
self._emit_event = event_emitter
Expand All @@ -63,7 +60,6 @@ def emit(self, record):
parent=self.parent,
originator=self.originator,
recipient=self.recipient,
order=self.order,
retry_count=self.retry_count,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
Expand Down
80 changes: 25 additions & 55 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import importlib.metadata
import json
import logging
import threading
import uuid

import google.api_core.exceptions
Expand All @@ -15,7 +14,6 @@

import octue.exceptions
from octue.cloud.events import OCTUE_SERVICES_PREFIX
from octue.cloud.events.counter import EventCounter
from octue.cloud.events.validation import raise_if_event_is_invalid
from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.events import GoogleCloudPubSubEventHandler, extract_event_and_attributes_from_pub_sub_message
Expand All @@ -37,10 +35,6 @@

logger = logging.getLogger(__name__)

# A lock to ensure only one event can be emitted at a time so that the order is incremented correctly when events are
# being emitted on multiple threads (e.g. via the main thread and a periodic monitor message thread). This avoids 1)
# events overwriting each other in the parent's message handler and 2) events losing their order.
emit_event_lock = threading.Lock()

DEFAULT_NAMESPACE = "default"
ANSWERS_NAMESPACE = "answers"
Expand Down Expand Up @@ -205,19 +199,16 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow

return future, subscriber

def answer(self, question, order=None, heartbeat_interval=120, timeout=30):
def answer(self, question, heartbeat_interval=120, timeout=30):
"""Answer a question from a parent - i.e. run the child's app on the given data and return the output values.
Answers conform to the output values and output manifest schemas specified in the child's Twine file.
:param dict|google.cloud.pubsub_v1.subscriber.message.Message question:
:param octue.cloud.events.counter.EventCounter|None order: an event counter keeping track of the order of emitted events
:param int|float heartbeat_interval: the time interval, in seconds, at which to send heartbeats
:param float|None timeout: time in seconds to keep retrying sending of the answer once it has been calculated
:raise Exception: if any exception arises during running analysis and sending its results
:return None:
"""
order = order or EventCounter()

try:
(
question,
Expand All @@ -242,7 +233,6 @@ def answer(self, question, order=None, heartbeat_interval=120, timeout=30):
"originator_question_uuid": originator_question_uuid,
"parent": parent,
"originator": originator,
"order": order,
"retry_count": retry_count,
}

Expand Down Expand Up @@ -463,7 +453,6 @@ def send_exception(
originator_question_uuid,
parent,
originator,
order,
retry_count,
timeout=30,
):
Expand All @@ -474,7 +463,6 @@ def send_exception(
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param float|None timeout: time in seconds to keep retrying sending of the exception
:return None:
Expand All @@ -495,7 +483,6 @@ def send_exception(
parent=parent,
originator=originator,
recipient=parent,
order=order,
retry_count=retry_count,
attributes={"sender_type": CHILD_SENDER_TYPE},
timeout=timeout,
Expand All @@ -510,14 +497,13 @@ def _emit_event(
parent,
originator,
recipient,
order,
retry_count,
attributes=None,
timeout=30,
):
"""Emit a JSON-serialised event as a Pub/Sub message to the services topic with optional message attributes,
incrementing the `order` argument by one. This method is thread-safe. Extra attributes can be added to an event
via the `attributes` argument but the following attributes are always included:
"""Emit a JSON-serialised event as a Pub/Sub message to the services topic with optional message attributes.
Extra attributes can be added to an event via the `attributes` argument but the following attributes are always
included:
- `uuid` (event UUID)
- `question_uuid`
- `parent_question_uuid`
Expand All @@ -527,7 +513,6 @@ def _emit_event(
- `sender`
- `sender_sdk_version`
- `recipient`
- `order`
- `datetime`
- `retry_count`
Expand All @@ -538,7 +523,6 @@ def _emit_event(
:param str parent: the SRUID of the parent that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param str recipient: the SRUID of the service the event is intended for
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param dict|None attributes: key-value pairs to attach to the event - the values must be strings or bytes
:param int|float timeout: the timeout for sending the event in seconds
Expand All @@ -555,31 +539,27 @@ def _emit_event(
attributes["sender_sdk_version"] = self._local_sdk_version
attributes["recipient"] = recipient
attributes["retry_count"] = retry_count

with emit_event_lock:
attributes["order"] = int(order)
attributes["datetime"] = datetime.datetime.utcnow().isoformat()
converted_attributes = {}

for key, value in attributes.items():
if isinstance(value, bool):
value = str(int(value))
elif isinstance(value, (int, float)):
value = str(value)
elif value is None:
value = json.dumps(value)

converted_attributes[key] = value

future = self.publisher.publish(
topic=self.services_topic.path,
data=json.dumps(event, cls=OctueJSONEncoder).encode(),
ordering_key=question_uuid,
retry=retry.Retry(deadline=timeout),
**converted_attributes,
)

order += 1
attributes["datetime"] = datetime.datetime.utcnow().isoformat()

converted_attributes = {}

for key, value in attributes.items():
if isinstance(value, bool):
value = str(int(value))
elif isinstance(value, (int, float)):
value = str(value)
elif value is None:
value = json.dumps(value)

converted_attributes[key] = value

future = self.publisher.publish(
topic=self.services_topic.path,
data=json.dumps(event, cls=OctueJSONEncoder).encode(),
ordering_key=question_uuid,
retry=retry.Retry(deadline=timeout),
**converted_attributes,
)

return future

Expand Down Expand Up @@ -628,7 +608,6 @@ def _send_question(
parent=self.id,
originator=originator,
recipient=recipient,
order=EventCounter(),
retry_count=retry_count,
attributes={
"forward_logs": forward_logs,
Expand All @@ -649,7 +628,6 @@ def _send_delivery_acknowledgment(
originator_question_uuid,
parent,
originator,
order,
retry_count,
timeout=30,
):
Expand All @@ -660,7 +638,6 @@ def _send_delivery_acknowledgment(
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the service that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param float timeout: time in seconds after which to give up sending
:return None:
Expand All @@ -674,7 +651,6 @@ def _send_delivery_acknowledgment(
parent=parent,
originator=originator,
recipient=parent,
order=order,
retry_count=retry_count,
attributes={"sender_type": CHILD_SENDER_TYPE},
)
Expand All @@ -688,7 +664,6 @@ def _send_heartbeat(
originator_question_uuid,
parent,
originator,
order,
retry_count,
timeout=30,
):
Expand All @@ -699,7 +674,6 @@ def _send_heartbeat(
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param float timeout: time in seconds after which to give up sending
:return None:
Expand All @@ -712,7 +686,6 @@ def _send_heartbeat(
parent=parent,
originator=originator,
recipient=parent,
order=order,
retry_count=retry_count,
attributes={"sender_type": CHILD_SENDER_TYPE},
timeout=timeout,
Expand All @@ -728,7 +701,6 @@ def _send_monitor_message(
originator_question_uuid,
parent,
originator,
order,
retry_count,
timeout=30,
):
Expand All @@ -740,7 +712,6 @@ def _send_monitor_message(
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the service that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param float timeout: time in seconds to retry sending the message
:return None:
Expand All @@ -753,7 +724,6 @@ def _send_monitor_message(
parent=parent,
originator=originator,
recipient=parent,
order=order,
retry_count=retry_count,
timeout=timeout,
attributes={"sender_type": CHILD_SENDER_TYPE},
Expand Down
Loading

0 comments on commit a21d295

Please sign in to comment.