From ca904f68a3459cf955de9ed0a7661aa1e245486a Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 19 Sep 2024 13:32:34 +0100 Subject: [PATCH 1/2] Ensure records don't get stuck in processing state If an exception occurs during an on_update callback, the "completion" function would never be called, which meant the PACT flag was never reset. This had the end result of causing the affected record to never process again, i.e. its value could never be changed. --- CHANGELOG.rst | 1 + softioc/asyncio_dispatcher.py | 5 +- softioc/cothread_dispatcher.py | 13 +++- tests/test_records.py | 130 +++++++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c3da414a..21dafa53 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -24,6 +24,7 @@ Changed: - `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_ - `Ensure returned numpy arrays are not writeable <../../pull/164>`_ +- `Ensure records do not get stuck in processing state <../../pull/175>`_ Fixed: diff --git a/softioc/asyncio_dispatcher.py b/softioc/asyncio_dispatcher.py index d4886b5f..78b84b2d 100644 --- a/softioc/asyncio_dispatcher.py +++ b/softioc/asyncio_dispatcher.py @@ -84,10 +84,11 @@ async def async_wrapper(): ret = func(*func_args) if inspect.isawaitable(ret): await ret - if completion: - completion(*completion_args) except Exception: logging.exception("Exception when running dispatched callback") + finally: + if completion: + completion(*completion_args) asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop) def __enter__(self): diff --git a/softioc/cothread_dispatcher.py b/softioc/cothread_dispatcher.py index 3f9d23b8..c6982a50 100644 --- a/softioc/cothread_dispatcher.py +++ b/softioc/cothread_dispatcher.py @@ -1,3 +1,5 @@ +import inspect +import logging class CothreadDispatcher: def __init__(self, dispatcher = None): @@ -28,7 +30,12 @@ def __call__( completion = None, completion_args=()): def wrapper(): - func(*func_args) - if completion: - completion(*completion_args) + try: + func(*func_args) + except Exception: + logging.exception("Exception when running dispatched callback") + finally: + if completion: + completion(*completion_args) + self.__dispatcher(wrapper) diff --git a/tests/test_records.py b/tests/test_records.py index 094c19b1..bf0c1bd9 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -912,6 +912,136 @@ async def query_record(index): if process.exitcode is None: pytest.fail("Process did not terminate") + + def blocking_test_func_broken_on_update( + self, device_name, conn, use_asyncio + ): + + builder.SetDeviceName(device_name) + + count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0) + + async def async_blocking_broken_on_update(new_val): + """on_update function that always throws an exception""" + log("CHILD: blocking_broken_on_update starting") + completed_count = count_rec.get() + 1 + count_rec.set(completed_count) + log( + f"CHILD: blocking_update_func updated count: {completed_count}", + ) + raise Exception("on_update is broken!") + + def sync_blocking_broken_on_update(new_val): + """on_update function that always throws an exception""" + log("CHILD: blocking_broken_on_update starting") + completed_count = count_rec.get() + 1 + count_rec.set(completed_count) + log( + f"CHILD: blocking_update_func updated count: {completed_count}", + ) + raise Exception("on_update is broken!") + + if use_asyncio: + on_update_callback = async_blocking_broken_on_update + else: + on_update_callback = sync_blocking_broken_on_update + + builder.longOut( + "BLOCKING-BROKEN-ON-UPDATE", + on_update=on_update_callback, + always_update=True, + blocking=True + ) + + if use_asyncio: + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + else: + dispatcher = None + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + conn.send("R") # "Ready" + + log("CHILD: Sent R over Connection to Parent") + + # Keep process alive while main thread runs CAGET + if not use_asyncio: + log("CHILD: Beginning cothread poll_list") + import cothread + cothread.poll_list([(conn.fileno(), cothread.POLLIN)], TIMEOUT) + if conn.poll(TIMEOUT): + val = conn.recv() + assert val == "D", "Did not receive expected Done character" + + log("CHILD: Received exit command, child exiting") + + @requires_cothread + @pytest.mark.asyncio + @pytest.mark.parametrize("use_asyncio", [True, False]) + async def test_blocking_broken_on_update(self, use_asyncio): + """Test that a blocking record with an on_update record that will + always throw an exception will not permanently block record processing. + + Runs using both cothread and asyncio dispatchers in the IOC.""" + ctx = get_multiprocessing_context() + + parent_conn, child_conn = ctx.Pipe() + + device_name = create_random_prefix() + + process = ctx.Process( + target=self.blocking_test_func_broken_on_update, + args=(device_name, child_conn, use_asyncio), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from aioca import caget, caput + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + assert await caget(device_name + ":BLOCKING-COUNTER") == 0 + + log("PARENT: BLOCKING-COUNTER was 0") + + await caput( + device_name + ":BLOCKING-BROKEN-ON-UPDATE", + 1, + wait=True, + timeout=TIMEOUT + ) + + assert await caget(device_name + ":BLOCKING-COUNTER") == 1 + + await caput( + device_name + ":BLOCKING-BROKEN-ON-UPDATE", + 2, + wait=True, + timeout=TIMEOUT + ) + + assert await caget(device_name + ":BLOCKING-COUNTER") == 2 + + + finally: + # Clear the cache before stopping the IOC stops + # "channel disconnected" error messages + aioca_cleanup() + + log("PARENT: Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate") + + class TestGetSetField: """Tests related to get_field and set_field on records""" From 7c886b7e7948bb6b1fab832b622599b5c8b1a932 Mon Sep 17 00:00:00 2001 From: AlexWells Date: Thu, 19 Sep 2024 13:33:08 +0100 Subject: [PATCH 2/2] Check that cothread isn't passed an async function --- softioc/cothread_dispatcher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/softioc/cothread_dispatcher.py b/softioc/cothread_dispatcher.py index c6982a50..4aa579ff 100644 --- a/softioc/cothread_dispatcher.py +++ b/softioc/cothread_dispatcher.py @@ -38,4 +38,7 @@ def wrapper(): if completion: completion(*completion_args) + assert not inspect.iscoroutinefunction(func) + assert not inspect.iscoroutinefunction(completion) + self.__dispatcher(wrapper)