Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async event retrieval workflow #647

Merged
merged 28 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c3953df
TST: Test retrieving results from real asynchronous question
cortadocodes Apr 15, 2024
610b635
ENH: Deserialise manifests from async events in `get_event`
cortadocodes Apr 15, 2024
bcde18e
ENH: Return question UUID from `Child.ask`
cortadocodes Apr 15, 2024
b69f189
DOC: Add DOI badge to readme
cortadocodes Apr 17, 2024
6f91f08
DEP: Upgrade `gunicorn` to avoid vulnerability
cortadocodes Apr 17, 2024
92c61a5
FIX: Use correct base for `python3.11` dockerfile base image
cortadocodes Apr 17, 2024
40f23e9
TST: Update deployment test
cortadocodes Apr 17, 2024
40e16b0
OPS: Import missing APIs into terraform config
cortadocodes Apr 17, 2024
4a894a4
OPS: Deploy version `0.5.0` of event handler cloud function
cortadocodes Apr 17, 2024
b173ba6
OPS: Update event store schema
cortadocodes Apr 17, 2024
12d9882
FIX: Always return attributes from event store in `get_events`
cortadocodes Apr 17, 2024
4b32c36
FIX: Return schema-compliant events and attributes from `get_events`
cortadocodes Apr 17, 2024
f496b4b
DEP: Make `db-dtypes` and `google-cloud-bigquery` optional
cortadocodes Apr 23, 2024
0a731ac
TST: Update deployment test
cortadocodes Apr 23, 2024
7376f91
REF: Rename `_denormalise_events` to `_unflatten_events`
cortadocodes Apr 23, 2024
76a4867
ENH: Raise error if no events found when calling `get_events`
cortadocodes Apr 23, 2024
6d04c97
TST: Always make `total_rows=0` in `MockEmptyResult`
cortadocodes Apr 23, 2024
8239c19
OPS: Update `actions/setup-python` to version 5
cortadocodes Apr 23, 2024
6d47bb1
OPS: Cache dependencies in some workflows
cortadocodes Apr 23, 2024
63a6ad8
OPS: Install poetry before caching dependencies
cortadocodes Apr 23, 2024
b259028
OPS: Run tests with python3.10
cortadocodes Apr 23, 2024
0c45c2f
REV: Revert "OPS: Install poetry before caching dependencies"
cortadocodes Apr 23, 2024
3b1ffa7
REV: Revert "OPS: Cache dependencies in some workflows"
cortadocodes Apr 23, 2024
c86b441
OPS: Use python3.11 for tests
cortadocodes Apr 23, 2024
abfe68b
DEP: Loosen numpy dependency
cortadocodes Apr 23, 2024
b357580
REV: Revert "OPS: Use python3.11 for tests"
cortadocodes Apr 23, 2024
330ea82
TST: Update test for `python3.10`
cortadocodes Apr 23, 2024
2f4e8b7
OPS: Use `python3.10` for release workflow
cortadocodes Apr 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: "!contains(github.event.head_commit.message, 'skipci')"
runs-on: ${{ matrix.os }}
env:
USING_COVERAGE: "3.9"
USING_COVERAGE: "3.10"
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
Expand All @@ -34,9 +34,9 @@ jobs:
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: 3.9
python-version: "3.10"

- name: Install Poetry
uses: snok/install-poetry@v1.3.2
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
if: "github.event.pull_request.merged == true"
runs-on: ${{ matrix.os }}
env:
USING_COVERAGE: "3.9"
USING_COVERAGE: "3.10"
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
Expand All @@ -26,9 +26,9 @@ jobs:
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: 3.9
python-version: "3.10"

- name: Install Poetry
uses: snok/install-poetry@v1.3.2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/version-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Check version compatibility has been tested
Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[![Documentation Status](https://readthedocs.org/projects/octue-python-sdk/badge/?version=latest)](https://octue-python-sdk.readthedocs.io/en/latest/?badge=latest)
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
[![black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.10961975.svg)](https://doi.org/10.5281/zenodo.10961975)

# Octue Python SDK <img src="./docs/source/images/213_purple-fruit-snake-transparent.gif" alt="Purple Fruit Snake" width="100"/></span>

Expand All @@ -15,7 +16,9 @@ Read the docs [here.](https://octue-python-sdk.readthedocs.io/en/latest/)
Uses our [twined](https://twined.readthedocs.io/en/latest/) library for data validation.

## Installation and usage

To install, run one of:

```shell
pip install octue
```
Expand All @@ -25,6 +28,7 @@ poetry add octue
```

The command line interface (CLI) can then be accessed via:

```shell
octue --help
```
Expand Down Expand Up @@ -59,13 +63,15 @@ Commands:
```

## Deprecated code

When code is deprecated, it will still work but a deprecation warning will be issued with a suggestion on how to update
it. After an adjustment period, deprecations will be removed from the codebase according to the [code removal schedule](https://github.com/octue/octue-sdk-python/issues/415).
This constitutes a breaking change.

## Developer notes

### Installation

We use [Poetry](https://python-poetry.org/) as our package manager. For development, run the following from the
repository root, which will editably install the package:

Expand All @@ -76,18 +82,24 @@ poetry install --all-extras
Then run the tests to check everything's working.

### Testing

These environment variables need to be set to run the tests:
* `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
* `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`

- `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
- `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`

Then, from the repository root, run

```shell
python3 -m unittest
```

or

```shell
tox
```

## Contributing

Take a look at our [contributing](/docs/contributing.md) page.
7 changes: 3 additions & 4 deletions docs/source/asking_questions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Asking a question
backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
)

answer = child.ask(
answer, question_uuid = child.ask(
input_values={"height": 32, "width": 3},
input_manifest=manifest,
)
Expand Down Expand Up @@ -104,7 +104,6 @@ access the event store and run:
**Options**

- ``kind`` - Only retrieve this kind of event if present (e.g. "result")
- ``include_attributes`` - If ``True``, retrieve all the events' attributes as well
- ``include_backend_metadata`` - If ``True``, retrieve information about the service backend that produced the event
- ``limit`` - If set to a positive integer, limit the number of events returned to this

Expand Down Expand Up @@ -232,7 +231,7 @@ this:

.. code-block:: python

answer = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})
answer, question_uuid = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})

if your app configuration file is:

Expand Down Expand Up @@ -323,7 +322,7 @@ then you can override them like this:

.. code-block:: python

answer = child.ask(
answer, question_uuid = child.ask(
input_values={"height": 32, "width": 3},
children=[
{
Expand Down
4 changes: 2 additions & 2 deletions docs/source/manifest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Get an Octue service to analyse data for you as part of a larger analysis.
backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
)

answer = child.ask(input_manifest=manifest)
answer, question_uuid = child.ask(input_manifest=manifest)

See :doc:`here <asking_questions>` for more information.

Expand Down Expand Up @@ -108,7 +108,7 @@ the cloud and then download them again for each service (as would happen with cl
}
)

analysis.children["wind_speed"].ask(
answer, question_uuid = analysis.children["wind_speed"].ask(
input_values=analysis.input_values,
input_manifest=analysis.input_manifest,
allow_local_files=True,
Expand Down
8 changes: 4 additions & 4 deletions docs/source/testing_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Instantiating a child emulator in python
def handle_monitor_message(message):
...

result = child_emulator.ask(
result, question_uuid = child_emulator.ask(
input_values={"hello": "world"},
handle_monitor_message=handle_monitor_message,
)
Expand Down Expand Up @@ -133,7 +133,7 @@ You can then instantiate a child emulator from this in python:
def handle_monitor_message(message):
...

result = child_emulator.ask(
result, question_uuid = child_emulator.ask(
input_values={"hello": "world"},
handle_monitor_message=handle_monitor_message,
)
Expand Down Expand Up @@ -226,7 +226,7 @@ child.
backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
)

result = child.ask(input_values=[1, 2, 3, 4])
result, question_uuid = child.ask(input_values=[1, 2, 3, 4])

child.received_events
>>> [
Expand Down Expand Up @@ -260,6 +260,6 @@ You can then feed these into a child emulator to emulate one possible response o
child_emulator = ChildEmulator(events=child.received_events)

child_emulator.ask(input_values=[1, 2, 3, 4])
>>> {"some": "results"}
>>> {"some": "results"}, "9cab579f-c486-4324-ac9b-96491d26266b"

You can also create test fixtures from :ref:`downloaded service crash diagnostics <test_fixtures_from_crash_diagnostics>`.
2 changes: 1 addition & 1 deletion docs/source/troubleshooting_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ For example:
backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
)

answer = child.ask(
answer, question_uuid = child.ask(
input_values={"height": 32, "width": 3},
save_diagnostics="SAVE_DIAGNOSTICS_OFF",
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.11-slim
FROM windpioneers/gdal-python:modest-heron-gdal-2.4.1-python-3.11-slim

# Ensure print statements and log messages appear promptly in Cloud Logging.
ENV PYTHONUNBUFFERED True
Expand Down
8 changes: 5 additions & 3 deletions octue/cloud/emulators/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def ask(
:param bool asynchronous: if `True`, don't create an answer subscription
:param float timeout: time in seconds to wait for an answer before raising a timeout error
:raise TimeoutError: if the timeout is exceeded while waiting for an answer
:return dict: a dictionary containing the keys "output_values" and "output_manifest"
:return dict, str: a dictionary containing the keys "output_values" and "output_manifest", and the question UUID
"""
with ServicePatcher():
self._child.serve(allow_existing=True)

subscription, _ = self._parent.ask(
subscription, question_uuid = self._parent.ask(
service_id=self._child.id,
input_values=input_values,
input_manifest=input_manifest,
Expand All @@ -141,13 +141,15 @@ def ask(
asynchronous=asynchronous,
)

return self._parent.wait_for_answer(
answer = self._parent.wait_for_answer(
subscription,
handle_monitor_message=handle_monitor_message,
record_events=record_events,
timeout=timeout,
)

return answer, question_uuid

def _emulate_analysis(
self,
analysis_id,
Expand Down
107 changes: 74 additions & 33 deletions octue/cloud/pub_sub/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,46 @@
from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter

from octue.cloud.events.validation import VALID_EVENT_KINDS
from octue.exceptions import ServiceNotFound
from octue.resources import Manifest


def get_events(
table_id,
sender,
question_uuid,
kind=None,
include_attributes=False,
include_backend_metadata=False,
limit=1000,
):
def get_events(table_id, sender, question_uuid, kind=None, include_backend_metadata=False, limit=1000):
"""Get Octue service events for a question from a sender from a Google BigQuery event store.

:param str table_id: the full ID of the table e.g. "your-project.your-dataset.your-table"
:param str sender: the SRUID of the sender of the events
:param str question_uuid: the UUID of the question to get the events for
:param str|None kind: the kind of event to get; if `None`, all event kinds are returned
:param bool include_attributes: if `True`, include events' attributes (excluding question UUID)
:param bool include_backend_metadata: if `True`, include the service backend metadata
:param int limit: the maximum number of events to return
:raise ValueError: if the `kind` parameter is invalid
:raise octue.exceptions.ServiceNotFound: if the sender hasn't emitted any events related to the question UUID (or any events at all)
:return list(dict): the events for the question
"""
if kind:
if kind not in VALID_EVENT_KINDS:
raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

event_kind_condition = [f'AND JSON_EXTRACT_SCALAR(event, "$.kind") = "{kind}"']
event_kind_condition = [f"AND kind={kind!r}"]
else:
event_kind_condition = []

client = Client()
fields = ["`event`"]

if include_attributes:
fields.extend(
(
"`datetime`",
"`uuid`",
"`originator`",
"`sender`",
"`sender_type`",
"`sender_sdk_version`",
"`recipient`",
"`order`",
"`other_attributes`",
)
)

fields = [
"`event`",
"`kind`",
"`datetime`",
"`uuid`",
"`originator`",
"`sender`",
"`sender_type`",
"`sender_sdk_version`",
"`recipient`",
"`order`",
"`other_attributes`",
]

if include_backend_metadata:
fields.extend(("`backend`", "`backend_metadata`"))
Expand All @@ -74,16 +67,64 @@ def get_events(
)

query_job = client.query(query, job_config=job_config)
rows = query_job.result()
df = rows.to_dataframe()
result = query_job.result()

if result.total_rows == 0:
raise ServiceNotFound(
f"No events found. The requested sender {sender!r} may not exist or it hasn't emitted any events for "
f"question {question_uuid!r} (or any events at all)."
)

df = result.to_dataframe()

# Convert JSON strings to python primitives.
df["event"] = df["event"].map(json.loads)

if "other_attributes" in df:
df["other_attributes"] = df["other_attributes"].map(json.loads)
df["event"].apply(_deserialise_manifest_if_present)
df["other_attributes"] = df["other_attributes"].map(json.loads)

if "backend_metadata" in df:
df["backend_metadata"] = df["backend_metadata"].map(json.loads)

return df.to_dict(orient="records")
events = df.to_dict(orient="records")
return _unflatten_events(events)


def _deserialise_manifest_if_present(event):
"""If the event is a "question" or "result" event and a manifest is present, deserialise the manifest and replace
the serialised manifest with it.

:param dict event: an Octue service event
:return None:
"""
manifest_keys = {"input_manifest", "output_manifest"}

for key in manifest_keys:
if key in event:
event[key] = Manifest.deserialise(event[key])
# Only one of the manifest types will be in the event, so return if one is found.
return


def _unflatten_events(events):
"""Convert the events and attributes from the flat structure of the BigQuery table into the nested structure of the
service communication schema.

:param list(dict) events: flattened events
:return list(dict): unflattened events
"""
for event in events:
event["event"]["kind"] = event.pop("kind")

event["attributes"] = {
"datetime": event.pop("datetime").isoformat(),
"uuid": event.pop("uuid"),
"originator": event.pop("originator"),
"sender": event.pop("sender"),
"sender_type": event.pop("sender_type"),
"sender_sdk_version": event.pop("sender_sdk_version"),
"recipient": event.pop("recipient"),
"order": event.pop("order"),
**event.pop("other_attributes"),
}

return events
Loading
Loading