Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure records do not get stuck in processing state #175

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Changed:

- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_
- `Ensure returned numpy arrays are not writeable <../../pull/164>`_
- `Ensure records do not get stuck in processing state <../../pull/175>`_

Fixed:

Expand Down
5 changes: 3 additions & 2 deletions softioc/asyncio_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ async def async_wrapper():
ret = func(*func_args)
if inspect.isawaitable(ret):
await ret
if completion:
completion(*completion_args)
except Exception:
logging.exception("Exception when running dispatched callback")
finally:
if completion:
completion(*completion_args)
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)

def __enter__(self):
Expand Down
16 changes: 13 additions & 3 deletions softioc/cothread_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import inspect
import logging

class CothreadDispatcher:
def __init__(self, dispatcher = None):
Expand Down Expand Up @@ -28,7 +30,15 @@ def __call__(
completion = None,
completion_args=()):
def wrapper():
func(*func_args)
if completion:
completion(*completion_args)
try:
func(*func_args)
except Exception:
logging.exception("Exception when running dispatched callback")
finally:
if completion:
completion(*completion_args)

assert not inspect.iscoroutinefunction(func)
assert not inspect.iscoroutinefunction(completion)

self.__dispatcher(wrapper)
130 changes: 130 additions & 0 deletions tests/test_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,136 @@ async def query_record(index):
if process.exitcode is None:
pytest.fail("Process did not terminate")


def blocking_test_func_broken_on_update(
self, device_name, conn, use_asyncio
):

builder.SetDeviceName(device_name)

count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0)

async def async_blocking_broken_on_update(new_val):
"""on_update function that always throws an exception"""
log("CHILD: blocking_broken_on_update starting")
completed_count = count_rec.get() + 1
count_rec.set(completed_count)
log(
f"CHILD: blocking_update_func updated count: {completed_count}",
)
raise Exception("on_update is broken!")

def sync_blocking_broken_on_update(new_val):
"""on_update function that always throws an exception"""
log("CHILD: blocking_broken_on_update starting")
completed_count = count_rec.get() + 1
count_rec.set(completed_count)
log(
f"CHILD: blocking_update_func updated count: {completed_count}",
)
raise Exception("on_update is broken!")

if use_asyncio:
on_update_callback = async_blocking_broken_on_update
else:
on_update_callback = sync_blocking_broken_on_update

builder.longOut(
"BLOCKING-BROKEN-ON-UPDATE",
on_update=on_update_callback,
always_update=True,
blocking=True
)

if use_asyncio:
dispatcher = asyncio_dispatcher.AsyncioDispatcher()
else:
dispatcher = None
builder.LoadDatabase()
softioc.iocInit(dispatcher)

conn.send("R") # "Ready"

log("CHILD: Sent R over Connection to Parent")

# Keep process alive while main thread runs CAGET
if not use_asyncio:
log("CHILD: Beginning cothread poll_list")
import cothread
cothread.poll_list([(conn.fileno(), cothread.POLLIN)], TIMEOUT)
if conn.poll(TIMEOUT):
val = conn.recv()
assert val == "D", "Did not receive expected Done character"

log("CHILD: Received exit command, child exiting")

@requires_cothread
@pytest.mark.asyncio
@pytest.mark.parametrize("use_asyncio", [True, False])
async def test_blocking_broken_on_update(self, use_asyncio):
"""Test that a blocking record with an on_update record that will
always throw an exception will not permanently block record processing.

Runs using both cothread and asyncio dispatchers in the IOC."""
ctx = get_multiprocessing_context()

parent_conn, child_conn = ctx.Pipe()

device_name = create_random_prefix()

process = ctx.Process(
target=self.blocking_test_func_broken_on_update,
args=(device_name, child_conn, use_asyncio),
)

process.start()

log("PARENT: Child started, waiting for R command")

from aioca import caget, caput

try:
# Wait for message that IOC has started
select_and_recv(parent_conn, "R")

log("PARENT: received R command")

assert await caget(device_name + ":BLOCKING-COUNTER") == 0

log("PARENT: BLOCKING-COUNTER was 0")

await caput(
device_name + ":BLOCKING-BROKEN-ON-UPDATE",
1,
wait=True,
timeout=TIMEOUT
)

assert await caget(device_name + ":BLOCKING-COUNTER") == 1

await caput(
device_name + ":BLOCKING-BROKEN-ON-UPDATE",
2,
wait=True,
timeout=TIMEOUT
)

assert await caget(device_name + ":BLOCKING-COUNTER") == 2


finally:
# Clear the cache before stopping the IOC stops
# "channel disconnected" error messages
aioca_cleanup()

log("PARENT: Sending Done command to child")
parent_conn.send("D") # "Done"
process.join(timeout=TIMEOUT)
log(f"PARENT: Join completed with exitcode {process.exitcode}")
if process.exitcode is None:
pytest.fail("Process did not terminate")


class TestGetSetField:
"""Tests related to get_field and set_field on records"""

Expand Down
Loading