Skip to content

Commit

Permalink
bring in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dperl-dls committed Sep 9, 2024
1 parent e925084 commit e056cfb
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
set_transmission,
)
from mx_bluesky.beamlines.i24.jungfrau_commissioning.plans.zebra_plans import (
arm_zebra,
disarm_zebra,
setup_zebra_for_rotation,
)
from mx_bluesky.beamlines.i24.jungfrau_commissioning.utils import i24
Expand All @@ -44,6 +42,10 @@
RotationScanParameters,
)
from mx_bluesky.beamlines.i24.jungfrau_commissioning.utils.utils import run_number
from mx_bluesky.beamlines.i24.serial.setup_beamline.setup_zebra_plans import (
arm_zebra,
disarm_zebra,
)


class JfDevices(TypedDict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from dodal.devices.zebra import (
DISCONNECT,
OR1,
PC_GATE_SOURCE_POSITION,
PC_PULSE,
PC_PULSE_SOURCE_POSITION,
TTL_DETECTOR,
TTL_XSPRESS3,
EncEnum,
I24Axes,
RotationDirection,
TrigSource,
Zebra,
)

Expand All @@ -17,35 +17,9 @@
TTL_SHUTTER = 4


def arm_zebra(zebra: Zebra, timeout: float = 3):
"""Send a demand to arm the Zebra, wait timeout seconds before failing"""
yield from bps.abs_set(zebra.pc.arm_demand, 1)
armed = yield from bps.rd(zebra.pc.armed)
time = 0.0
while not armed and time < timeout:
armed = yield from bps.rd(zebra.pc.armed)
time += 0.1
yield from bps.sleep(0.1)
if not armed:
raise TimeoutError("Zebra failed to arm!")


def disarm_zebra(zebra: Zebra, timeout: float = 3):
"""Send a demand to disarm the Zebra, wait timeout seconds before failing"""
yield from bps.abs_set(zebra.pc.disarm_demand, 1)
armed = yield from bps.rd(zebra.pc.armed)
time = 0.0
while armed and time < timeout:
armed = yield from bps.rd(zebra.pc.armed)
time += 0.1
yield from bps.sleep(0.1)
if armed:
raise TimeoutError("Zebra failed to disarm!")


def setup_zebra_for_rotation(
zebra: Zebra,
axis: I24Axes = I24Axes.OMEGA,
axis: EncEnum = I24Axes.OMEGA,
start_angle: float = 0,
scan_width: float = 360,
direction: RotationDirection = RotationDirection.POSITIVE,
Expand Down Expand Up @@ -79,18 +53,18 @@ def setup_zebra_for_rotation(

LOGGER.info("ZEBRA SETUP: START")
LOGGER.info("ZEBRA SETUP: Enable PC")
yield from bps.abs_set(zebra.pc.gate_source, PC_GATE_SOURCE_POSITION, group=group)
yield from bps.abs_set(zebra.pc.pulse_source, 1, group=group)
yield from bps.abs_set(zebra.pc.gate_source, TrigSource.POSITION, group=group)
yield from bps.abs_set(zebra.pc.pulse_source, TrigSource.TIME, group=group)
# must be on for shutter trigger to be enabled
yield from bps.abs_set(zebra.inputs.soft_in_1, 1, group=group)
# set rotation direction
yield from bps.abs_set(
zebra.pc.dir, ("Negative" if direction.value < 0 else "Positive"), group=group
)
yield from bps.abs_set(zebra.pc.dir, direction.value, group=group)
# Set gate start, adjust for shutter opening time if necessary
LOGGER.info(f"ZEBRA SETUP: shutter_opening_deg = {shutter_opening_deg}")
LOGGER.info(f"ZEBRA SETUP: start angle start: {start_angle}")
shutter_adjusted_start_angle = start_angle - (shutter_opening_deg * direction.value)
shutter_adjusted_start_angle = start_angle - (
shutter_opening_deg * direction.multiplier
)
LOGGER.info(f"ZEBRA SETUP: {shutter_adjusted_start_angle=} for gate start")
yield from bps.abs_set(
zebra.pc.gate_start,
Expand All @@ -115,7 +89,7 @@ def setup_zebra_for_rotation(
yield from bps.abs_set(zebra.output.out_pvs[TTL_DETECTOR], PC_PULSE, group=group)
# Don't use the fluorescence detector
yield from bps.abs_set(zebra.output.out_pvs[TTL_XSPRESS3], DISCONNECT, group=group)
yield from bps.abs_set(zebra.output.pulse_1_input, DISCONNECT, group=group)
yield from bps.abs_set(zebra.output.pulse_1.input, DISCONNECT, group=group)
LOGGER.info(f"ZEBRA SETUP: END - {'' if wait else 'not'} waiting for completion")
if wait:
yield from bps.wait(group)
Expand All @@ -133,7 +107,7 @@ def set_zebra_shutter_to_manual(

def setup_zebra_for_darks(
zebra: Zebra,
axis: I24Axes = I24Axes.OMEGA,
axis: EncEnum = I24Axes.OMEGA,
group: str = "setup_zebra_for_darks",
wait: bool = False,
):
Expand All @@ -142,8 +116,8 @@ def setup_zebra_for_darks(
"""
LOGGER.info("ZEBRA SETUP: START")
LOGGER.info("ZEBRA SETUP: Enable PC")
yield from bps.abs_set(zebra.pc.gate_source, PC_GATE_SOURCE_POSITION, group=group)
yield from bps.abs_set(zebra.pc.pulse_source, PC_PULSE_SOURCE_POSITION, group=group)
yield from bps.abs_set(zebra.pc.gate_source, TrigSource.POSITION, group=group)
yield from bps.abs_set(zebra.pc.pulse_source, TrigSource.POSITION, group=group)
# must be on for triggers to be enabled
yield from bps.abs_set(zebra.inputs.soft_in_1, 1, group=group)
yield from bps.abs_set(zebra.pc.gate_start, 0.5, group=group)
Expand All @@ -159,7 +133,7 @@ def setup_zebra_for_darks(
yield from bps.abs_set(zebra.output.out_pvs[TTL_DETECTOR], PC_PULSE, group=group)
# Don't use the fluorescence detector
yield from bps.abs_set(zebra.output.out_pvs[TTL_XSPRESS3], DISCONNECT, group=group)
yield from bps.abs_set(zebra.output.pulse_1_input, DISCONNECT, group=group)
yield from bps.abs_set(zebra.output.pulse_1.input, DISCONNECT, group=group)
LOGGER.info(f"ZEBRA SETUP: END - {'' if wait else 'not'} waiting for completion")
if wait:
yield from bps.wait(group)
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
from dodal.devices.oav.oav_detector import OAV
from dodal.devices.zebra import Zebra
from dodal.log import set_beamline
from dodal.utils import get_beamline_name, skip_device
from dodal.utils import skip_device

from mx_bluesky.beamlines.i24.jungfrau_commissioning.utils.jf_commissioning_devices import (
JungfrauM1,
ReadOnlyEnergyAndAttenuator,
SetAttenuator,
)

BL = get_beamline_name("s24")
set_beamline(BL)
set_utils_beamline(BL)

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import time
from collections.abc import Callable
from unittest.mock import MagicMock

import pytest
from dodal.devices.i24.i24_vgonio import VGonio
from ophyd.device import Device
from ophyd.status import Status

from mx_bluesky.beamlines.i24.jungfrau_commissioning.plans.rotation_scan_plans import (
JfDevices,
)
from mx_bluesky.beamlines.i24.jungfrau_commissioning.utils import i24
from mx_bluesky.beamlines.i24.jungfrau_commissioning.utils.jf_commissioning_devices import (
JungfrauM1,
ReadOnlyEnergyAndAttenuator,
SetAttenuator,
)


@pytest.fixture
def completed_status():
result = Status()
result.set_finished()
return result


@pytest.fixture
def fake_vgonio(completed_status) -> VGonio:
gon: VGonio = i24.vgonio(fake_with_ophyd_sim=True)

def set_omega_side_effect(val):
gon.omega.user_readback.sim_put(val) # type: ignore
return completed_status

gon.omega.set = MagicMock(side_effect=set_omega_side_effect)

gon.x.user_setpoint._use_limits = False
gon.yh.user_setpoint._use_limits = False
gon.z.user_setpoint._use_limits = False
gon.omega.user_setpoint._use_limits = False
return gon


@pytest.fixture
def fake_jungfrau() -> JungfrauM1:
JF: JungfrauM1 = i24.jungfrau(fake_with_ophyd_sim=True)

def set_acquire_side_effect(val):
JF.acquire_rbv.sim_put(1) # type: ignore
time.sleep(1)
JF.acquire_rbv.sim_put(0) # type: ignore
return completed_status

JF.acquire_start.set = MagicMock(side_effect=set_acquire_side_effect)

return JF


@pytest.fixture
def fake_beam_params() -> ReadOnlyEnergyAndAttenuator:
BP: ReadOnlyEnergyAndAttenuator = i24.beam_params(fake_with_ophyd_sim=True)
BP.transmission.sim_put(0.1) # type: ignore
BP.energy.sim_put(20000) # type: ignore
BP.wavelength.sim_put(0.65) # type: ignore
BP.intensity.sim_put(9999999) # type: ignore
return BP


@pytest.fixture
def attenuator() -> SetAttenuator:
return i24.attenuator(fake_with_ophyd_sim=True)


@pytest.fixture
def fake_devices(
fake_vgonio, fake_jungfrau, zebra, fake_beam_params, attenuator
) -> JfDevices:
return {
"jungfrau": fake_jungfrau,
"gonio": fake_vgonio,
"zebra": zebra,
"beam_params": fake_beam_params,
"attenuator": attenuator,
}


@pytest.fixture
def fake_create_devices_function(fake_devices) -> Callable[..., dict[str, Device]]:
return lambda: fake_devices
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from unittest.mock import MagicMock, patch

from bluesky.run_engine import RunEngine
from dodal.devices.i24.jungfrau import JungfrauM1

from mx_bluesky.beamlines.i24.jungfrau_commissioning.plans.gain_mode_darks_plans import (
GainMode,
do_manual_acquisition,
set_gain_mode,
)


@patch(
"bluesky.plan_stubs.wait",
)
def test_set_gain_mode(
bps_wait: MagicMock,
fake_devices,
RE: RunEngine,
):
jungfrau: JungfrauM1 = fake_devices["jungfrau"]

RE(set_gain_mode(jungfrau, GainMode.dynamic))
assert jungfrau.gain_mode.get() == "dynamic"
RE(set_gain_mode(jungfrau, GainMode.forceswitchg1))
assert jungfrau.gain_mode.get() == "forceswitchg1"
RE(set_gain_mode(jungfrau, GainMode.forceswitchg2))
assert jungfrau.gain_mode.get() == "forceswitchg2"


@patch(
"bluesky.plan_stubs.wait",
)
def test_do_dark_acq(
bps_wait: MagicMock,
fake_devices,
RE: RunEngine,
):
# gonio: VGonio = fake_devices["gonio"]
# zebra: Zebra = fake_devices["zebra"]
jungfrau: JungfrauM1 = fake_devices["jungfrau"]

RE(do_manual_acquisition(jungfrau, 0.001, 0.001, 1000))
jungfrau.acquire_start.set.assert_called()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"rotation_axis": "omega",
"scan_width_deg": 360.0,
"image_width_deg": 0.1,
"omega_start_deg": 0.0,
"exposure_time_s": 0.01,
"acquire_time_s": 0.001,
"rotation_direction": "POSITIVE",
"offset_deg": 1.0,
"shutter_opening_time_s": 0.6,
"storage_directory": "/tmp/jungfrau_data/",
"nexus_filename": "scan"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from unittest.mock import MagicMock, patch

from mx_bluesky.beamlines.i24.jungfrau_commissioning.__main__ import hlp


@patch("builtins.print")
def test_hlp(mock_print: MagicMock):
hlp()
assert "There are a bunch of available functions." in mock_print.call_args.args[0]
hlp(hlp)
assert (
"When called with no arguments, displays a welcome message."
in mock_print.call_args.args[0]
)
Loading

0 comments on commit e056cfb

Please sign in to comment.