Implement support for bluesky suspenders #508

Open
burkeds opened this issue Aug 5, 2024 · 8 comments
@burkeds
Contributor

burkeds commented Aug 5, 2024

There is currently no support for bluesky suspenders. SignalR.subscribe() may need a refactor to comply with the expected structure.

    from bluesky.suspenders import SuspendFloor
    suspender = SuspendFloor(signal=i1.counts, suspend_thresh=5.0, resume_thresh=10)
    RE.install_suspender(suspender)

    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    Cell In[11], line 3
          1 from bluesky.suspenders import SuspendFloor
          2 suspender = SuspendFloor(signal=i1.counts, suspend_thresh=5.0, resume_thresh=10)
    ----> 3 RE.install_suspender(suspender)

    File ~/projects/BLUESKY/venv/lib/python3.11/site-packages/bluesky/run_engine.py:1083, in RunEngine.install_suspender(self, suspender)
       1070 """
       1071 Install a 'suspender', which can suspend and resume execution.
       1072 
       (...)
       1080 :meth:`RunEngine.clear_suspenders`
       1081 """
       1082 self._suspenders.add(suspender)
    -> 1083 suspender.install(self)

    File ~/projects/BLUESKY/venv/lib/python3.11/site-packages/bluesky/suspenders.py:76, in SuspenderBase.install(self, RE, event_type)
         74 with self._lock:
         75     self.RE = RE
    ---> 76 self._sig.subscribe(self, event_type=event_type, run=True)

    TypeError: SignalR.subscribe() got an unexpected keyword argument 'event_type'

@coretl
Collaborator

coretl commented Aug 19, 2024

Need to add support for the run argument to the subscribe method, as per bluesky/bluesky#1789

@coretl
Collaborator

coretl commented Aug 20, 2024

This means adding the run argument to both of these:

    def subscribe_value(self, function: Callback[T]):
        """Subscribe to updates in value of a device"""
        self._get_cache().subscribe(function, want_value=True)

    def subscribe(self, function: Callback[Dict[str, Reading]]) -> None:
        """Subscribe to updates in the reading"""
        self._get_cache().subscribe(function, want_value=False)

and, if it is False, then after

    self._listeners[function] = want_value

adding the function to a set of listeners that want to drop the next notification.

Then in

    def _notify(self, function: Callback, want_value: bool):

if the function is in that set, remove it from the set and return without notifying; otherwise keep the current behaviour.
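
A minimal sketch of the drop-next-notification idea described above. `SketchCache`, its `update` method and the `_drop_next` set are hypothetical stand-ins for illustration, not the actual ophyd-async cache, and the sketch assumes a value is already cached when `subscribe` is called:

```python
from typing import Any, Callable, Dict, Set

Callback = Callable[[Any], None]


class SketchCache:
    """Hypothetical stand-in for the signal cache; not ophyd-async's class."""

    def __init__(self, name: str, value: Any) -> None:
        self._name = name
        self._value = value
        self._listeners: Dict[Callback, bool] = {}
        # Listeners subscribed with run=False that should skip one notification
        self._drop_next: Set[Callback] = set()

    def _reading(self) -> Dict[str, Dict[str, Any]]:
        return {self._name: {"value": self._value}}

    def _notify(self, function: Callback, want_value: bool) -> None:
        if function in self._drop_next:
            # Swallow exactly one notification, then behave normally
            self._drop_next.discard(function)
            return
        function(self._value if want_value else self._reading())

    def subscribe(self, function: Callback, want_value: bool, run: bool = True) -> None:
        self._listeners[function] = want_value
        if not run:
            self._drop_next.add(function)
        # The immediate notification is dropped when run=False
        self._notify(function, want_value)

    def update(self, value: Any) -> None:
        self._value = value
        for function, want_value in list(self._listeners.items()):
            self._notify(function, want_value)
```

With `run=False` the callback is not called at subscription time, but it does receive the next real update.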

@jsouter
Contributor

jsouter commented Oct 7, 2024

Working on a branch with those changes + added in a test to confirm when the callback gets called.
There's a couple other API changes required in bluesky or ophyd-async:

  • _get_justification for many of the Suspenders calls self._sig.get(), but there is no sync (or async) method called get on the ophyd-async Signal. It is probably simplest to cache the last value somewhere in the suspender instead of manually calling get on the signal, or to remove the value from the justification message.
  • install calls self._sig.subscribe(), which in ophyd-async returns a dict with {name: reading with metadata}, when just the value is expected. If we're adopting the ophyd interface perhaps could be fixed by renaming subscribe and subscribe_value to something like subscribe_reading and subscribe respectively. Though obviously that's quite a big breaking change.
    Any thoughts @coretl?
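
A rough sketch of the caching suggestion in the first bullet, assuming the suspender's subscription callback receives the bare value. `FloorJustification` is a hypothetical helper for illustration, not a bluesky class:

```python
from typing import Any, Optional


class FloorJustification:
    """Hypothetical helper, not part of bluesky: remembers the last value seen."""

    def __init__(self, signal_name: str, suspend_thresh: float) -> None:
        self._signal_name = signal_name
        self._suspend_thresh = suspend_thresh
        self._last_value: Optional[Any] = None

    def __call__(self, value: Any, **kwargs: Any) -> None:
        # Invoked by the subscription; store the value so nothing has to
        # call get() on the signal later.
        self._last_value = value

    def should_suspend(self) -> bool:
        return self._last_value is not None and self._last_value < self._suspend_thresh

    def justification(self) -> str:
        # Built from the cached value rather than a fresh read of the signal
        return (
            f"Signal {self._signal_name} = {self._last_value!r} "
            f"is below {self._suspend_thresh}"
        )
```

The justification message then reports the value that triggered the suspension, which may differ from what a later read would return.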

@coretl
Collaborator

coretl commented Oct 7, 2024

> Working on a branch with those changes + added in a test to confirm when the callback gets called. There's a couple other API changes required in bluesky or ophyd-async:
>
> * `_get_justification` for many of the Suspenders calls `self._sig.get()`, but there is no sync (or async) method called `get` on the ophyd-async `Signal`. It is probably simplest to cache the last value somewhere in the suspender instead of manually calling `get` on the signal, or to remove the value from the justification message.

Caching makes sense.

> * `install` calls `self._sig.subscribe()`, which in ophyd-async returns a dict with {name: reading with metadata}, when just the value is expected. If we're adopting the ophyd interface perhaps could be fixed by renaming `subscribe` and `subscribe_value` to something like `subscribe_reading` and `subscribe` respectively. Though obviously that's quite a big breaking change.
>   Any thoughts @coretl?

Hmm, I only looked at https://github.com/bluesky/bluesky/blob/f46502482e3afcda57e4d6b3ae6bcc0eda129d42/src/bluesky/bundlers.py#L421-L422 which didn't do anything with the value, just went and called read(). I didn't know about suspenders at the time. I guess we have 3 options:

  1. Make suspenders cope with either a value or a reading coming back from subscribe
  2. Make suspenders use subscribe_value in preference to subscribe if it exists to get the value
  3. Remove subscribe_value from ophyd-async, and add sub_type: Literal["value", "reading"] to the signature for bluesky.protocols.subscribe to align it with ophyd, and implement that in ophyd-async

I think I prefer 1 or 3. @tacaswell thoughts?

@coretl
Collaborator

coretl commented Oct 8, 2024

@danielballan prefers 1 so we go with that
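
For illustration, one way option 1 could look: a small helper that accepts either a bare value or a {name: reading} dict. `unwrap_value` is a hypothetical name, and the sketch assumes a reading is a dict with a "value" key; a signal whose own value is a dict keyed by the signal name would be misread by this check.

```python
from typing import Any


def unwrap_value(payload: Any, signal_name: str) -> Any:
    """Return the bare value, given either a value or a {name: reading} dict."""
    if isinstance(payload, dict) and signal_name in payload:
        reading = payload[signal_name]
        if isinstance(reading, dict) and "value" in reading:
            # Reading-style payload: pull the value out of the reading
            return reading["value"]
    # Anything else is treated as the value itself
    return payload
```

A suspender callback could pass whatever it receives through such a helper before comparing against its thresholds.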

@jsouter
Contributor

jsouter commented Oct 10, 2024

I'm struggling to get a test to work here. I copied this test: https://github.com/bluesky/bluesky/blob/b6828c53a68252caeefc3712ea50c49e507d8db2/src/bluesky/tests/test_suspenders.py#L37

and replaced the threading.Timers with asyncio.create_task calls on the coroutine

    async def _set_after_time(val, sleep):
        await asyncio.sleep(sleep)
        await signal.set(val)

and started those tasks before calling

    scan = [Msg("checkpoint"), Msg("sleep", None, 0.2)]
    RE(scan)

but the RE finishes its scan before control returns to the asyncio tasks. I've tried this in a few different ways, but maybe there's a much more obvious way to test it. The suspender's resume/suspend methods do at least get called, but I haven't been able to prove that it interrupts as intended.

I have also seen that, when setting a signal to a value that should suspend from inside a plan using
yield from bps.abs_set(signal, fail_val), bluesky.suspenders.SuspenderBase.__make_event fails to create an asyncio.Event: h = self.RE._loop.call_soon_threadsafe(really_make_the_event) seems never to actually call the function, so h gets cancelled. This doesn't happen in identical circumstances if I replace the ophyd_async signal with an ophyd signal.

@coretl
Collaborator

coretl commented Oct 14, 2024

Are you using the RE event loop to start the tasks? E.g. call_in_bluesky_event_loop(my_task_creation_func()) where async def my_task_creation_func does the sleep then signal.set or creates a task that does it?
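
As a plain-asyncio illustration of the question above (no bluesky involved): a coroutine only makes progress on the loop it was scheduled on, so work meant for a loop running in another thread has to be handed over with `asyncio.run_coroutine_threadsafe`. The background loop here is only a stand-in for the RE event loop, on the assumption that the RE loop runs in its own thread, and `demo` and `set_after_time` are hypothetical names:

```python
import asyncio
import threading
from typing import List


def demo() -> List[float]:
    # Stand-in for a loop owned by another thread
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    seen: List[float] = []

    async def set_after_time(val: float, sleep: float) -> None:
        await asyncio.sleep(sleep)
        seen.append(val)  # stands in for `await signal.set(val)`

    # Schedule the coroutine on the background loop from this thread
    future = asyncio.run_coroutine_threadsafe(set_after_time(1.0, 0.05), loop)
    future.result(timeout=2)  # block until the coroutine has run

    # Stop the loop from its own thread, then clean up
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=2)
    loop.close()
    return seen
```

Tasks created with `asyncio.create_task` on a different loop would not be driven by the background loop at all.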

@jsouter
Contributor

jsouter commented Oct 15, 2024

I've tried both. call_in_bluesky_event_loop is probably what we want, but it has the issue I described above, where really_make_the_event never gets called.
