Skip to content

Commit

Permalink
135 stream to murko (#150)
Browse files Browse the repository at this point in the history
* Initial murko callback and centring plan

* Add callback to stream info to murko

* Use ophyd_async OAV device to send image data

* Clean up based on beamline testing

* Added documentation

* Add type dependencies and fix docs

---------

Co-authored-by: David Perl <115003895+dperl-dls@users.noreply.github.com>
  • Loading branch information
DominicOram and dperl-dls authored Aug 15, 2024
1 parent f210ae4 commit 315d2e0
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ repos:
hooks:
- id: mypy
files: 'src/.*\.py$'
additional_dependencies: [types-requests]
additional_dependencies: [types-requests, types-redis]
args: ["--ignore-missing-imports", "--no-strict-optional"]
11 changes: 11 additions & 0 deletions docs/developer/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ Documentation is split general and topic-specific sections. Each section is spli

Documentation relating to serial crystallography on I24

.. grid-item-card:: :material-regular:`apps;3em`

.. toctree::
:caption: Murko Integration
:maxdepth: 1

murko-integration/index

+++

Documentation relating to integrating the murko image recognition system
18 changes: 18 additions & 0 deletions docs/developer/murko-integration/explanations/architecture.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Murko Architecture
------------------

The architecture of how Murko is integrated is still in flux but as of 07/08 the following has been tried on the beamline.

.. image:: ../images/murko_setup.drawio.png

The mx_bluesky code is deployed into the `beamline kubernetes cluster <https://k8s-i04.diamond.ac.uk/>`_ behind a `blueAPI <https://github.com/DiamondLightSource/blueapi>`_ REST interface. Alongside this an instance of murko is also deployed to the k8s cluster.

When GDA reaches a point where it will thaw the pin (usually just after robot load), if the ``gda.mx.bluesky.thaw`` property has been set to True it will instead call out to the thaw_and_center plan inside mx_bluesky.

The thawing plan will first create a ``MurkoCallback`` callback, this will stream metadata into a key in a redis database running on i04-control (this should be moved in `#145 <https://github.com/DiamondLightSource/mx_bluesky/issues/145>`_).

It will then trigger the ``OAVToRedisForwarder`` device in ``dodal`` that will stream image data to redis in the form of pickled RGB numpy arrays. Each image is given a uuid by this device, which is used to correlate the images with the other metadata, which could be streamed with a different frequency.

The image streaming must be done with an ophyd device as there is too much data for it all to be emitted in bluesky documents.

When the data is entered into redis it will publish a message to the redis ``murko`` channel. This will get picked up by the `socket_handler <https://github.com/DiamondLightSource/mx_auto_mjpeg_capture/tree/main/socket_handler>`_, which will forward the data to murko. Currently, during testing the socket_handler is just manually run on a workstation, `#146 <https://github.com/DiamondLightSource/mx_bluesky/issues/146>`_ should fix this.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 22 additions & 0 deletions docs/developer/murko-integration/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Murko Integration
=================

The `Murko system <https://github.com/MartinSavko/murko>`_ uses ML to analyse images of sample from optical cameras and determine where the sample is.

Its integration at DLS, using Bluesky, is still a work in progress but this documentation aims to give an overview of how it works.


.. grid:: 2
:gutter: 2

.. grid-item-card:: :material-regular:`apartment;3em`

.. toctree::
:caption: Explanations
:maxdepth: 1

explanations/architecture

+++

Explanations of how and why the architecture is why it is.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ dependencies = [
"requests",
"opencv-python",
"pydantic",
"dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@7e01eeaf18046eadf4d513c79b06e0a11d616776",
"fastapi[all]<0.113",
"dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git",
"fastapi[all]",
"blueapi @ git+https://github.com/DiamondLightSource/blueapi.git@main",
]
dynamic = ["version"]
Expand Down
4 changes: 2 additions & 2 deletions src/mx_bluesky/i04/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from mx_bluesky.i04.thawing_plan import thaw
from mx_bluesky.i04.thawing_plan import thaw, thaw_and_center

__all__ = ["thaw"]
__all__ = ["thaw", "thaw_and_center"]
46 changes: 46 additions & 0 deletions src/mx_bluesky/i04/callbacks/murko_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import copy
import json
from typing import Optional

from bluesky.callbacks import CallbackBase
from dodal.log import LOGGER
from event_model.documents import Event, RunStart
from redis import StrictRedis


class MurkoCallback(CallbackBase):
def __init__(self, redis_host: str, redis_password: str, redis_db: int = 0):
self.redis_client = StrictRedis(
host=redis_host, password=redis_password, db=redis_db
)
self.last_uuid = None

def start(self, doc: RunStart) -> Optional[RunStart]:
self.murko_metadata = {
"zoom_percentage": doc.get("zoom_percentage"),
"microns_per_x_pixel": doc.get("microns_per_x_pixel"),
"microns_per_y_pixel": doc.get("microns_per_y_pixel"),
"beam_centre_i": doc.get("beam_centre_i"),
"beam_centre_j": doc.get("beam_centre_j"),
"sample_id": doc.get("sample_id"),
}
self.last_uuid = None
return doc

def event(self, doc: Event) -> Event:
if latest_omega := doc["data"].get("smargon-omega"):
if self.last_uuid is not None:
self.call_murko(self.last_uuid, latest_omega)
elif (uuid := doc["data"].get("oav_to_redis_forwarder-uuid")) is not None:
self.last_uuid = uuid
return doc

def call_murko(self, uuid: str, omega: float):
metadata = copy.deepcopy(self.murko_metadata)
metadata["omega_angle"] = omega
metadata["uuid"] = uuid

# Send metadata to REDIS and trigger murko
self.redis_client.hset("test-metadata", uuid, json.dumps(metadata))
self.redis_client.publish("murko", json.dumps(metadata))
LOGGER.info("Metadata sent to redis")
46 changes: 44 additions & 2 deletions src/mx_bluesky/i04/thawing_plan.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,58 @@
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.preprocessors import run_decorator, subs_decorator
from dls_bluesky_core.core import MsgGenerator
from dodal.beamlines.i04 import MURKO_REDIS_DB, REDIS_HOST, REDIS_PASSWORD
from dodal.common import inject
from dodal.devices.oav.oav_detector import OAV
from dodal.devices.oav.oav_to_redis_forwarder import OAVToRedisForwarder
from dodal.devices.robot import BartRobot
from dodal.devices.smargon import Smargon
from dodal.devices.thawer import Thawer, ThawerStates

from mx_bluesky.i04.callbacks.murko_callback import MurkoCallback


def thaw_and_center(
time_to_thaw: float,
rotation: float = 360,
robot: BartRobot = inject("robot"),
thawer: Thawer = inject("thawer"),
smargon: Smargon = inject("smargon"),
oav: OAV = inject("oav"),
oav_to_redis_forwarder: OAVToRedisForwarder = inject("oav_to_redis_forwarder"),
) -> MsgGenerator:
zoom_percentage = yield from bps.rd(oav.zoom_controller.percentage)
sample_id = yield from bps.rd(robot.sample_id)

yield from bps.abs_set(oav.zoom_controller.level, "1.0x", wait=True)

@subs_decorator(MurkoCallback(REDIS_HOST, REDIS_PASSWORD, MURKO_REDIS_DB))
@run_decorator(
md={
"microns_per_x_pixel": oav.parameters.micronsPerXPixel,
"microns_per_y_pixel": oav.parameters.micronsPerYPixel,
"beam_centre_i": oav.parameters.beam_centre_i,
"beam_centre_j": oav.parameters.beam_centre_j,
"zoom_percentage": zoom_percentage,
"sample_id": sample_id,
}
)
def _thaw_and_center():
yield from bps.kickoff(oav_to_redis_forwarder, wait=True)
yield from bps.monitor(smargon.omega.user_readback, name="smargon")
yield from bps.monitor(oav_to_redis_forwarder.uuid, name="oav")
yield from thaw(time_to_thaw, rotation, thawer, smargon)
yield from bps.complete(oav_to_redis_forwarder)

yield from _thaw_and_center()


def thaw(
time_to_thaw: float,
rotation: float = 360,
thawer: Thawer = inject("thawer"), # type: ignore
smargon: Smargon = inject("smargon"), # type: ignore
thawer: Thawer = inject("thawer"),
smargon: Smargon = inject("smargon"),
) -> MsgGenerator:
"""Rotates the sample and thaws it at the same time.
Expand Down
107 changes: 107 additions & 0 deletions tests/i04/callbacks/test_murko_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import json
from unittest.mock import MagicMock

import pytest
from event_model import Event

from mx_bluesky.i04.callbacks.murko_callback import MurkoCallback

test_oav_uuid = "UUID"
test_smargon_data = 90


def event_template(data_key, data_value) -> Event:
return {
"descriptor": "bd45c2e5-2b85-4280-95d7-a9a15800a78b",
"time": 1666604299.828203,
"data": {data_key: data_value},
"timestamps": {data_key: 1666604299.8220396},
"seq_num": 1,
"uid": "29033ecf-e052-43dd-98af-c7cdd62e8173",
"filled": {},
}


test_oav_event = event_template("oav_to_redis_forwarder-uuid", test_oav_uuid)
test_smargon_event = event_template("smargon-omega", test_smargon_data)

test_start_document = {
"uid": "event_uuid",
"zoom_percentage": 80,
"microns_per_x_pixel": 1.2,
"microns_per_y_pixel": 2.5,
"beam_centre_i": 158,
"beam_centre_j": 452,
"sample_id": 12345,
}


@pytest.fixture
def murko_callback() -> MurkoCallback:
callback = MurkoCallback("", "")
callback.redis_client = MagicMock()
return callback


@pytest.fixture
def murko_with_mock_call(murko_callback) -> MurkoCallback:
murko_callback.call_murko = MagicMock()
return murko_callback


def test_when_oav_data_arrives_then_murko_not_called(
murko_with_mock_call: MurkoCallback,
):
murko_with_mock_call.event(test_oav_event)
murko_with_mock_call.call_murko.assert_not_called() # type: ignore


def test_when_smargon_data_arrives_then_murko_not_called(
murko_with_mock_call: MurkoCallback,
):
murko_with_mock_call.event(test_smargon_event)
murko_with_mock_call.call_murko.assert_not_called() # type: ignore


def test_when_smargon_data_arrives_before_oav_data_then_murko_not_called(
murko_with_mock_call: MurkoCallback,
):
murko_with_mock_call.event(test_smargon_event)
murko_with_mock_call.event(test_oav_event)
murko_with_mock_call.call_murko.assert_not_called() # type: ignore


def test_when_smargon_data_arrives_before_oav_data_then_murko_called_with_smargon_data(
murko_with_mock_call: MurkoCallback,
):
murko_with_mock_call.event(test_oav_event)
murko_with_mock_call.event(test_smargon_event)
murko_with_mock_call.call_murko.assert_called_once_with( # type: ignore
test_oav_uuid, test_smargon_data
)


def test_when_murko_called_with_event_data_then_meta_data_put_in_redis(
murko_callback: MurkoCallback,
):
murko_callback.start(test_start_document) # type: ignore
murko_callback.event(test_oav_event)
murko_callback.event(test_smargon_event)

expected_metadata = {
"zoom_percentage": 80,
"microns_per_x_pixel": 1.2,
"microns_per_y_pixel": 2.5,
"beam_centre_i": 158,
"beam_centre_j": 452,
"sample_id": 12345,
"omega_angle": test_smargon_data,
"uuid": test_oav_uuid,
}

murko_callback.redis_client.hset.assert_called_once_with( # type: ignore
"test-metadata", test_oav_uuid, json.dumps(expected_metadata)
)
murko_callback.redis_client.publish.assert_called_once_with( # type: ignore
"murko", json.dumps(expected_metadata)
)
Loading

0 comments on commit 315d2e0

Please sign in to comment.