Skip to content

Commit

Permalink
MRG: Merge pull request #650 from octue/upgrade-twined
Browse files Browse the repository at this point in the history
Use updated `twined`
  • Loading branch information
cortadocodes committed May 7, 2024
2 parents 55a6394 + 5db9609 commit 450a280
Show file tree
Hide file tree
Showing 14 changed files with 908 additions and 755 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repos:
- id: isort

- repo: https://github.com/psf/black
rev: 22.6.0
rev: 24.4.2
hooks:
- id: black
args: ["--line-length", "120"]
Expand Down
150 changes: 77 additions & 73 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from octue.cloud.events.counter import EventCounter
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
Expand Down Expand Up @@ -32,6 +33,7 @@ def answer_question(question, project_name):

service = Service(service_id=service_sruid, backend=GCPPubSubBackend(project_name=project_name))
question_uuid = get_nested_attribute(question, "attributes.question_uuid")
order = EventCounter()

try:
runner = Runner.from_configuration(
Expand All @@ -42,9 +44,9 @@ def answer_question(question, project_name):
)

service.run_function = runner.run
service.answer(question)
service.answer(question, order)
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

except BaseException as error: # noqa
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN")
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN", order=order)
logger.exception(error)
4 changes: 2 additions & 2 deletions octue/cloud/deployment/google/cloud_run/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ def index():
return _log_bad_request_and_return_400_response("No Pub/Sub message received.")

if not isinstance(envelope, dict) or "message" not in envelope:
return _log_bad_request_and_return_400_response("Invalid Pub/Sub message format.")
return _log_bad_request_and_return_400_response(f"Invalid Pub/Sub message format - received {envelope!r}.")

question = envelope["message"]

if "data" not in question or "attributes" not in question or "question_uuid" not in question["attributes"]:
return _log_bad_request_and_return_400_response("Invalid Pub/Sub message format.")
return _log_bad_request_and_return_400_response(f"Invalid Pub/Sub message format - received {envelope!r}.")

question_uuid = question["attributes"]["question_uuid"]

Expand Down
9 changes: 0 additions & 9 deletions octue/cloud/pub_sub/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter

from octue.cloud.events.validation import VALID_EVENT_KINDS
Expand Down Expand Up @@ -76,14 +74,7 @@ def get_events(table_id, sender, question_uuid, kind=None, include_backend_metad
)

df = result.to_dataframe()

# Convert JSON strings to python primitives.
df["event"] = df["event"].map(json.loads)
df["event"].apply(_deserialise_manifest_if_present)
df["other_attributes"] = df["other_attributes"].map(json.loads)

if "backend_metadata" in df:
df["backend_metadata"] = df["backend_metadata"].map(json.loads)

events = df.to_dict(orient="records")
return _unflatten_events(events)
Expand Down
6 changes: 4 additions & 2 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,19 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow

return future, subscriber

def answer(self, question, heartbeat_interval=120, timeout=30):
def answer(self, question, order=None, heartbeat_interval=120, timeout=30):
"""Answer a question from a parent - i.e. run the child's app on the given data and return the output values.
Answers conform to the output values and output manifest schemas specified in the child's Twine file.
:param dict|google.cloud.pubsub_v1.subscriber.message.Message question:
:param octue.cloud.events.counter.EventCounter|None order: an event counter keeping track of the order of emitted events
:param int|float heartbeat_interval: the time interval, in seconds, at which to send heartbeats
:param float|None timeout: time in seconds to keep retrying sending of the answer once it has been calculated
:raise Exception: if any exception arises during running analysis and sending its results
:return None:
"""
order = order or EventCounter()

try:
(
question,
Expand All @@ -226,7 +229,6 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
return

heartbeater = None
order = EventCounter()

try:
self._send_delivery_acknowledgment(question_uuid, originator, order)
Expand Down
2 changes: 2 additions & 0 deletions octue/metadata/recorded_questions.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@
{"parent_sdk_version": "0.51.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"3bf3b178-3cf1-49aa-a1d9-89cd8ec05b1a\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmppu85nw5c\"}}}", "attributes": {"question_uuid": "6f7f6e1c-7632-4b4d-b91d-6f58dcb43c40", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.51.0", "message_number": "0"}}}
{"parent_sdk_version": "0.52.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.0", "message_number": "0"}}}
{"parent_sdk_version": "0.52.1", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.1", "message_number": "0"}}}
{"parent_sdk_version": "0.52.2", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.2", "message_number": "0"}}}
{"parent_sdk_version": "0.53.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.53.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}
{"parent_sdk_version": "0.54.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.54.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}
{"parent_sdk_version": "0.55.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.55.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}
Loading

0 comments on commit 450a280

Please sign in to comment.