diff --git a/src/dodal/beamlines/i03.py b/src/dodal/beamlines/i03.py index 5921d7b23a..958bf42a67 100644 --- a/src/dodal/beamlines/i03.py +++ b/src/dodal/beamlines/i03.py @@ -25,7 +25,8 @@ from dodal.devices.flux import Flux from dodal.devices.focusing_mirror import FocusingMirrorWithStripes, MirrorVoltages from dodal.devices.motors import XYZPositioner -from dodal.devices.oav.oav_detector import OAV, OAVConfigParams +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_parameters import OAVConfig from dodal.devices.oav.pin_image_recognition import PinTipDetection from dodal.devices.qbpm import QBPM from dodal.devices.robot import BartRobot @@ -227,7 +228,7 @@ def panda_fast_grid_scan( def oav( wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False, - params: OAVConfigParams | None = None, + params: OAVConfig | None = None, ) -> OAV: """Get the i03 OAV device, instantiate it if it hasn't already been. If this is called when already instantiated in i03, it will return the existing object. @@ -235,10 +236,10 @@ def oav( return device_instantiation( OAV, "oav", - "", + "-DI-OAV-01:", wait_for_connection, fake_with_ophyd_sim, - params=params or OAVConfigParams(ZOOM_PARAMS_FILE, DISPLAY_CONFIG), + config=params or OAVConfig(ZOOM_PARAMS_FILE, DISPLAY_CONFIG), ) diff --git a/src/dodal/beamlines/i04.py b/src/dodal/beamlines/i04.py index 97f53e16c7..a2be702c0a 100644 --- a/src/dodal/beamlines/i04.py +++ b/src/dodal/beamlines/i04.py @@ -18,7 +18,8 @@ from dodal.devices.i04.transfocator import Transfocator from dodal.devices.ipin import IPin from dodal.devices.motors import XYZPositioner -from dodal.devices.oav.oav_detector import OAV, OAVConfigParams +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_parameters import OAVConfig from dodal.devices.oav.oav_to_redis_forwarder import OAVToRedisForwarder from dodal.devices.robot import BartRobot from dodal.devices.s4_slit_gaps import S4SlitGaps @@ -344,17 +345,21 @@ def zebra(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) - @skip_device(lambda: BL == "s04") -def oav(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> OAV: +def oav( + wait_for_connection: bool = True, + fake_with_ophyd_sim: bool = False, + params: OAVConfig | None = None, +) -> OAV: """Get the i04 OAV device, instantiate it if it hasn't already been. If this is called when already instantiated in i04, it will return the existing object. """ return device_instantiation( OAV, "oav", - "", + "-DI-OAV-01:", wait_for_connection, fake_with_ophyd_sim, - params=OAVConfigParams(ZOOM_PARAMS_FILE, DISPLAY_CONFIG), + config=params or OAVConfig(ZOOM_PARAMS_FILE, DISPLAY_CONFIG), ) diff --git a/src/dodal/beamlines/i24.py b/src/dodal/beamlines/i24.py index 7cba0a0e64..353eab6063 100644 --- a/src/dodal/beamlines/i24.py +++ b/src/dodal/beamlines/i24.py @@ -13,7 +13,7 @@ from dodal.devices.i24.i24_detector_motion import DetectorMotion from dodal.devices.i24.i24_vgonio import VGonio from dodal.devices.i24.pmac import PMAC -from dodal.devices.oav.oav_async import OAV +from dodal.devices.oav.oav_detector import OAV from dodal.devices.oav.oav_parameters import OAVConfig from dodal.devices.zebra import Zebra from dodal.log import set_beamline as set_log_beamline diff --git a/src/dodal/devices/areadetector/plugins/CAM.py b/src/dodal/devices/areadetector/plugins/CAM.py new file mode 100644 index 0000000000..88fd418da7 --- /dev/null +++ b/src/dodal/devices/areadetector/plugins/CAM.py @@ -0,0 +1,31 @@ +from enum import Enum + +from ophyd_async.core import StandardReadable +from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw + + +class ColorMode(str, Enum): + """ + Enum to store the various color modes of the camera. We use RGB1. + """ + + MONO = "Mono" + BAYER = "Bayer" + RGB1 = "RGB1" + RGB2 = "RGB2" + RGB3 = "RGB3" + YUV444 = "YUV444" + YUV422 = "YUV422" + YUV421 = "YUV421" + + +class Cam(StandardReadable): + def __init__(self, prefix: str, name: str = "") -> None: + self.color_mode = epics_signal_rw(ColorMode, prefix + "ColorMode") + self.acquire_period = epics_signal_rw(float, prefix + "AcquirePeriod") + self.acquire_time = epics_signal_rw(float, prefix + "AcquireTime") + self.gain = epics_signal_rw(float, prefix + "Gain") + + self.array_size_x = epics_signal_r(int, prefix + "ArraySizeX_RBV") + self.array_size_y = epics_signal_r(int, prefix + "ArraySizeY_RBV") + super().__init__(name) diff --git a/src/dodal/devices/areadetector/plugins/MJPG.py b/src/dodal/devices/areadetector/plugins/MJPG.py index b82d5dd10f..202d3caf2c 100644 --- a/src/dodal/devices/areadetector/plugins/MJPG.py +++ b/src/dodal/devices/areadetector/plugins/MJPG.py @@ -1,138 +1,83 @@ -import os -import threading from abc import ABC, abstractmethod from io import BytesIO from pathlib import Path -import requests -from ophyd import Component, Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Signal -from PIL import Image, ImageDraw +import aiofiles +from aiohttp import ClientSession +from bluesky.protocols import Triggerable +from ophyd_async.core import AsyncStatus, StandardReadable, soft_signal_rw +from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw +from PIL import Image -from dodal.devices.oav.oav_parameters import OAVConfigParams from dodal.log import LOGGER +IMG_FORMAT = "png" -class MJPG(Device, ABC): + +async def asyncio_save_image(image: Image.Image, path: str): + buffer = BytesIO() + image.save(buffer, format=IMG_FORMAT) + async with aiofiles.open(path, "wb") as fh: + await fh.write(buffer.getbuffer()) + + +class MJPG(StandardReadable, Triggerable, ABC): """The MJPG areadetector plugin creates an MJPG video stream of the camera's output. This devices uses that stream to grab images. When it is triggered it will send the latest image from the stream to the `post_processing` method for child classes to handle. """ - filename = Component(Signal) - directory = Component(Signal) - last_saved_path = Component(Signal) - url = Component(EpicsSignal, "JPG_URL_RBV", string=True) - x_size = Component(EpicsSignalRO, "ArraySize1_RBV") - y_size = Component(EpicsSignalRO, "ArraySize2_RBV") - input_rbpv = Component(EpicsSignalRO, "NDArrayPort_RBV") - input_plugin = Component(EpicsSignal, "NDArrayPort") + def __init__(self, prefix: str, name: str = "") -> None: + self.url = epics_signal_rw(str, prefix + "JPG_URL_RBV") - # scaling factors for the snapshot at the time it was triggered - microns_per_pixel_x = Component(Signal) - microns_per_pixel_y = Component(Signal) + self.x_size = epics_signal_r(int, prefix + "ArraySize1_RBV") + self.y_size = epics_signal_r(int, prefix + "ArraySize2_RBV") - oav_params: OAVConfigParams | None = None + with self.add_children_as_readables(): + self.filename = soft_signal_rw(str) + self.directory = soft_signal_rw(str) + self.last_saved_path = soft_signal_rw(str) - KICKOFF_TIMEOUT: float = 30.0 + self.KICKOFF_TIMEOUT = 30.0 - def _save_image(self, image: Image.Image): - """A helper function to save a given image to the path supplied by the directory - and filename signals. The full resultant path is put on the last_saved_path signal + super().__init__(name) + + async def _save_image(self, image: Image.Image): + """A helper function to save a given image to the path supplied by the \ + directory and filename signals. The full resultant path is put on the \ + last_saved_path signal """ - filename_str = self.filename.get() - directory_str: str = self.directory.get() # type: ignore + filename_str = await self.filename.get_value() + directory_str = await self.directory.get_value() - path = Path(f"{directory_str}/{filename_str}.png").as_posix() - if not os.path.isdir(Path(directory_str)): + path = Path(f"{directory_str}/{filename_str}.{IMG_FORMAT}").as_posix() + if not Path(directory_str).is_dir(): LOGGER.info(f"Snapshot folder {directory_str} does not exist, creating...") - os.mkdir(directory_str) + Path(directory_str).mkdir(parents=True) LOGGER.info(f"Saving image to {path}") - image.save(path) - self.last_saved_path.put(path) - def trigger(self): + await asyncio_save_image(image, path) + + await self.last_saved_path.set(path, wait=True) + + @AsyncStatus.wrap + async def trigger(self): """This takes a snapshot image from the MJPG stream and send it to the post_processing method, expected to be implemented by a child of this class. - It is the responsibility of the child class to save any resulting images. + It is the responsibility of the child class to save any resulting images by \ + calling _save_image. """ - st = DeviceStatus(device=self, timeout=self.KICKOFF_TIMEOUT) - url_str = self.url.get() + url_str = await self.url.get_value() - assert isinstance( - self.oav_params, OAVConfigParams - ), "MJPG does not have valid OAV parameters" - self.microns_per_pixel_x.set(self.oav_params.micronsPerXPixel) - self.microns_per_pixel_y.set(self.oav_params.micronsPerYPixel) - - def get_snapshot(): - try: - response = requests.get(url_str, stream=True) - response.raise_for_status() - with Image.open(BytesIO(response.content)) as image: - self.post_processing(image) - st.set_finished() - except requests.HTTPError as e: - st.set_exception(e) - - threading.Thread(target=get_snapshot, daemon=True).start() - - return st + async with ClientSession(raise_for_status=True) as session: + async with session.get(url_str) as response: + data = await response.read() + with Image.open(BytesIO(data)) as image: + await self.post_processing(image) @abstractmethod - def post_processing(self, image: Image.Image): + async def post_processing(self, image: Image.Image): pass - - -class SnapshotWithBeamCentre(MJPG): - """A child of MJPG which, when triggered, draws an outlined crosshair at the beam - centre in the image and saves the image to disk.""" - - CROSSHAIR_LENGTH_PX = 20 - CROSSHAIR_OUTLINE_COLOUR = "Black" - CROSSHAIR_FILL_COLOUR = "White" - - def post_processing(self, image: Image.Image): - assert ( - self.oav_params is not None - ), "Snapshot device does not have valid OAV parameters" - beam_x = self.oav_params.beam_centre_i - beam_y = self.oav_params.beam_centre_j - - SnapshotWithBeamCentre.draw_crosshair(image, beam_x, beam_y) - - self._save_image(image) - - @classmethod - def draw_crosshair(cls, image: Image.Image, beam_x: int, beam_y: int): - draw = ImageDraw.Draw(image) - OUTLINE_WIDTH = 1 - HALF_LEN = cls.CROSSHAIR_LENGTH_PX / 2 - draw.rectangle( - [ - beam_x - OUTLINE_WIDTH, - beam_y - HALF_LEN - OUTLINE_WIDTH, - beam_x + OUTLINE_WIDTH, - beam_y + HALF_LEN + OUTLINE_WIDTH, - ], - fill=cls.CROSSHAIR_OUTLINE_COLOUR, - ) - draw.rectangle( - [ - beam_x - HALF_LEN - OUTLINE_WIDTH, - beam_y - OUTLINE_WIDTH, - beam_x + HALF_LEN + OUTLINE_WIDTH, - beam_y + OUTLINE_WIDTH, - ], - fill=cls.CROSSHAIR_OUTLINE_COLOUR, - ) - draw.line( - ((beam_x, beam_y - HALF_LEN), (beam_x, beam_y + HALF_LEN)), - fill=cls.CROSSHAIR_FILL_COLOUR, - ) - draw.line( - ((beam_x - HALF_LEN, beam_y), (beam_x + HALF_LEN, beam_y)), - fill=cls.CROSSHAIR_FILL_COLOUR, - ) diff --git a/src/dodal/devices/oav/oav_async.py b/src/dodal/devices/oav/oav_async.py deleted file mode 100644 index bc7f12962f..0000000000 --- a/src/dodal/devices/oav/oav_async.py +++ /dev/null @@ -1,118 +0,0 @@ -from enum import IntEnum - -from ophyd_async.core import ( - AsyncStatus, - StandardReadable, -) -from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw - -from dodal.common.signal_utils import create_hardware_backed_soft_signal -from dodal.devices.oav.oav_parameters import DEFAULT_OAV_WINDOW, OAVConfig - - -class Coords(IntEnum): - X = 0 - Y = 1 - - -# Workaround to deal with the fact that beamlines may have slightly different string -# descriptions of the zoom level" -def _get_correct_zoom_string(zoom: str) -> str: - if zoom.endswith("x"): - zoom = zoom.strip("x") - return zoom - - -class ZoomController(StandardReadable): - """ - Device to control the zoom level. This should be set like - o = OAV(name="oav") - oav.zoom_controller.set("1.0x") - - Note that changing the zoom may change the AD wiring on the associated OAV, as such - you should wait on any zoom changs to finish before changing the OAV wiring. - """ - - def __init__(self, prefix: str, name: str = "") -> None: - super().__init__(name=name) - self.percentage = epics_signal_rw(float, f"{prefix}ZOOMPOSCMD") - - # Level is the string description of the zoom level e.g. "1.0x" or "1.0" - self.level = epics_signal_rw(str, f"{prefix}MP:SELECT") - - @AsyncStatus.wrap - async def set(self, level_to_set: str): - await self.level.set(level_to_set, wait=True) - - -class OAV(StandardReadable): - def __init__(self, prefix: str, config: OAVConfig, name: str = ""): - _bl_prefix = prefix.split("-")[0] - self.zoom_controller = ZoomController(f"{_bl_prefix}-EA-OAV-01:FZOOM:", name) - - # TODO See https://github.com/DiamondLightSource/dodal/issues/824 - self.x_size = epics_signal_r(int, prefix + "CAM:ArraySizeX_RBV") - self.y_size = epics_signal_r(int, prefix + "CAM:ArraySizeY_RBV") - - self.sizes = [self.x_size, self.y_size] - - self.parameters = config.get_parameters() - - self.microns_per_pixel_x = create_hardware_backed_soft_signal( - float, - lambda: self._get_microns_per_pixel(Coords.X), - ) - self.microns_per_pixel_y = create_hardware_backed_soft_signal( - float, - lambda: self._get_microns_per_pixel(Coords.Y), - ) - - self.beam_centre_i = create_hardware_backed_soft_signal( - int, lambda: self._get_beam_position(Coords.X) - ) - - self.beam_centre_j = create_hardware_backed_soft_signal( - int, lambda: self._get_beam_position(Coords.Y) - ) - - super().__init__(name) - - async def _read_current_zoom(self) -> str: - _zoom = await self.zoom_controller.level.get_value() - return _get_correct_zoom_string(_zoom) - - async def _get_microns_per_pixel(self, coord: int) -> float: - """Extracts the microns per x pixel and y pixel for a given zoom level.""" - _zoom = await self._read_current_zoom() - value = self.parameters[_zoom].microns_per_pixel[coord] - size = await self.sizes[coord].get_value() - return value * DEFAULT_OAV_WINDOW[coord] / size - - async def _get_beam_position(self, coord: int) -> int: - """Extracts the beam location in pixels `xCentre` `yCentre`, for a requested \ - zoom level. """ - _zoom = await self._read_current_zoom() - value = self.parameters[_zoom].crosshair[coord] - size = await self.sizes[coord].get_value() - return int(value * size / DEFAULT_OAV_WINDOW[coord]) - - async def calculate_beam_distance( - self, horizontal_pixels: int, vertical_pixels: int - ) -> tuple[int, int]: - """ - Calculates the distance between the beam centre and the given (horizontal, vertical). - - Args: - horizontal_pixels (int): The x (camera coordinates) value in pixels. - vertical_pixels (int): The y (camera coordinates) value in pixels. - Returns: - The distance between the beam centre and the (horizontal, vertical) point in pixels as a tuple - (horizontal_distance, vertical_distance). - """ - beam_x = await self.beam_centre_i.get_value() - beam_y = await self.beam_centre_j.get_value() - - return ( - beam_x - horizontal_pixels, - beam_y - vertical_pixels, - ) diff --git a/src/dodal/devices/oav/oav_calculations.py b/src/dodal/devices/oav/oav_calculations.py index 6796e3a411..122a2cd68c 100644 --- a/src/dodal/devices/oav/oav_calculations.py +++ b/src/dodal/devices/oav/oav_calculations.py @@ -37,3 +37,25 @@ def camera_coordinates_to_xyz( z = vertical * sine return np.array([x, y, z], dtype=np.float64) + + +def calculate_beam_distance( + beam_centre: tuple[int, int], + horizontal_pixels: int, + vertical_pixels: int, +) -> tuple[int, int]: + """ + Calculates the distance between the beam centre and the given (horizontal, vertical). + + Args: + horizontal_pixels (int): The x (camera coordinates) value in pixels. + vertical_pixels (int): The y (camera coordinates) value in pixels. + Returns: + The distance between the beam centre and the (horizontal, vertical) point in pixels as a tuple + (horizontal_distance, vertical_distance). + """ + beam_x, beam_y = beam_centre + return ( + beam_x - horizontal_pixels, + beam_y - vertical_pixels, + ) diff --git a/src/dodal/devices/oav/oav_detector.py b/src/dodal/devices/oav/oav_detector.py index e6c37610aa..76443e2801 100644 --- a/src/dodal/devices/oav/oav_detector.py +++ b/src/dodal/devices/oav/oav_detector.py @@ -1,26 +1,35 @@ -# type: ignore # OAV will soon be ophyd-async, see https://github.com/DiamondLightSource/dodal/issues/716 -from functools import partial - -from ophyd import ADComponent as ADC -from ophyd import ( - AreaDetector, - CamBase, - Component, - Device, - EpicsSignal, - HDF5Plugin, - OverlayPlugin, - ProcessPlugin, - ROIPlugin, - StatusBase, -) - -from dodal.devices.areadetector.plugins.MJPG import SnapshotWithBeamCentre -from dodal.devices.oav.grid_overlay import SnapshotWithGrid -from dodal.devices.oav.oav_parameters import OAVConfigParams - - -class ZoomController(Device): +from enum import IntEnum + +from ophyd_async.core import DEFAULT_TIMEOUT, AsyncStatus, StandardReadable +from ophyd_async.epics.signal import epics_signal_rw + +from dodal.common.signal_utils import create_hardware_backed_soft_signal +from dodal.devices.areadetector.plugins.CAM import Cam +from dodal.devices.oav.oav_parameters import DEFAULT_OAV_WINDOW, OAVConfig +from dodal.devices.oav.snapshots.snapshot_with_beam_centre import SnapshotWithBeamCentre +from dodal.devices.oav.snapshots.snapshot_with_grid import SnapshotWithGrid +from dodal.log import LOGGER + + +class ZoomLevelNotFoundError(Exception): + def __init__(self, errmsg): + LOGGER.error(errmsg) + + +class Coords(IntEnum): + X = 0 + Y = 1 + + +# Workaround to deal with the fact that beamlines may have slightly different string +# descriptions of the zoom level" +def _get_correct_zoom_string(zoom: str) -> str: + if zoom.endswith("x"): + zoom = zoom.strip("x") + return zoom + + +class ZoomController(StandardReadable): """ Device to control the zoom level. This should be set like o = OAV(name="oav") @@ -30,63 +39,89 @@ class ZoomController(Device): you should wait on any zoom changs to finish before changing the OAV wiring. """ - percentage = Component(EpicsSignal, "ZOOMPOSCMD") - - # Level is the string description of the zoom level e.g. "1.0x" - level = Component(EpicsSignal, "MP:SELECT", string=True) - - zrst = Component(EpicsSignal, "MP:SELECT.ZRST") - onst = Component(EpicsSignal, "MP:SELECT.ONST") - twst = Component(EpicsSignal, "MP:SELECT.TWST") - thst = Component(EpicsSignal, "MP:SELECT.THST") - frst = Component(EpicsSignal, "MP:SELECT.FRST") - fvst = Component(EpicsSignal, "MP:SELECT.FVST") - sxst = Component(EpicsSignal, "MP:SELECT.SXST") - - @property - def allowed_zoom_levels(self): - return [ - self.zrst.get(), - self.onst.get(), - self.twst.get(), - self.thst.get(), - self.frst.get(), - self.fvst.get(), - self.sxst.get(), - ] - - def set(self, level_to_set: str) -> StatusBase: - return self.level.set(level_to_set) - - -class OAV(AreaDetector): - cam = ADC(CamBase, "-DI-OAV-01:CAM:") - roi = ADC(ROIPlugin, "-DI-OAV-01:ROI:") - proc = ADC(ProcessPlugin, "-DI-OAV-01:PROC:") - over = ADC(OverlayPlugin, "-DI-OAV-01:OVER:") - tiff = ADC(OverlayPlugin, "-DI-OAV-01:TIFF:") - hdf5 = ADC(HDF5Plugin, "-DI-OAV-01:HDF5:") - grid_snapshot = Component(SnapshotWithGrid, "-DI-OAV-01:MJPG:") - snapshot = Component(SnapshotWithBeamCentre, "-DI-OAV-01:MJPG:") - zoom_controller = Component(ZoomController, "-EA-OAV-01:FZOOM:") - - def __init__(self, *args, params: OAVConfigParams, **kwargs): - super().__init__(*args, **kwargs) - self.parameters = params - self.grid_snapshot.oav_params = params - self.snapshot.oav_params = params - self.subscription_id = None - self._snapshot_trigger_subscription_id = None - - def wait_for_connection(self, all_signals=False, timeout=2): - connected = super().wait_for_connection(all_signals, timeout) - x = self.grid_snapshot.x_size.get() - y = self.grid_snapshot.y_size.get() - - cb = partial(self.parameters.update_on_zoom, xsize=x, ysize=y) - - if self.subscription_id is not None: - self.zoom_controller.level.unsubscribe(self.subscription_id) - self.subscription_id = self.zoom_controller.level.subscribe(cb) - - return connected + def __init__(self, prefix: str, name: str = "") -> None: + super().__init__(name=name) + self.percentage = epics_signal_rw(float, f"{prefix}ZOOMPOSCMD") + + # Level is the string description of the zoom level e.g. "1.0x" or "1.0" + self.level = epics_signal_rw(str, f"{prefix}MP:SELECT") + + async def _get_allowed_zoom_levels(self) -> list: + zoom_levels = await self.level.describe() + return zoom_levels["level"]["choices"] # type: ignore + + @AsyncStatus.wrap + async def set(self, level_to_set: str): + allowed_zoom_levels = await self._get_allowed_zoom_levels() + if level_to_set not in allowed_zoom_levels: + raise ZoomLevelNotFoundError( + f"{level_to_set} not found, expected one of {allowed_zoom_levels}" + ) + await self.level.set(level_to_set, wait=True) + + +class OAV(StandardReadable): + def __init__(self, prefix: str, config: OAVConfig, name: str = ""): + self.oav_config = config + self._prefix = prefix + self._name = name + _bl_prefix = prefix.split("-")[0] + self.zoom_controller = ZoomController(f"{_bl_prefix}-EA-OAV-01:FZOOM:", name) + + self.cam = Cam(f"{prefix}CAM:", name=name) + + with self.add_children_as_readables(): + self.grid_snapshot = SnapshotWithGrid(f"{prefix}MJPG:", name) + self.microns_per_pixel_x = create_hardware_backed_soft_signal( + float, + lambda: self._get_microns_per_pixel(Coords.X), + ) + self.microns_per_pixel_y = create_hardware_backed_soft_signal( + float, + lambda: self._get_microns_per_pixel(Coords.Y), + ) + self.beam_centre_i = create_hardware_backed_soft_signal( + int, lambda: self._get_beam_position(Coords.X) + ) + self.beam_centre_j = create_hardware_backed_soft_signal( + int, lambda: self._get_beam_position(Coords.Y) + ) + self.snapshot = SnapshotWithBeamCentre( + f"{self._prefix}MJPG:", + self.beam_centre_i, + self.beam_centre_j, + self._name, + ) + + self.sizes = [self.grid_snapshot.x_size, self.grid_snapshot.y_size] + + super().__init__(name) + + async def _read_current_zoom(self) -> str: + _zoom = await self.zoom_controller.level.get_value() + return _get_correct_zoom_string(_zoom) + + async def _get_microns_per_pixel(self, coord: int) -> float: + """Extracts the microns per x pixel and y pixel for a given zoom level.""" + _zoom = await self._read_current_zoom() + value = self.parameters[_zoom].microns_per_pixel[coord] + size = await self.sizes[coord].get_value() + return value * DEFAULT_OAV_WINDOW[coord] / size + + async def _get_beam_position(self, coord: int) -> int: + """Extracts the beam location in pixels `xCentre` `yCentre`, for a requested \ + zoom level. """ + _zoom = await self._read_current_zoom() + value = self.parameters[_zoom].crosshair[coord] + size = await self.sizes[coord].get_value() + return int(value * size / DEFAULT_OAV_WINDOW[coord]) + + async def connect( + self, + mock: bool = False, + timeout: float = DEFAULT_TIMEOUT, + force_reconnect: bool = False, + ): + self.parameters = self.oav_config.get_parameters() + + return await super().connect(mock, timeout, force_reconnect) diff --git a/src/dodal/devices/oav/oav_errors.py b/src/dodal/devices/oav/oav_errors.py deleted file mode 100644 index 82834196df..0000000000 --- a/src/dodal/devices/oav/oav_errors.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -Module for containing errors in operation of the OAV. -""" - -from dodal.log import LOGGER - - -class OAVError_ZoomLevelNotFound(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) - - -class OAVError_BeamPositionNotFound(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) - - -class OAVError_WaveformAllZero(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) - - -class OAVError_NoRotationsPassValidityTest(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) - - -class OAVError_MissingRotations(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) - - -class OAVError_TipDistanceExceedsMax(Exception): - def __init__(self, errmsg): - LOGGER.error(errmsg) diff --git a/src/dodal/devices/oav/oav_parameters.py b/src/dodal/devices/oav/oav_parameters.py index ac4c246473..c8d1eb047c 100644 --- a/src/dodal/devices/oav/oav_parameters.py +++ b/src/dodal/devices/oav/oav_parameters.py @@ -5,12 +5,6 @@ from typing import Any from xml.etree.ElementTree import Element -from dodal.devices.oav.oav_errors import ( - OAVError_BeamPositionNotFound, - OAVError_ZoomLevelNotFound, -) -from dodal.log import LOGGER - # GDA currently assumes this aspect ratio for the OAV window size. # For some beamline this doesn't affect anything as the actual OAV aspect ratio # matches. Others need to take it into account to rescale the values stored in @@ -110,118 +104,12 @@ def get_max_tip_distance_in_pixels(self, micronsPerPixel: float) -> float: return self.max_tip_distance / micronsPerPixel -class OAVConfigParams: - """ - The OAV parameters which may update depending on settings such as the zoom level. - """ - - def __init__( - self, - zoom_params_file, - display_config, - ): - self.zoom_params_file: str = zoom_params_file - self.display_config: str = display_config - - def update_on_zoom(self, value, xsize, ysize, *args, **kwargs): - xsize, ysize = int(xsize), int(ysize) - if isinstance(value, str) and value.endswith("x"): - value = value.strip("x") - zoom = float(value) - self.load_microns_per_pixel(zoom, xsize, ysize) - self.beam_centre_i, self.beam_centre_j = self.get_beam_position_from_zoom( - zoom, xsize, ysize - ) - - def load_microns_per_pixel(self, zoom: float, xsize: int, ysize: int) -> None: - """ - Loads the microns per x pixel and y pixel for a given zoom level. These are - currently generated by GDA, though hyperion could generate them in future. - """ - tree = et.parse(self.zoom_params_file) - self.micronsPerXPixel = self.micronsPerYPixel = None - root = tree.getroot() - levels = root.findall(".//zoomLevel") - for node in levels: - if _get_element_as_float(node, "level") == zoom: - self.micronsPerXPixel = ( - _get_element_as_float(node, "micronsPerXPixel") - * DEFAULT_OAV_WINDOW[0] - / xsize - ) - self.micronsPerYPixel = ( - _get_element_as_float(node, "micronsPerYPixel") - * DEFAULT_OAV_WINDOW[1] - / ysize - ) - if self.micronsPerXPixel is None or self.micronsPerYPixel is None: - raise OAVError_ZoomLevelNotFound( - f""" - Could not find the micronsPer[X,Y]Pixel parameters in - {self.zoom_params_file} for zoom level {zoom}. - """ - ) - - def get_beam_position_from_zoom( - self, zoom: float, xsize: int, ysize: int - ) -> tuple[int, int]: - """ - Extracts the beam location in pixels `xCentre` `yCentre`, for a requested zoom \ - level. The beam location is manually inputted by the beamline operator on GDA \ - by clicking where on screen a scintillator lights up, and stored in the \ - display.configuration file. - """ - crosshair_x_line = None - crosshair_y_line = None - with open(self.display_config) as f: - file_lines = f.readlines() - for i in range(len(file_lines)): - if file_lines[i].startswith("zoomLevel = " + str(zoom)): - crosshair_x_line = file_lines[i + 1] - crosshair_y_line = file_lines[i + 2] - break - - if crosshair_x_line is None or crosshair_y_line is None: - raise OAVError_BeamPositionNotFound( - f"Could not extract beam position at zoom level {zoom}" - ) - - beam_centre_i = ( - int(crosshair_x_line.split(" = ")[1]) * xsize / DEFAULT_OAV_WINDOW[0] - ) - beam_centre_j = ( - int(crosshair_y_line.split(" = ")[1]) * ysize / DEFAULT_OAV_WINDOW[1] - ) - LOGGER.info(f"Beam centre: {beam_centre_i, beam_centre_j}") - return int(beam_centre_i), int(beam_centre_j) - - def calculate_beam_distance( - self, horizontal_pixels: int, vertical_pixels: int - ) -> tuple[int, int]: - """ - Calculates the distance between the beam centre and the given (horizontal, vertical). - - Args: - horizontal_pixels (int): The x (camera coordinates) value in pixels. - vertical_pixels (int): The y (camera coordinates) value in pixels. - Returns: - The distance between the beam centre and the (horizontal, vertical) point in pixels as a tuple - (horizontal_distance, vertical_distance). - """ - - return ( - self.beam_centre_i - horizontal_pixels, - self.beam_centre_j - vertical_pixels, - ) - - @dataclass class ZoomParams: microns_per_pixel: tuple[float, float] crosshair: tuple[int, int] -# Once all moved to async this should replace OAVConfigParams class OAVConfig: """ Read the OAV config files and return a dictionary of {'zoom_level': ZoomParams}\ with information about microns per pixels and crosshairs. diff --git a/src/dodal/devices/oav/grid_overlay.py b/src/dodal/devices/oav/snapshots/grid_overlay.py similarity index 67% rename from src/dodal/devices/oav/grid_overlay.py rename to src/dodal/devices/oav/snapshots/grid_overlay.py index 24215e72c2..56eb048727 100644 --- a/src/dodal/devices/oav/grid_overlay.py +++ b/src/dodal/devices/oav/snapshots/grid_overlay.py @@ -1,14 +1,8 @@ -# type: ignore # OAV will soon be ophyd-async, see https://github.com/DiamondLightSource/dodal/issues/716 from enum import Enum from functools import partial -from os.path import join as path_join -from ophyd import Component, Signal from PIL import Image, ImageDraw -from dodal.devices.areadetector.plugins.MJPG import MJPG -from dodal.log import LOGGER - class Orientation(Enum): horizontal = 0 @@ -122,40 +116,3 @@ def add_grid_overlay_to_image( spacing=box_width, num_lines=num_boxes_y - 1, ) - - -class SnapshotWithGrid(MJPG): - top_left_x = Component(Signal) - top_left_y = Component(Signal) - box_width = Component(Signal) - num_boxes_x = Component(Signal) - num_boxes_y = Component(Signal) - - last_path_outer = Component(Signal) - last_path_full_overlay = Component(Signal) - - def post_processing(self, image: Image.Image): - # Save an unmodified image with no suffix - self._save_image(image) - - top_left_x = self.top_left_x.get() - top_left_y = self.top_left_y.get() - box_width = self.box_width.get() - num_boxes_x = self.num_boxes_x.get() - num_boxes_y = self.num_boxes_y.get() - assert isinstance(filename_str := self.filename.get(), str) - assert isinstance(directory_str := self.directory.get(), str) - add_grid_border_overlay_to_image( - image, top_left_x, top_left_y, box_width, num_boxes_x, num_boxes_y - ) - path = path_join(directory_str, f"{filename_str}_outer_overlay.png") - self.last_path_outer.put(path) - LOGGER.info(f"Saving grid outer edge at {path}") - image.save(path) - add_grid_overlay_to_image( - image, top_left_x, top_left_y, box_width, num_boxes_x, num_boxes_y - ) - path = path_join(directory_str, f"{filename_str}_grid_overlay.png") - self.last_path_full_overlay.put(path) - LOGGER.info(f"Saving full grid overlay at {path}") - image.save(path) diff --git a/src/dodal/devices/oav/snapshots/snapshot_with_beam_centre.py b/src/dodal/devices/oav/snapshots/snapshot_with_beam_centre.py new file mode 100644 index 0000000000..1497800892 --- /dev/null +++ b/src/dodal/devices/oav/snapshots/snapshot_with_beam_centre.py @@ -0,0 +1,64 @@ +from ophyd_async.core import SignalR +from PIL import Image, ImageDraw + +from dodal.devices.areadetector.plugins.MJPG import MJPG + +CROSSHAIR_LENGTH_PX = 20 +CROSSHAIR_OUTLINE_COLOUR = "Black" +CROSSHAIR_FILL_COLOUR = "White" + + +def draw_crosshair(image: Image.Image, beam_x: int, beam_y: int): + draw = ImageDraw.Draw(image) + OUTLINE_WIDTH = 1 + HALF_LEN = CROSSHAIR_LENGTH_PX / 2 + draw.rectangle( + [ + beam_x - OUTLINE_WIDTH, + beam_y - HALF_LEN - OUTLINE_WIDTH, + beam_x + OUTLINE_WIDTH, + beam_y + HALF_LEN + OUTLINE_WIDTH, + ], + fill=CROSSHAIR_OUTLINE_COLOUR, + ) + draw.rectangle( + [ + beam_x - HALF_LEN - OUTLINE_WIDTH, + beam_y - OUTLINE_WIDTH, + beam_x + HALF_LEN + OUTLINE_WIDTH, + beam_y + OUTLINE_WIDTH, + ], + fill=CROSSHAIR_OUTLINE_COLOUR, + ) + draw.line( + ((beam_x, beam_y - HALF_LEN), (beam_x, beam_y + HALF_LEN)), + fill=CROSSHAIR_FILL_COLOUR, + ) + draw.line( + ((beam_x - HALF_LEN, beam_y), (beam_x + HALF_LEN, beam_y)), + fill=CROSSHAIR_FILL_COLOUR, + ) + + +class SnapshotWithBeamCentre(MJPG): + """A child of MJPG which, when triggered, draws an outlined crosshair at the beam + centre in the image and saves the image to disk.""" + + def __init__( + self, + prefix: str, + beam_x_signal: SignalR, + beam_y_signal: SignalR, + name: str = "", + ) -> None: + with self.add_children_as_readables(): + self.beam_centre_i = beam_x_signal + self.beam_centre_j = beam_y_signal + super().__init__(prefix, name) + + async def post_processing(self, image: Image.Image): + beam_x = await self.beam_centre_i.get_value() + beam_y = await self.beam_centre_j.get_value() + draw_crosshair(image, beam_x, beam_y) + + await self._save_image(image) diff --git a/src/dodal/devices/oav/snapshots/snapshot_with_grid.py b/src/dodal/devices/oav/snapshots/snapshot_with_grid.py new file mode 100644 index 0000000000..fc616e81d6 --- /dev/null +++ b/src/dodal/devices/oav/snapshots/snapshot_with_grid.py @@ -0,0 +1,57 @@ +from os.path import join as path_join + +from ophyd_async.core import soft_signal_rw +from PIL.Image import Image + +from dodal.devices.areadetector.plugins.MJPG import IMG_FORMAT, MJPG, asyncio_save_image +from dodal.devices.oav.snapshots.grid_overlay import ( + add_grid_border_overlay_to_image, + add_grid_overlay_to_image, +) +from dodal.log import LOGGER + + +class SnapshotWithGrid(MJPG): + def __init__(self, prefix: str, name: str = "") -> None: + with self.add_children_as_readables(): + self.top_left_x = soft_signal_rw(int) + self.top_left_y = soft_signal_rw(int) + self.box_width = soft_signal_rw(int) + self.num_boxes_x = soft_signal_rw(int) + self.num_boxes_y = soft_signal_rw(int) + + self.last_path_outer = soft_signal_rw(str) + self.last_path_full_overlay = soft_signal_rw(str) + + super().__init__(prefix, name) + + async def post_processing(self, image: Image): + # Save an unmodified image with no suffix + await self._save_image(image) + + top_left_x = await self.top_left_x.get_value() + top_left_y = await self.top_left_y.get_value() + box_witdh = await self.box_width.get_value() + num_boxes_x = await self.num_boxes_x.get_value() + num_boxes_y = await self.num_boxes_y.get_value() + + assert isinstance(filename_str := await self.filename.get_value(), str) + assert isinstance(directory_str := await self.directory.get_value(), str) + + add_grid_border_overlay_to_image( + image, top_left_x, top_left_y, box_witdh, num_boxes_x, num_boxes_y + ) + + path = path_join(directory_str, f"{filename_str}_outer_overlay.{IMG_FORMAT}") + await self.last_path_outer.set(path, wait=True) + LOGGER.info(f"Saving grid outer edge at {path}") + await asyncio_save_image(image, path) + + add_grid_overlay_to_image( + image, top_left_x, top_left_y, box_witdh, num_boxes_x, num_boxes_y + ) + + path = path_join(directory_str, f"{filename_str}_grid_overlay.{IMG_FORMAT}") + await self.last_path_full_overlay.set(path, wait=True) + LOGGER.info(f"Saving full grid overlay at {path}") + await asyncio_save_image(image, path) diff --git a/src/dodal/devices/oav/utils.py b/src/dodal/devices/oav/utils.py index 24071502fe..f1e73c13be 100644 --- a/src/dodal/devices/oav/utils.py +++ b/src/dodal/devices/oav/utils.py @@ -5,8 +5,11 @@ import numpy as np from bluesky.utils import Msg -from dodal.devices.oav.oav_calculations import camera_coordinates_to_xyz -from dodal.devices.oav.oav_detector import OAVConfigParams +from dodal.devices.oav.oav_calculations import ( + calculate_beam_distance, + camera_coordinates_to_xyz, +) +from dodal.devices.oav.oav_detector import OAV from dodal.devices.oav.pin_image_recognition import PinTipDetection from dodal.devices.smargon import Smargon @@ -36,21 +39,6 @@ def bottom_right_from_top_left( ) -class ColorMode(IntEnum): - """ - Enum to store the various color modes of the camera. We use RGB1. - """ - - MONO = 0 - BAYER = 1 - RGB1 = 2 - RGB2 = 3 - RGB3 = 4 - YUV444 = 5 - YUV422 = 6 - YUV421 = 7 - - class EdgeOutputArrayImageType(IntEnum): """ Enum to store the types of image to tweak the output array. We use Original. @@ -64,7 +52,7 @@ class EdgeOutputArrayImageType(IntEnum): def get_move_required_so_that_beam_is_at_pixel( - smargon: Smargon, pixel: Pixel, oav_params: OAVConfigParams + smargon: Smargon, pixel: Pixel, oav: OAV ) -> Generator[Msg, None, np.ndarray]: """Calculate the required move so that the given pixel is in the centre of the beam.""" @@ -78,22 +66,35 @@ def get_move_required_so_that_beam_is_at_pixel( ) current_angle = yield from bps.rd(smargon.omega) - return calculate_x_y_z_of_pixel(current_motor_xyz, current_angle, pixel, oav_params) + beam_x = yield from bps.rd(oav.beam_centre_i) + beam_y = yield from bps.rd(oav.beam_centre_j) + microns_per_pixel_x = yield from bps.rd(oav.microns_per_pixel_x) + microns_per_pixel_y = yield from bps.rd(oav.microns_per_pixel_y) + + return calculate_x_y_z_of_pixel( + current_motor_xyz, + current_angle, + pixel, + (beam_x, beam_y), + (microns_per_pixel_x, microns_per_pixel_y), + ) def calculate_x_y_z_of_pixel( - current_x_y_z, current_omega, pixel: Pixel, oav_params: OAVConfigParams + current_x_y_z, + current_omega, + pixel: Pixel, + beam_centre: tuple[int, int], + microns_per_pixel: tuple[float, float], ) -> np.ndarray: - beam_distance_px: Pixel = oav_params.calculate_beam_distance(*pixel) + beam_distance_px: Pixel = calculate_beam_distance(beam_centre, *pixel) - assert oav_params.micronsPerXPixel - assert oav_params.micronsPerYPixel return current_x_y_z + camera_coordinates_to_xyz( beam_distance_px[0], beam_distance_px[1], current_omega, - oav_params.micronsPerXPixel, - oav_params.micronsPerYPixel, + microns_per_pixel[0], + microns_per_pixel[1], ) diff --git a/system_tests/test_oav_system.py b/system_tests/test_oav_system.py index 8768594416..c1a4d485ae 100644 --- a/system_tests/test_oav_system.py +++ b/system_tests/test_oav_system.py @@ -1,8 +1,9 @@ import bluesky.plan_stubs as bps import pytest from bluesky.run_engine import RunEngine +from ophyd_async.core import DeviceCollector -from dodal.devices.oav.oav_detector import OAV, OAVConfigParams, ZoomController +from dodal.devices.oav.oav_detector import OAV, OAVConfig, ZoomController TEST_GRID_TOP_LEFT_X = 100 TEST_GRID_TOP_LEFT_Y = 100 @@ -15,8 +16,15 @@ ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" +@pytest.fixture +async def oav() -> OAV: + oav_config = OAVConfig(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) + async with DeviceCollector(connect=True): + oav = OAV("", config=oav_config, name="oav") + return oav + + def take_snapshot_with_grid(oav: OAV, snapshot_filename, snapshot_directory): - oav.wait_for_connection() yield from bps.abs_set(oav.grid_snapshot.top_left_x, TEST_GRID_TOP_LEFT_X) # type: ignore # See: https://github.com/DiamondLightSource/dodal/issues/827 yield from bps.abs_set(oav.grid_snapshot.top_left_y, TEST_GRID_TOP_LEFT_Y) # type: ignore # See: https://github.com/DiamondLightSource/dodal/issues/827 yield from bps.abs_set(oav.grid_snapshot.box_width, TEST_GRID_BOX_WIDTH) # type: ignore # See: https://github.com/DiamondLightSource/dodal/issues/827 @@ -31,8 +39,8 @@ def take_snapshot_with_grid(oav: OAV, snapshot_filename, snapshot_directory): @pytest.mark.skip(reason="Don't want to actually take snapshots during testing.") def test_grid_overlay(RE: RunEngine): beamline = "BL03I" - oav_params = OAVConfigParams(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - oav = OAV(name="oav", prefix=f"{beamline}", params=oav_params) + oav_params = OAVConfig(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) + oav = OAV(name="oav", prefix=f"{beamline}", config=oav_params) snapshot_filename = "snapshot" snapshot_directory = "." RE(take_snapshot_with_grid(oav, snapshot_filename, snapshot_directory)) @@ -40,7 +48,8 @@ def test_grid_overlay(RE: RunEngine): @pytest.mark.skip(reason="No OAV in S03") @pytest.mark.s03 -def test_get_zoom_levels(): +async def test_get_zoom_levels(): my_zoom_controller = ZoomController("BL03S-EA-OAV-01:FZOOM:", name="test_zoom") - my_zoom_controller.wait_for_connection() - assert my_zoom_controller.allowed_zoom_levels[0] == "1.0x" + await my_zoom_controller.connect() + description = await my_zoom_controller.level.describe() + assert description["zoom_controller-level"]["choices"][0] == "1.0x" # type: ignore diff --git a/tests/devices/unit_tests/areadetector/plugins/test_MJPG.py b/tests/devices/unit_tests/areadetector/plugins/test_MJPG.py deleted file mode 100644 index 2bc8d993fd..0000000000 --- a/tests/devices/unit_tests/areadetector/plugins/test_MJPG.py +++ /dev/null @@ -1,45 +0,0 @@ -from pathlib import Path -from unittest.mock import MagicMock, patch - -from ophyd.sim import instantiate_fake_device -from PIL import Image - -from dodal.devices.areadetector.plugins.MJPG import SnapshotWithBeamCentre -from dodal.devices.oav.oav_detector import OAVConfigParams - -DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" -ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image") -@patch("dodal.devices.areadetector.plugins.MJPG.os") -@patch("dodal.devices.areadetector.plugins.MJPG.ImageDraw") -@patch("dodal.devices.areadetector.plugins.MJPG.requests") -def test_given_snapshot_triggered_then_crosshair_drawn( - patch_requests, patch_image_draw, patch_os, patch_image -): - patch_line = MagicMock() - patch_requests.get.return_value.content = b"" - params = OAVConfigParams(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - params.update_on_zoom(1.0, 100, 100) - - patch_image_draw.Draw.return_value.line = patch_line - snapshot: SnapshotWithBeamCentre = instantiate_fake_device(SnapshotWithBeamCentre) - snapshot.oav_params = params - snapshot.directory.set("/tmp/") - snapshot.filename.set("test") - - status = snapshot.trigger() - status.wait() - - assert len(patch_line.mock_calls) == 2 - - -def test_snapshot_draws_expected_crosshair(tmp_path: Path): - image = Image.open("tests/test_data/test_images/oav_snapshot_test.png") - SnapshotWithBeamCentre.draw_crosshair(image, 510, 380) - image.save(tmp_path / "output_image.png") - expected_image = Image.open("tests/test_data/test_images/oav_snapshot_expected.png") - image_bytes = image.tobytes() - expected_bytes = expected_image.tobytes() - assert image_bytes == expected_bytes, "Actual and expected images differ" diff --git a/tests/devices/unit_tests/oav/conftest.py b/tests/devices/unit_tests/oav/conftest.py new file mode 100644 index 0000000000..792b988149 --- /dev/null +++ b/tests/devices/unit_tests/oav/conftest.py @@ -0,0 +1,25 @@ +from unittest.mock import AsyncMock + +import pytest +from ophyd_async.core import DeviceCollector, set_mock_value + +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_parameters import OAVConfig + +DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" +ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" + + +@pytest.fixture +async def oav() -> OAV: + oav_config = OAVConfig(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) + async with DeviceCollector(mock=True, connect=True): + oav = OAV("", config=oav_config, name="fake_oav") + zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x"] + oav.zoom_controller.level.describe = AsyncMock( + return_value={"level": {"choices": zoom_levels_list}} + ) + set_mock_value(oav.grid_snapshot.x_size, 1024) + set_mock_value(oav.grid_snapshot.y_size, 768) + set_mock_value(oav.zoom_controller.level, "1.0x") + return oav diff --git a/tests/devices/unit_tests/test_grid_overlay.py b/tests/devices/unit_tests/oav/test_grid_overlay.py similarity index 93% rename from tests/devices/unit_tests/test_grid_overlay.py rename to tests/devices/unit_tests/oav/test_grid_overlay.py index fdde1bc423..1d194b09b7 100644 --- a/tests/devices/unit_tests/test_grid_overlay.py +++ b/tests/devices/unit_tests/oav/test_grid_overlay.py @@ -2,7 +2,7 @@ import pytest -from dodal.devices.oav.grid_overlay import ( +from dodal.devices.oav.snapshots.grid_overlay import ( add_grid_border_overlay_to_image, add_grid_overlay_to_image, ) @@ -47,7 +47,7 @@ def _test_expected_calls_to_image_draw_line(mock_image_draw: MagicMock, expected ), ], ) -@patch("dodal.devices.oav.grid_overlay.ImageDraw.Draw") +@patch("dodal.devices.oav.snapshots.grid_overlay.ImageDraw.Draw") def test_add_grid_border_overlay_to_image_makes_correct_calls_to_imagedraw( mock_imagedraw: MagicMock, top_left_x, @@ -97,7 +97,7 @@ def test_add_grid_border_overlay_to_image_makes_correct_calls_to_imagedraw( ), ], ) -@patch("dodal.devices.oav.grid_overlay.ImageDraw.Draw") +@patch("dodal.devices.oav.snapshots.grid_overlay.ImageDraw.Draw") def test_add_grid_overlay_to_image_makes_correct_calls_to_imagedraw( mock_imagedraw: MagicMock, top_left_x, diff --git a/tests/devices/unit_tests/oav/test_oav.py b/tests/devices/unit_tests/oav/test_oav.py index ebaa6eb228..e11af8d86a 100644 --- a/tests/devices/unit_tests/oav/test_oav.py +++ b/tests/devices/unit_tests/oav/test_oav.py @@ -1,41 +1,53 @@ +from unittest.mock import AsyncMock, patch + import pytest -from ophyd.sim import instantiate_fake_device +from ophyd_async.core import set_mock_value -from dodal.devices.oav.oav_detector import OAV, OAVConfigParams -from dodal.devices.oav.oav_errors import ( - OAVError_BeamPositionNotFound, - OAVError_ZoomLevelNotFound, +from dodal.devices.oav.oav_detector import ( + OAV, + Cam, + ZoomController, + ZoomLevelNotFoundError, ) -DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" -ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" +async def test_zoom_controller(): + zoom_controller = ZoomController("", "fake zoom controller") + await zoom_controller.connect(mock=True) + zoom_controller.level.describe = AsyncMock( + return_value={"level": {"choices": ["1.0x", "3.0x"]}} + ) + status = zoom_controller.set("3.0x") + await status + assert status.success + assert await zoom_controller.level.get_value() == "3.0x" -@pytest.fixture -def oav() -> OAV: - oav_params = OAVConfigParams(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - oav: OAV = instantiate_fake_device(OAV, params=oav_params) - oav.proc.port_name.sim_put("proc") # type: ignore - oav.cam.port_name.sim_put("CAM") # type: ignore - oav.grid_snapshot.x_size.sim_put("1024") # type: ignore - oav.grid_snapshot.y_size.sim_put("768") # type: ignore +async def test_zoom_controller_set_raises_error_for_wrong_level(): + zoom_controller = ZoomController("", "fake zoom controller") + await zoom_controller.connect(mock=True) + zoom_controller._get_allowed_zoom_levels = AsyncMock(return_value=["1.0x", "3.0x"]) + with pytest.raises(ZoomLevelNotFoundError): + await zoom_controller.set("5.0x") - oav.zoom_controller.zrst.set("1.0x") - oav.zoom_controller.onst.set("2.0x") - oav.zoom_controller.twst.set("3.0x") - oav.zoom_controller.thst.set("5.0x") - oav.zoom_controller.frst.set("7.0x") - oav.zoom_controller.fvst.set("9.0x") - oav.wait_for_connection() +async def test_cam(): + cam = Cam("", "fake cam") + await cam.connect(mock=True) + set_mock_value(cam.array_size_x, 1024) + set_mock_value(cam.array_size_y, 768) - return oav + status = cam.acquire_period.set(0.01) + await status + assert status.success + assert await cam.acquire_period.get_value() == 0.01 + status = cam.acquire_time.set(0.01) + await status + assert status.success + assert await cam.acquire_time.get_value() == 0.01 -def test_load_microns_per_pixel_entry_not_found(oav: OAV): - with pytest.raises(OAVError_ZoomLevelNotFound): - oav.parameters.load_microns_per_pixel(0.000001, 0, 0) + assert await cam.array_size_x.get_value() == 1024 @pytest.mark.parametrize( @@ -47,87 +59,71 @@ def test_load_microns_per_pixel_entry_not_found(oav: OAV): ("15.0", 0.302, 0.302), ], ) -def test_get_micronsperpixel_from_oav( +async def test_get_micronsperpixel_from_oav( zoom_level, expected_microns_x, expected_microns_y, oav: OAV ): - oav.zoom_controller.level.sim_put(zoom_level) # type: ignore + set_mock_value(oav.zoom_controller.level, zoom_level) - assert oav.parameters.micronsPerXPixel == pytest.approx( + assert await oav.microns_per_pixel_x.get_value() == pytest.approx( expected_microns_x, abs=1e-2 ) - assert oav.parameters.micronsPerYPixel == pytest.approx( + assert await oav.microns_per_pixel_y.get_value() == pytest.approx( expected_microns_y, abs=1e-2 ) -def test_beam_position_not_found_for_wrong_entry(oav: OAV): - with pytest.raises(OAVError_BeamPositionNotFound): - oav.parameters.get_beam_position_from_zoom(2.0, 0, 0) - - -def test_get_beam_position(oav: OAV): - expected_beam_position = (493, 355) - beam_position = oav.parameters.get_beam_position_from_zoom(2.5, 1024, 768) - - assert beam_position[0] == expected_beam_position[0] - assert beam_position[1] == expected_beam_position[1] - - @pytest.mark.parametrize( "zoom_level,expected_xCentre,expected_yCentre", [("1.0", 477, 359), ("5.0", 517, 350), ("10.0x", 613, 344)], ) -def test_extract_beam_position_given_different_zoom_levels( +async def test_extract_beam_position_given_different_zoom_levels( zoom_level, expected_xCentre, expected_yCentre, oav: OAV, ): - oav.zoom_controller.level.sim_put(zoom_level) # type: ignore - - assert oav.parameters.beam_centre_i == expected_xCentre - assert oav.parameters.beam_centre_j == expected_yCentre - + set_mock_value(oav.zoom_controller.level, zoom_level) -def test_extract_rescaled_micronsperpixel(oav: OAV): - oav.grid_snapshot.x_size.sim_put("1292") # type: ignore - oav.grid_snapshot.y_size.sim_put("964") # type: ignore - oav.wait_for_connection() + assert await oav.beam_centre_i.get_value() == expected_xCentre + assert await oav.beam_centre_j.get_value() == expected_yCentre - oav.zoom_controller.level.sim_put("1.0") # type: ignore - - assert oav.parameters.micronsPerXPixel == pytest.approx(2.27, abs=1e-2) - assert oav.parameters.micronsPerYPixel == pytest.approx(2.28, abs=1e-2) +async def test_oav_returns_rescaled_beam_position_and_microns_per_pixel_correctly( + oav: OAV, +): + set_mock_value(oav.grid_snapshot.x_size, 1292) + set_mock_value(oav.grid_snapshot.y_size, 964) -def test_extract_rescaled_beam_position(oav: OAV): - oav.grid_snapshot.x_size.sim_put("1292") # type: ignore - oav.grid_snapshot.y_size.sim_put("964") # type: ignore - oav.wait_for_connection() + set_mock_value(oav.zoom_controller.level, "1.0") - oav.zoom_controller.level.sim_put("1.0") # type: ignore + microns_x = await oav.microns_per_pixel_x.get_value() + microns_y = await oav.microns_per_pixel_y.get_value() + beam_x = await oav.beam_centre_i.get_value() + beam_y = await oav.beam_centre_j.get_value() - assert oav.parameters.beam_centre_i == 601 - assert oav.parameters.beam_centre_j == 450 + assert microns_x == pytest.approx(2.27, abs=1e-2) + assert microns_y == pytest.approx(2.28, abs=1e-2) + assert beam_x == 601 + assert beam_y == 450 -@pytest.mark.parametrize( - "h, v, expected_x, expected_y", - [ - (54, 100, 517 - 54, 350 - 100), - (0, 0, 517, 350), - (500, 500, 517 - 500, 350 - 500), - ], +@patch( + "dodal.devices.areadetector.plugins.MJPG.ClientSession.get", + autospec=True, ) -def test_calculate_beam_distance(h, v, expected_x, expected_y, oav: OAV): - oav.zoom_controller.level.sim_put("5.0x") # type: ignore +@patch("dodal.devices.areadetector.plugins.MJPG.Image") +async def test_when_snapshot_triggered_post_processing_called_correctly( + patch_image, mock_get, oav: OAV +): + mock_get.return_value.__aenter__.return_value = (mock_response := AsyncMock()) + mock_response.ok = True + mock_response.read.return_value = (test_data := b"TEST") + + mock_open = patch_image.open + mock_open.return_value.__aenter__.return_value = test_data - assert oav.parameters.calculate_beam_distance( - h, - v, - ) == (expected_x, expected_y) + oav.snapshot.post_processing = (mock_proc := AsyncMock()) + await oav.snapshot.trigger() -def test_when_oav_created_then_snapshot_parameters_set(oav: OAV): - assert oav.snapshot.oav_params is not None - assert oav.grid_snapshot.oav_params is not None + mock_proc.assert_awaited_once() diff --git a/tests/devices/unit_tests/oav/test_oav_async.py b/tests/devices/unit_tests/oav/test_oav_async.py deleted file mode 100644 index fe31680b8b..0000000000 --- a/tests/devices/unit_tests/oav/test_oav_async.py +++ /dev/null @@ -1,101 +0,0 @@ -import pytest -from ophyd_async.core import DeviceCollector, set_mock_value - -from dodal.devices.oav.oav_async import OAV, ZoomController -from dodal.devices.oav.oav_parameters import OAVConfig - -DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" -ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" - - -@pytest.fixture -async def oav() -> OAV: - oav_config = OAVConfig(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - async with DeviceCollector(mock=True, connect=True): - oav = OAV("", config=oav_config, name="fake_oav") - set_mock_value(oav.x_size, 1024) - set_mock_value(oav.y_size, 768) - return oav - - -async def test_zoom_controller(): - zoom_controller = ZoomController("", "fake zoom controller") - await zoom_controller.connect(mock=True) - status = zoom_controller.set("3.0x") - await status - assert status.success - assert await zoom_controller.level.get_value() == "3.0x" - - -@pytest.mark.parametrize( - "zoom_level,expected_microns_x,expected_microns_y", - [ - ("1.0x", 2.87, 2.87), - ("2.5", 2.31, 2.31), - ("5.0x", 1.58, 1.58), - ("15.0", 0.302, 0.302), - ], -) -async def test_get_micronsperpixel_from_oav( - zoom_level, expected_microns_x, expected_microns_y, oav: OAV -): - set_mock_value(oav.zoom_controller.level, zoom_level) - - assert await oav.microns_per_pixel_x.get_value() == pytest.approx( - expected_microns_x, abs=1e-2 - ) - assert await oav.microns_per_pixel_y.get_value() == pytest.approx( - expected_microns_y, abs=1e-2 - ) - - -@pytest.mark.parametrize( - "zoom_level,expected_xCentre,expected_yCentre", - [("1.0", 477, 359), ("5.0", 517, 350), ("10.0x", 613, 344)], -) -async def test_extract_beam_position_given_different_zoom_levels( - zoom_level, - expected_xCentre, - expected_yCentre, - oav: OAV, -): - set_mock_value(oav.zoom_controller.level, zoom_level) - - assert await oav.beam_centre_i.get_value() == expected_xCentre - assert await oav.beam_centre_j.get_value() == expected_yCentre - - -async def test_oav_returns_rescaled_beam_position_and_microns_per_pixel_correctly( - oav: OAV, -): - set_mock_value(oav.x_size, 1292) - set_mock_value(oav.y_size, 964) - - set_mock_value(oav.zoom_controller.level, "1.0") - - microns_x = await oav.microns_per_pixel_x.get_value() - microns_y = await oav.microns_per_pixel_y.get_value() - beam_x = await oav.beam_centre_i.get_value() - beam_y = await oav.beam_centre_j.get_value() - - assert microns_x == pytest.approx(2.27, abs=1e-2) - assert microns_y == pytest.approx(2.28, abs=1e-2) - assert beam_x == 601 - assert beam_y == 450 - - -@pytest.mark.parametrize( - "h, v, expected_x, expected_y", - [ - (54, 100, 517 - 54, 350 - 100), - (0, 0, 517, 350), - (500, 500, 517 - 500, 350 - 500), - ], -) -async def test_calculate_beam_distance(h, v, expected_x, expected_y, oav: OAV): - set_mock_value(oav.zoom_controller.level, "5.0x") # type: ignore - - assert await oav.calculate_beam_distance( - h, - v, - ) == (expected_x, expected_y) diff --git a/tests/devices/unit_tests/oav/test_oav_parameters.py b/tests/devices/unit_tests/oav/test_oav_parameters.py index 421906baaf..6f04fc1f29 100644 --- a/tests/devices/unit_tests/oav/test_oav_parameters.py +++ b/tests/devices/unit_tests/oav/test_oav_parameters.py @@ -2,7 +2,6 @@ from dodal.devices.oav.oav_parameters import ( OAVConfig, - OAVConfigParams, OAVParameters, ZoomParams, ) @@ -43,20 +42,6 @@ def test_given_key_in_context_and_default_when_load_parameters_then_value_found_ assert mock_parameters.minimum_height == 10 -def test_given_context_and_microns_per_pixel_get_max_tip_distance_in_pixels( - mock_parameters: OAVParameters, -): - zoom_level = mock_parameters.zoom - config_params = OAVConfigParams(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - config_params.update_on_zoom(str(zoom_level), 1024, 768) - - assert mock_parameters.max_tip_distance == 300 - assert config_params.micronsPerXPixel - assert mock_parameters.get_max_tip_distance_in_pixels( - config_params.micronsPerXPixel - ) == pytest.approx(189.873, abs=1e-3) - - @pytest.mark.parametrize( "zoom_level, expected_microns, expected_crosshair", [ @@ -84,3 +69,10 @@ def test_given_oav_config_get_max_tip_distance_in_pixels( assert mock_parameters.get_max_tip_distance_in_pixels( microns_per_pixel_x ) == pytest.approx(189.873, abs=1e-3) + + +def test_given_new_context_parameters_are_updated(mock_parameters: OAVParameters): + mock_parameters.update_context("xrayCentring") + + assert mock_parameters.active_params.get("zoom") == 7.5 + assert mock_parameters.active_params.get("brightness") == 80 diff --git a/tests/devices/unit_tests/oav/test_oav_utils.py b/tests/devices/unit_tests/oav/test_oav_utils.py new file mode 100644 index 0000000000..e3d576016c --- /dev/null +++ b/tests/devices/unit_tests/oav/test_oav_utils.py @@ -0,0 +1,110 @@ +from unittest.mock import AsyncMock + +import numpy as np +import pytest +from bluesky.run_engine import RunEngine +from ophyd.sim import instantiate_fake_device +from ophyd_async.core import set_mock_value + +from dodal.devices.oav.oav_calculations import calculate_beam_distance +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.pin_image_recognition import PinTipDetection +from dodal.devices.oav.pin_image_recognition.utils import SampleLocation +from dodal.devices.oav.utils import ( + PinNotFoundException, + bottom_right_from_top_left, + get_move_required_so_that_beam_is_at_pixel, + wait_for_tip_to_be_found, +) +from dodal.devices.smargon import Smargon + + +def test_bottom_right_from_top_left(): + top_left = np.array([123, 123]) + bottom_right = bottom_right_from_top_left( + top_left, 20, 30, 0.1, 0.15, 2.7027, 2.7027 + ) + assert bottom_right[0] == 863 and bottom_right[1] == 1788 + bottom_right = bottom_right_from_top_left(top_left, 15, 20, 0.005, 0.007, 1, 1) + assert bottom_right[0] == 198 and bottom_right[1] == 263 + + +@pytest.mark.parametrize( + "h, v, expected_x, expected_y", + [ + (54, 100, 517 - 54, 350 - 100), + (0, 0, 517, 350), + (500, 500, 517 - 500, 350 - 500), + ], +) +def test_calculate_beam_distance(h, v, expected_x, expected_y, oav: OAV): + # Beam center from test files for zoom_level = 5.0x + beam_centre = (517, 350) + + assert calculate_beam_distance( + beam_centre, + h, + v, + ) == (expected_x, expected_y) + + +@pytest.mark.parametrize( + "zoom_level, angle, pixel_to_move_to, expected_xyz", + [ + # Different zoom levels -> different um_per_pix and beam_centre + ("5.0x", 0, (100, 190), (-0.659, -0.253, 0)), + ("1.0x", 0, (100, 190), (-1.082, -0.485, 0)), + # Different position to reach, same zoom level + ("1.0x", 0, (50, 250), (-1.226, -0.313, 0)), + # Change angle + ("5.0x", 45, (100, 190), (-0.659, -0.179, 0.179)), + ], +) +async def test_values_for_move_so_that_beam_is_at_pixel( + zoom_level: str, + angle: int, + pixel_to_move_to: tuple, + expected_xyz: tuple, + oav: OAV, + smargon: Smargon, +): + set_mock_value(oav.zoom_controller.level, zoom_level) + set_mock_value(smargon.omega.user_readback, angle) + RE = RunEngine(call_returns_result=True) + pos = RE( + get_move_required_so_that_beam_is_at_pixel(smargon, pixel_to_move_to, oav) + ).plan_result # type: ignore + + assert pos == pytest.approx(expected_xyz, abs=1e-3) + + +@pytest.mark.asyncio +async def test_given_tip_found_when_wait_for_tip_to_be_found_called_then_tip_immediately_returned(): + mock_pin_tip_detect: PinTipDetection = instantiate_fake_device( + PinTipDetection, name="pin_detect" + ) + await mock_pin_tip_detect.connect(mock=True) + mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock( + return_value=SampleLocation(100, 100, np.array([]), np.array([])) + ) + RE = RunEngine(call_returns_result=True) + result = RE(wait_for_tip_to_be_found(mock_pin_tip_detect)) + assert result.plan_result == (100, 100) # type: ignore + mock_pin_tip_detect._get_tip_and_edge_data.assert_called_once() + + +@pytest.mark.asyncio +async def test_given_no_tip_when_wait_for_tip_to_be_found_called_then_exception_thrown(): + mock_pin_tip_detect: PinTipDetection = instantiate_fake_device( + PinTipDetection, name="pin_detect" + ) + await mock_pin_tip_detect.connect(mock=True) + await mock_pin_tip_detect.validity_timeout.set(0.2) + mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock( + return_value=SampleLocation( + *PinTipDetection.INVALID_POSITION, np.array([]), np.array([]) + ) + ) + RE = RunEngine(call_returns_result=True) + with pytest.raises(PinNotFoundException): + RE(wait_for_tip_to_be_found(mock_pin_tip_detect)) diff --git a/tests/devices/unit_tests/oav/test_snapshots.py b/tests/devices/unit_tests/oav/test_snapshots.py new file mode 100644 index 0000000000..764e709780 --- /dev/null +++ b/tests/devices/unit_tests/oav/test_snapshots.py @@ -0,0 +1,188 @@ +from pathlib import Path +from unittest.mock import ANY, AsyncMock, MagicMock, call, patch + +import pytest +from ophyd_async.core import DeviceCollector, MockSignalBackend, SignalR, set_mock_value +from PIL import Image + +from dodal.devices.oav.snapshots.snapshot_with_beam_centre import ( + SnapshotWithBeamCentre, + draw_crosshair, +) +from dodal.devices.oav.snapshots.snapshot_with_grid import ( + SnapshotWithGrid, + asyncio_save_image, +) + + +def create_and_set_mock_signal_r(dtype, name, value): + sig = SignalR(MockSignalBackend(dtype), name=name) + set_mock_value(sig, value) + return sig + + +@pytest.fixture +async def snapshot() -> SnapshotWithBeamCentre: + mock_beam_x = create_and_set_mock_signal_r(int, "moxk_beam_x", 510) + mock_beam_y = create_and_set_mock_signal_r(int, "mock_beam_y", 380) + async with DeviceCollector(mock=True): + snapshot = SnapshotWithBeamCentre("", mock_beam_x, mock_beam_y, "fake_snapshot") + set_mock_value(snapshot.directory, "/tmp/") + set_mock_value(snapshot.filename, "test") + set_mock_value(snapshot.url, "http://test.url") + return snapshot + + +@pytest.fixture +async def grid_snapshot() -> SnapshotWithGrid: + async with DeviceCollector(mock=True): + grid_snapshot = SnapshotWithGrid("", "fake_grid") + + set_mock_value(grid_snapshot.top_left_x, 100) + set_mock_value(grid_snapshot.top_left_y, 100) + set_mock_value(grid_snapshot.box_width, 50) + set_mock_value(grid_snapshot.num_boxes_x, 15) + set_mock_value(grid_snapshot.num_boxes_y, 10) + + set_mock_value(grid_snapshot.directory, "/tmp/") + set_mock_value(grid_snapshot.filename, "test") + set_mock_value(grid_snapshot.url, "http://test.url") + return grid_snapshot + + +@pytest.fixture +def mock_session_with_valid_response(): + with patch( + "dodal.devices.areadetector.plugins.MJPG.ClientSession.get", autospec=True + ) as mock_get: + mock_get.return_value.__aenter__.return_value = (mock_response := AsyncMock()) + mock_response.ok = True + mock_response.read.return_value = b"TEST" + yield mock_get + + +@pytest.fixture +def mock_image_open(): + with patch("dodal.devices.areadetector.plugins.MJPG.Image") as patch_image: + mock_open = patch_image.open + mock_open.return_value.__aenter__.return_value = b"TEST" + yield mock_open + + +@patch("dodal.devices.oav.snapshots.snapshot_with_beam_centre.ImageDraw") +async def test_snapshot_with_beam_centre_triggered_then_crosshair_drawn_and_saved( + patch_image_draw, mock_image_open, mock_session_with_valid_response, snapshot +): + patch_line = MagicMock() + patch_image_draw.Draw.return_value.line = patch_line + + snapshot._save_image = (mock_save := AsyncMock()) + + await snapshot.trigger() + + assert len(patch_line.mock_calls) == 2 + mock_save.assert_awaited_once() + + +@patch("dodal.devices.areadetector.plugins.MJPG.aiofiles", autospec=True) +async def test_snapshot_with_beam_centre_correctly_triggered_and_saved( + mock_aiofiles, + mock_image_open, + mock_session_with_valid_response, + snapshot, +): + mock_aio_open = mock_aiofiles.open + mock_aio_open.return_value.__aenter__.return_value = (mock_file := AsyncMock()) + + await snapshot.trigger() + + test_url = await snapshot.url.get_value() + # Get called with an instance of the session and correct url + mock_session_with_valid_response.assert_called_once_with(ANY, test_url) + + assert await snapshot.last_saved_path.get_value() == "/tmp/test.png" + mock_aio_open.assert_called_once_with("/tmp/test.png", "wb") + mock_file.write.assert_called_once() + + +@patch("dodal.devices.areadetector.plugins.MJPG.Path.mkdir") +@patch("dodal.devices.areadetector.plugins.MJPG.aiofiles", autospec=True) +async def test_given_directory_not_existing_when_snapshot_triggered_then_directory_created( + mock_aiofiles, + mock_mkdir, + mock_image_open, + mock_session_with_valid_response, + snapshot, +): + mock_aio_open = mock_aiofiles.open + mock_aio_open.return_value.__aenter__.return_value = AsyncMock() + + # Set new directory and test that it's created + set_mock_value(snapshot.directory, "new_dir") + + await snapshot.trigger() + + mock_mkdir.assert_called_once() + + +def test_snapshot_draws_expected_crosshair(tmp_path: Path): + image = Image.open("tests/test_data/test_images/oav_snapshot_test.png") + draw_crosshair(image, 510, 380) + image.save(tmp_path / "output_image.png") + expected_image = Image.open("tests/test_data/test_images/oav_snapshot_expected.png") + image_bytes = image.tobytes() + expected_bytes = expected_image.tobytes() + assert image_bytes == expected_bytes, "Actual and expected images differ" + + +@patch( + "dodal.devices.oav.snapshots.snapshot_with_grid.add_grid_border_overlay_to_image" +) +@patch("dodal.devices.oav.snapshots.snapshot_with_grid.add_grid_overlay_to_image") +@patch("dodal.devices.oav.snapshots.snapshot_with_grid.asyncio_save_image") +async def test_snapshot_with_grid_triggered_saves_image_and_draws_correct_grid( + mock_save_grid, + patch_add_grid, + patch_add_border, + mock_image_open, + mock_session_with_valid_response, + grid_snapshot, +): + grid_snapshot._save_image = (mock_save := AsyncMock()) + + await grid_snapshot.trigger() + + mock_save.assert_awaited_once() + patch_add_border.assert_called_once_with( + mock_image_open.return_value.__enter__.return_value, 100, 100, 50, 15, 10 + ) + patch_add_grid.assert_called_once_with( + mock_image_open.return_value.__enter__.return_value, 100, 100, 50, 15, 10 + ) + assert mock_save_grid.await_count == 2 + expected_grid_save_calls = [ + call(ANY, f"/tmp/test_{suffix}.png") + for suffix in ["outer_overlay", "grid_overlay"] + ] + assert mock_save_grid.mock_calls == expected_grid_save_calls + assert ( + await grid_snapshot.last_path_outer.get_value() == "/tmp/test_outer_overlay.png" + ) + assert ( + await grid_snapshot.last_path_full_overlay.get_value() + == "/tmp/test_grid_overlay.png" + ) + + +@patch("dodal.devices.areadetector.plugins.MJPG.Image") +@patch("dodal.devices.areadetector.plugins.MJPG.aiofiles", autospec=True) +async def test_asyncio_save_image(mock_aiofiles, patch_image): + mock_aio_open = mock_aiofiles.open + mock_aio_open.return_value.__aenter__.return_value = (mock_file := AsyncMock()) + + test_path = MagicMock(return_value="some_path/test_grid.png") + await asyncio_save_image(patch_image, test_path) + + patch_image.save.assert_called_once() + mock_aio_open.assert_called_once_with(test_path, "wb") + mock_file.write.assert_called_once() diff --git a/tests/devices/unit_tests/test_oav.py b/tests/devices/unit_tests/test_oav.py deleted file mode 100644 index 46efa5516d..0000000000 --- a/tests/devices/unit_tests/test_oav.py +++ /dev/null @@ -1,262 +0,0 @@ -from unittest.mock import AsyncMock, MagicMock, call, patch - -import numpy as np -import pytest -from bluesky.run_engine import RunEngine -from ophyd.sim import instantiate_fake_device, make_fake_device -from ophyd_async.core import set_mock_value -from PIL import Image -from requests import HTTPError, Response - -import dodal.devices.oav.utils as oav_utils -from dodal.devices.oav.oav_detector import OAV, OAVConfigParams -from dodal.devices.oav.pin_image_recognition import PinTipDetection -from dodal.devices.oav.pin_image_recognition.utils import SampleLocation -from dodal.devices.oav.utils import ( - PinNotFoundException, - get_move_required_so_that_beam_is_at_pixel, - wait_for_tip_to_be_found, -) -from dodal.devices.smargon import Smargon - -DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" -ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" - - -@pytest.fixture -def fake_oav() -> OAV: - oav_params = OAVConfigParams(ZOOM_LEVELS_XML, DISPLAY_CONFIGURATION) - FakeOAV = make_fake_device(OAV) - fake_oav: OAV = FakeOAV(name="test fake OAV", params=oav_params) - - fake_oav.grid_snapshot.url.sim_put("http://test.url") # type: ignore - fake_oav.grid_snapshot.filename.put("test filename") - fake_oav.grid_snapshot.directory.put("test directory") - fake_oav.grid_snapshot.top_left_x.put(100) - fake_oav.grid_snapshot.top_left_y.put(100) - fake_oav.grid_snapshot.box_width.put(50) - fake_oav.grid_snapshot.num_boxes_x.put(15) - fake_oav.grid_snapshot.num_boxes_y.put(10) - fake_oav.grid_snapshot.x_size.sim_put(1024) # type: ignore - fake_oav.grid_snapshot.y_size.sim_put(768) # type: ignore - - fake_oav.cam.port_name.sim_put("CAM") # type: ignore - fake_oav.proc.port_name.sim_put("PROC") # type: ignore - - fake_oav.wait_for_connection() - fake_oav.zoom_controller.set("1.0x").wait() - - return fake_oav - - -@pytest.fixture -def mock_get_with_valid_response(): - patcher = patch("requests.get") - mock_get = patcher.start() - mock_get.return_value.content = b"" - yield mock_get - patcher.stop() - - -@patch("requests.get") -def test_snapshot_trigger_handles_request_with_bad_status_code_correctly( - mock_get, fake_oav: OAV -): - response = Response() - response.status_code = 404 - mock_get.return_value = response - - st = fake_oav.grid_snapshot.trigger() - with pytest.raises(HTTPError): - st.wait() - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image") -@patch("dodal.devices.areadetector.plugins.MJPG.os", new=MagicMock()) -def test_snapshot_trigger_loads_correct_url( - mock_image: MagicMock, mock_get_with_valid_response: MagicMock, fake_oav: OAV -): - st = fake_oav.grid_snapshot.trigger() - st.wait() - mock_get_with_valid_response.assert_called_once_with("http://test.url", stream=True) - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image.open") -@patch("dodal.devices.areadetector.plugins.MJPG.os", new=MagicMock()) -def test_snapshot_trigger_saves_to_correct_file( - mock_open: MagicMock, mock_get_with_valid_response, fake_oav -): - image = Image.open("test") - mock_open.return_value.__enter__.return_value = image - with patch.object(image, "save") as mock_save: - st = fake_oav.grid_snapshot.trigger() - st.wait() - expected_calls_to_save = [ - call(f"test directory/test filename{addition}.png") - for addition in ["", "_outer_overlay", "_grid_overlay"] - ] - calls_to_save = mock_save.mock_calls - assert calls_to_save == expected_calls_to_save - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image.open") -@patch("dodal.devices.areadetector.plugins.MJPG.os") -def test_given_directory_not_existing_when_snapshot_triggered_then_directory_created( - mock_os, mock_open: MagicMock, mock_get_with_valid_response, fake_oav -): - mock_os.path.isdir.return_value = False - st = fake_oav.grid_snapshot.trigger() - st.wait() - mock_os.mkdir.assert_called_once_with("test directory") - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image.open") -@patch("dodal.devices.areadetector.plugins.MJPG.os", new=MagicMock()) -def test_snapshot_trigger_applies_current_microns_per_pixel_to_snapshot( - mock_open: MagicMock, mock_get_with_valid_response, fake_oav -): - image = Image.open("test") # type: ignore - mock_open.return_value.__enter__.return_value = image - - expected_mpp_x = fake_oav.parameters.micronsPerXPixel - expected_mpp_y = fake_oav.parameters.micronsPerYPixel - with patch.object(image, "save"): - st = fake_oav.grid_snapshot.trigger() - st.wait() - assert fake_oav.grid_snapshot.microns_per_pixel_x.get() == expected_mpp_x - assert fake_oav.grid_snapshot.microns_per_pixel_y.get() == expected_mpp_y - - -@patch("dodal.devices.areadetector.plugins.MJPG.Image.open") -@patch("dodal.devices.oav.grid_overlay.add_grid_overlay_to_image") -@patch("dodal.devices.oav.grid_overlay.add_grid_border_overlay_to_image") -@patch("dodal.devices.areadetector.plugins.MJPG.os", new=MagicMock()) -def test_correct_grid_drawn_on_image( - mock_border_overlay: MagicMock, - mock_grid_overlay: MagicMock, - mock_open: MagicMock, - mock_get_with_valid_response: MagicMock, - fake_oav: OAV, -): - st = fake_oav.grid_snapshot.trigger() - st.wait() - expected_border_calls = [ - call(mock_open.return_value.__enter__.return_value, 100, 100, 50, 15, 10) - ] - expected_grid_calls = [ - call(mock_open.return_value.__enter__.return_value, 100, 100, 50, 15, 10) - ] - actual_border_calls = mock_border_overlay.mock_calls - actual_grid_calls = mock_grid_overlay.mock_calls - assert actual_border_calls == expected_border_calls - assert actual_grid_calls == expected_grid_calls - - -def test_bottom_right_from_top_left(): - top_left = np.array([123, 123]) - bottom_right = oav_utils.bottom_right_from_top_left( - top_left, 20, 30, 0.1, 0.15, 2.7027, 2.7027 - ) - assert bottom_right[0] == 863 and bottom_right[1] == 1788 - bottom_right = oav_utils.bottom_right_from_top_left( - top_left, 15, 20, 0.005, 0.007, 1, 1 - ) - assert bottom_right[0] == 198 and bottom_right[1] == 263 - - -def test_get_beam_position_from_zoom_only_called_once_on_multiple_connects( - fake_oav: OAV, -): - fake_oav.wait_for_connection() - fake_oav.wait_for_connection() - fake_oav.wait_for_connection() - - with ( - patch( - "dodal.devices.oav.oav_detector.OAVConfigParams.update_on_zoom", - MagicMock(), - ), - patch( - "dodal.devices.oav.oav_detector.OAVConfigParams.get_beam_position_from_zoom", - MagicMock(), - ) as mock_get_beam_position_from_zoom, - patch( - "dodal.devices.oav.oav_detector.OAVConfigParams.load_microns_per_pixel", - MagicMock(), - ), - ): - fake_oav.zoom_controller.level.sim_put("2.0x") # type: ignore - assert mock_get_beam_position_from_zoom.call_count == 1 - - -@pytest.mark.parametrize( - "px_per_um, beam_centre, angle, pixel_to_move_to, expected_xyz", - [ - # Simple case of beam being in the top left and each pixel being 1 mm - ([1000, 1000], [0, 0], 0, [100, 190], [100, 190, 0]), - ([1000, 1000], [0, 0], -90, [50, 250], [50, 0, 250]), - ([1000, 1000], [0, 0], 90, [-60, 450], [-60, 0, -450]), - # Beam offset - ([1000, 1000], [100, 100], 0, [100, 100], [0, 0, 0]), - ([1000, 1000], [100, 100], -90, [50, 250], [-50, 0, 150]), - # Pixels_per_micron different - ([10, 50], [0, 0], 0, [100, 190], [1, 9.5, 0]), - ([60, 80], [0, 0], -90, [50, 250], [3, 0, 20]), - ], -) -def test_values_for_move_so_that_beam_is_at_pixel( - smargon: Smargon, - fake_oav: OAV, - px_per_um, - beam_centre, - angle, - pixel_to_move_to, - expected_xyz, -): - fake_oav.parameters.micronsPerXPixel = px_per_um[0] - fake_oav.parameters.micronsPerYPixel = px_per_um[1] - fake_oav.parameters.beam_centre_i = beam_centre[0] - fake_oav.parameters.beam_centre_j = beam_centre[1] - - set_mock_value(smargon.omega.user_readback, angle) - - RE = RunEngine(call_returns_result=True) - pos = RE( - get_move_required_so_that_beam_is_at_pixel( - smargon, pixel_to_move_to, fake_oav.parameters - ) - ).plan_result # type: ignore - - assert pos == pytest.approx(expected_xyz) - - -@pytest.mark.asyncio -async def test_given_tip_found_when_wait_for_tip_to_be_found_called_then_tip_immediately_returned(): - mock_pin_tip_detect: PinTipDetection = instantiate_fake_device( - PinTipDetection, name="pin_detect" - ) - await mock_pin_tip_detect.connect(mock=True) - mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock( - return_value=SampleLocation(100, 100, np.array([]), np.array([])) - ) - RE = RunEngine(call_returns_result=True) - result = RE(wait_for_tip_to_be_found(mock_pin_tip_detect)) - assert result.plan_result == (100, 100) # type: ignore - mock_pin_tip_detect._get_tip_and_edge_data.assert_called_once() - - -@pytest.mark.asyncio -async def test_given_no_tip_when_wait_for_tip_to_be_found_called_then_exception_thrown(): - mock_pin_tip_detect: PinTipDetection = instantiate_fake_device( - PinTipDetection, name="pin_detect" - ) - await mock_pin_tip_detect.connect(mock=True) - await mock_pin_tip_detect.validity_timeout.set(0.2) - mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock( - return_value=SampleLocation( - *PinTipDetection.INVALID_POSITION, np.array([]), np.array([]) - ) - ) - RE = RunEngine(call_returns_result=True) - with pytest.raises(PinNotFoundException): - RE(wait_for_tip_to_be_found(mock_pin_tip_detect))