added unfinished tasks check to the RE fixture
evalott100 committed Aug 22, 2024
1 parent 98b7589 commit 24c7ccf
Showing 7 changed files with 119 additions and 86 deletions.
@@ -11,7 +11,7 @@ def __init__(
self,
pattern_generator: PatternGenerator,
path_provider: PathProvider,
exposure: float = 0.1,
exposure: Optional[float] = 0.1,
) -> None:
self.pattern_generator: PatternGenerator = pattern_generator
if exposure is None:
@@ -30,20 +30,19 @@ async def arm(
if exposure is None:
exposure = 0.1
period: float = exposure + self.get_deadtime(exposure)
task = asyncio.create_task(
self.task = asyncio.create_task(
self._coroutine_for_image_writing(exposure, period, num)
)
self.task = task
return AsyncStatus(task)
return AsyncStatus(self.task)

async def disarm(self):
if self.task:
if self.task and not self.task.done():
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
self.task = None
self.task = None

def get_deadtime(self, exposure: float | None) -> float:
return 0.001
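
For orientation, the arm/disarm logic after this change reads as follows. This is a minimal sketch, not a verified copy of the module: the class name, the arm() signature and the stand-in _coroutine_for_image_writing body are assumptions, while the task handling mirrors the diff above.

import asyncio
from typing import Optional

from ophyd_async.core import AsyncStatus


class SketchPatternController:
    task: Optional[asyncio.Task] = None

    async def arm(self, num: int = 0, exposure: Optional[float] = None) -> AsyncStatus:
        if exposure is None:
            exposure = 0.1
        period: float = exposure + self.get_deadtime(exposure)
        # Keep the task on self so that disarm() can find and cancel it later
        self.task = asyncio.create_task(
            self._coroutine_for_image_writing(exposure, period, num)
        )
        return AsyncStatus(self.task)

    async def disarm(self):
        # Only cancel a task that is still running; always clear the handle
        if self.task and not self.task.done():
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass
        self.task = None

    def get_deadtime(self, exposure: Optional[float]) -> float:
        return 0.001

    async def _coroutine_for_image_writing(self, exposure: float, period: float, num: int) -> None:
        # Stand-in for the real image-writing coroutine
        for _ in range(num):
            await asyncio.sleep(period)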
59 changes: 40 additions & 19 deletions tests/conftest.py
@@ -7,7 +7,6 @@
from typing import Any, Callable

import pytest
import pytest_asyncio
from bluesky.run_engine import RunEngine, TransitionError
from pytest import FixtureRequest

@@ -63,34 +62,36 @@ def configure_epics_environment():
_ALLOWED_CORO_TASKS = {"async_finalizer", "async_setup", "async_teardown"}


@pytest_asyncio.fixture(autouse=True, scope="function")
async def assert_no_pending_tasks(request: FixtureRequest):
@pytest.fixture(autouse=True, scope="function")
def assert_no_pending_tasks(request: FixtureRequest):
"""
if "RE" in request.fixturenames:
# The RE fixture will do the same, as well as some additional loop cleanup
return
"""

# There should be no tasks pending after a test has finished
fail_count = request.session.testsfailed

def error_and_kill_pending_tasks():
loop = asyncio.get_event_loop()
unfinished_tasks = [
unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS and not task.done()
]
if unfinished_tasks:
for task in unfinished_tasks:
task.cancel()
}
for task in unfinished_tasks:
task.cancel()

# If the tasks are still pending because the test failed
# for other reasons then we don't have to error here
if request.session.testsfailed == fail_count:
raise RuntimeError(
f"Not all tasks {unfinished_tasks} "
f"closed during test {request.node.name}."
)
# If the tasks are still pending because the test failed
# for other reasons then we don't have to error here
if unfinished_tasks and request.session.testsfailed == fail_count:
raise RuntimeError(
f"Not all tasks {unfinished_tasks} "
f"closed during test {request.node.name}."
)

try:
yield
finally:
error_and_kill_pending_tasks()
request.addfinalizer(error_and_kill_pending_tasks)


@pytest.fixture(scope="function")
@@ -99,16 +100,36 @@ def RE(request: FixtureRequest):
loop.set_debug(True)
RE = RunEngine({}, call_returns_result=True, loop=loop)

fail_count = request.session.testsfailed

def clean_event_loop():
if RE.state not in ("idle", "panicked"):
try:
RE.halt()
except TransitionError:
pass

unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS and not task.done()
}

# Cancelling the tasks is thread unsafe, but we get a new event loop
# with each RE test so we don't need to do this
loop.call_soon_threadsafe(loop.stop)
RE._th.join()
loop.close()


# If the tasks are still pending because the test failed
# for other reasons then we don't have to error here
if unfinished_tasks and request.session.testsfailed == fail_count:
raise RuntimeError(
f"Not all tasks {unfinished_tasks} "
f"closed during test {request.node.name}."
)

request.addfinalizer(clean_event_loop)
return RE
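
The substance of this commit is the unfinished-task check that both fixtures now perform. The following standalone sketch shows the pattern in one place; the fixture name fail_on_leaked_tasks is hypothetical, but the allow-list, the cancellation loop and the fail-count guard follow the diff above.

import asyncio

import pytest
from pytest import FixtureRequest

_ALLOWED_CORO_TASKS = {"async_finalizer", "async_setup", "async_teardown"}


@pytest.fixture(autouse=True)
def fail_on_leaked_tasks(request: FixtureRequest):
    # Remember how many tests had failed before this one ran
    fail_count = request.session.testsfailed

    def check_and_cancel_leaked_tasks():
        loop = asyncio.get_event_loop()
        unfinished_tasks = {
            task
            for task in asyncio.all_tasks(loop)
            if task.get_coro().__name__ not in _ALLOWED_CORO_TASKS
            and not task.done()
        }
        for task in unfinished_tasks:
            task.cancel()
        # Only raise if the test itself passed; leaked tasks after a failing
        # test are most likely fallout from the original failure
        if unfinished_tasks and request.session.testsfailed == fail_count:
            raise RuntimeError(
                f"Not all tasks {unfinished_tasks} "
                f"closed during test {request.node.name}."
            )

    request.addfinalizer(check_and_cancel_leaked_tasks)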

45 changes: 26 additions & 19 deletions tests/epics/adcore/test_single_trigger.py
@@ -1,42 +1,49 @@
import bluesky.plan_stubs as bps
import bluesky.plans as bp
import pytest
from bluesky import RunEngine

from ophyd_async.core import DeviceCollector, set_mock_value
import ophyd_async.plan_stubs as ops
from ophyd_async.epics import adcore


@pytest.fixture
async def single_trigger_det():
async with DeviceCollector(mock=True):
stats = adcore.NDPluginStatsIO("PREFIX:STATS")
det = adcore.SingleTriggerDetector(
drv=adcore.ADBaseIO("PREFIX:DRV"),
stats=stats,
read_uncached=[stats.unique_id],
)
async def single_trigger_det_with_stats():
stats = adcore.NDPluginStatsIO("PREFIX:STATS", name="stats")
det = adcore.SingleTriggerDetector(
drv=adcore.ADBaseIO("PREFIX:DRV"),
stats=stats,
read_uncached=[stats.unique_id],
name="det"
)

assert det.name == "det"
assert stats.name == "det-stats"
# Set non-default values to check they are set back
# These are using set_mock_value to simulate the backend IOC being setup
# in a particular way, rather than values being set by the Ophyd signals
set_mock_value(det.drv.acquire_time, 0.5)
set_mock_value(det.drv.array_counter, 1)
set_mock_value(det.drv.image_mode, adcore.ImageMode.continuous)
set_mock_value(stats.unique_id, 3)
yield det
yield det, stats


async def test_single_trigger_det(
single_trigger_det: adcore.SingleTriggerDetector, RE: RunEngine
single_trigger_det_with_stats: adcore.SingleTriggerDetector, RE: RunEngine
):
single_trigger_det, stats = single_trigger_det_with_stats
names = []
docs = []
RE.subscribe(lambda name, _: names.append(name))
RE.subscribe(lambda _, doc: docs.append(doc))

RE(bp.count([single_trigger_det]))
def plan():
yield from ops.ensure_connected(single_trigger_det, mock=True)
yield from bps.abs_set(single_trigger_det.drv.acquire_time, 0.5)
yield from bps.abs_set(single_trigger_det.drv.array_counter, 1)
yield from bps.abs_set(
single_trigger_det.drv.image_mode, adcore.ImageMode.continuous
)
#set_mock_value(stats.unique_id, 3)
yield from bp.count([single_trigger_det])


RE(plan())

drv = single_trigger_det.drv
assert 1 == await drv.acquire.get_value()
@@ -47,4 +54,4 @@ async def test_single_trigger_det(
_, descriptor, event, _ = docs
assert descriptor["configuration"]["det"]["data"]["det-drv-acquire_time"] == 0.5
assert event["data"]["det-drv-array_counter"] == 1
assert event["data"]["det-stats-unique_id"] == 3
assert event["data"]["det-stats-unique_id"] == 0
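
The test changes in this and the following files share one pattern: devices are constructed without connecting them, and the plan itself connects them with the ensure_connected plan stub (in mock mode) before using them, in place of the removed DeviceCollector(mock=True) context manager. A minimal sketch of that shape, with det standing in for any ophyd-async device built unconnected:

import bluesky.plans as bp

import ophyd_async.plan_stubs as ops


def count_with_connect(det):
    # Connect (mock=True here, as in the tests above) as the first plan step...
    yield from ops.ensure_connected(det, mock=True)
    # ...then use the device as in any other plan
    yield from bp.count([det])


# Typical use in a test: RE(count_with_connect(det))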
53 changes: 29 additions & 24 deletions tests/epics/adsimdetector/test_sim.py
@@ -11,6 +11,7 @@
from bluesky import RunEngine
from bluesky.utils import new_uid

import ophyd_async.plan_stubs as ops
from ophyd_async.core import (
AsyncStatus,
DetectorTrigger,
@@ -328,41 +329,45 @@ async def test_trigger_logic():
...


async def test_detector_with_unnamed_or_disconnected_config_sigs(
RE, static_filename_provider: StaticFilenameProvider, tmp_path: Path
@pytest.mark.parametrize("driver_name, error_output", [
("", "config signal must be named before it is passed to the detector"),
("some-name", (
"config signal some-name-acquire_time must be connected "
"before it is passed to the detector"
)
)
]
)
def test_detector_with_unnamed_or_disconnected_config_sigs(
RE,
static_filename_provider: StaticFilenameProvider,
tmp_path: Path,
driver_name,
error_output
):
dp = StaticPathProvider(static_filename_provider, tmp_path)
drv = adcore.ADBaseIO("FOO:DRV:")

some_other_driver = adcore.ADBaseIO("TEST")

async with DeviceCollector(mock=True):
hdf = adcore.NDFileHDFIO("FOO:HDF:")
det = adsimdetector.SimDetector(
drv,
hdf,
dp,
config_sigs=[some_other_driver.acquire_time, drv.acquire],
name="foo",
)
some_other_driver = adcore.ADBaseIO("TEST", name=driver_name)

with pytest.raises(Exception) as exc:
RE(count_sim([det], times=1))

assert isinstance(exc.value.args[0], AsyncStatus)
assert (
str(exc.value.args[0].exception())
== "config signal must be named before it is passed to the detector"
hdf = adcore.NDFileHDFIO("FOO:HDF:")
det = adsimdetector.SimDetector(
drv,
hdf,
dp,
config_sigs=[some_other_driver.acquire_time, drv.acquire],
name="foo",
)

some_other_driver.set_name("some-name")
def plan():
yield from ops.ensure_connected(det, mock=True)
yield from count_sim([det], times=1)

with pytest.raises(Exception) as exc:
RE(count_sim([det], times=1))
RE(plan())

assert isinstance(exc.value.args[0], AsyncStatus)
assert (
str(exc.value.args[0].exception())
== "config signal some-name-acquire_time must be connected before it is "
+ "passed to the detector"
== error_output
)
5 changes: 1 addition & 4 deletions tests/sim/conftest.py
@@ -2,14 +2,11 @@

import pytest

from ophyd_async.core import DeviceCollector
from ophyd_async.sim.demo import PatternDetector


@pytest.fixture
async def sim_pattern_detector(tmp_path_factory) -> PatternDetector:
path: Path = tmp_path_factory.mktemp("tmp")
async with DeviceCollector(mock=True):
sim_pattern_detector = PatternDetector(name="PATTERN1", path=path)
return PatternDetector(name="PATTERN1", path=path)

return sim_pattern_detector
18 changes: 7 additions & 11 deletions tests/sim/test_sim_detector.py
@@ -3,21 +3,13 @@
import bluesky.plans as bp
import h5py
import numpy as np
import pytest
from bluesky import RunEngine

from ophyd_async.core import DeviceCollector, assert_emitted
from ophyd_async.epics import motor
from ophyd_async.core import assert_emitted
from ophyd_async.plan_stubs import ensure_connected
from ophyd_async.sim.demo import PatternDetector


@pytest.fixture
async def sim_motor():
async with DeviceCollector(mock=True):
sim_motor = motor.Motor("test")
return sim_motor


async def test_sim_pattern_detector_initialization(
sim_pattern_detector: PatternDetector,
):
@@ -43,7 +35,11 @@ async def test_writes_pattern_to_file(
def capture_emitted(name, doc):
docs[name].append(doc)

RE(bp.count([sim_pattern_detector]), capture_emitted)
def plan():
yield from ensure_connected(sim_pattern_detector, mock=True)
yield from bp.count([sim_pattern_detector])

RE(plan(), capture_emitted)
assert_emitted(
docs, start=1, descriptor=1, stream_resource=2, stream_datum=2, event=1, stop=1
)
14 changes: 11 additions & 3 deletions tests/sim/test_streaming_plan.py
@@ -4,6 +4,7 @@
from bluesky.run_engine import RunEngine

from ophyd_async.core import assert_emitted
from ophyd_async.plan_stubs import ensure_connected
from ophyd_async.sim.demo import PatternDetector


@@ -19,8 +20,11 @@ def append_and_print(name, doc):
docs.append(doc)

RE.subscribe(append_and_print)
def plan():
yield from ensure_connected(sim_pattern_detector, mock=True)
yield from bp.count([sim_pattern_detector], num=1)

RE(bp.count([sim_pattern_detector], num=1))
RE(plan())

# NOTE - double resource because double stream
assert names == [
@@ -38,8 +42,12 @@

async def test_plan(RE: RunEngine, sim_pattern_detector: PatternDetector):
docs = defaultdict(list)
RE(bp.count([sim_pattern_detector]), lambda name, doc: docs[name].append(doc))
def plan():
yield from ensure_connected(sim_pattern_detector, mock=True)
yield from bp.count([sim_pattern_detector])

RE(plan(), lambda name, doc: docs[name].append(doc))

assert_emitted(
docs, start=1, descriptor=1, stream_resource=2, stream_datum=2, event=1, stop=1
)
await sim_pattern_detector.writer.close()
