Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh committed Sep 20, 2024
1 parent 41da0f4 commit 232d921
Showing 1 changed file with 12 additions and 20 deletions.
32 changes: 12 additions & 20 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,37 +513,29 @@ async def observe_signals_values(
-----
Example usage::
async for value in observe_value(sig):
async for value1,value2,value3 in observe_signals_values(sig1,sig2,..):
do_something_with(value)
"""
q: asyncio.Queue[tuple[SignalR[T], T | Status]] = asyncio.Queue()
q: asyncio.Queue[tuple[SignalR[T], T] | Status] = asyncio.Queue()
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

def wrapped_signal_put(signal: SignalR[T]):
def queue_value(value: T):
q.put_nowait((signal, value))
cbs: dict[SignalR, Callback] = {}
for signal in signals:

def queue_status(status: Status):
q.put_nowait((signal, status))
def queue_value(value: T, signal=signal):
q.put_nowait((signal, value))

def clear_signals():
signal.clear_sub(queue_value)
signal.clear_sub(queue_status)
cbs[signal] = queue_value
signal.subscribe_value(queue_value)

return queue_value, queue_status, clear_signals
if done_status is not None:
done_status.add_callback(q.put_nowait)

clear_signals = []
for signal in signals:
queue_value, queue_status, clear_signal = wrapped_signal_put(signal)
clear_signals.append(clear_signal)
if done_status is not None:
done_status.add_callback(queue_status)
signal.subscribe_value(queue_value)
try:
while True:
item = await get_value()
Expand All @@ -555,8 +547,8 @@ def clear_signals():
else:
yield item # type: ignore
finally:
for clear_signal in clear_signals:
clear_signal()
for signal, cb in cbs.items():
signal.clear_sub(cb)


class _ValueChecker(Generic[T]):
Expand Down

0 comments on commit 232d921

Please sign in to comment.