Skip to content

Commit

Permalink
added observe signals
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh committed Sep 17, 2024
1 parent c817587 commit 6e86354
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,59 @@ async def get_value():
signal.clear_sub(q.put_nowait)


async def observe_signals_values(
*signals: SignalR[T],
timeout: float | None = None,
done_status: Status | None = None,
) -> AsyncGenerator[T, None]:
"""Subscribe to the value of a signal so it can be iterated from.
Parameters
----------
signals:
Call subscribe_value on this at the start, and clear_sub on it at the
end
timeout:
If given, how long to wait for each updated value in seconds. If an update
is not produced in this time then raise asyncio.TimeoutError
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
Notes
-----
Example usage::
async for value in observe_value(sig):
do_something_with(value)
"""

q: asyncio.Queue[T | Status] = asyncio.Queue()
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

if done_status is not None:
done_status.add_callback(q.put_nowait)
for signal in signals:
signal.subscribe_value(q.put_nowait)
try:
while True:
item = await get_value()
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
else:
break
else:
yield item
finally:
signal.clear_sub(q.put_nowait)


class _ValueChecker(Generic[T]):
def __init__(self, matcher: Callable[[T], bool], matcher_name: str):
self._last_value: Optional[T] = None
Expand Down

0 comments on commit 6e86354

Please sign in to comment.