Skip to content

Commit

Permalink
Merge branch 'main' into 364-auto-generate-rest-client-1
Browse files Browse the repository at this point in the history
  • Loading branch information
stan-dot authored May 17, 2024
2 parents 35fb716 + 5950772 commit e591bc6
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 170 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"uvicorn",
"requests",
"dls-bluesky-core", #requires ophyd-async
"dls-dodal",
"dls-dodal>=1.24.0",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
44 changes: 23 additions & 21 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from requests.exceptions import ConnectionError

from blueapi import __version__
from blueapi.cli.amq import AmqClient
from blueapi.cli.event_bus_client import EventBusClient
from blueapi.config import ApplicationConfig, ConfigLoader
from blueapi.core import DataEvent
from blueapi.messaging import MessageContext
Expand Down Expand Up @@ -135,7 +135,9 @@ def listen_to_events(obj: dict) -> None:
"""Listen to events output by blueapi"""
config: ApplicationConfig = obj["config"]
if config.stomp is not None:
amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp))
event_bus_client = EventBusClient(
StompMessagingTemplate.autoconfigured(config.stomp)
)
else:
raise RuntimeError("Message bus needs to be configured")

Expand All @@ -150,8 +152,8 @@ def on_event(
"Subscribing to all bluesky events from "
f"{config.stomp.host}:{config.stomp.port}"
)
with amq_client:
amq_client.subscribe_to_all_events(on_event)
with event_bus_client:
event_bus_client.subscribe_to_all_events(on_event)
input("Press enter to exit")


Expand Down Expand Up @@ -182,6 +184,14 @@ def run_plan(
client: BlueapiRestClient = obj["rest_client"]

logger = logging.getLogger(__name__)

if config.stomp is not None:
_message_template = StompMessagingTemplate.autoconfigured(config.stomp)
else:
raise RuntimeError(
"Cannot run plans without Stomp configuration to track progress"
)
event_bus_client = EventBusClient(_message_template)
finished_event: deque[WorkerEvent] = deque()

def store_finished_event(event: WorkerEvent) -> None:
Expand All @@ -194,23 +204,15 @@ def store_finished_event(event: WorkerEvent) -> None:
resp = client.create_task(task)
task_id = resp.task_id

if eventbus:
if config.stomp is not None:
_message_template = StompMessagingTemplate.autoconfigured(config.stomp)
else:
raise RuntimeError(
"Cannot run plans without Stomp configuration to track progress"
)
amq_client = AmqClient(_message_template)
with amq_client:
amq_client.subscribe_to_topics(task_id, on_event=store_finished_event)
updated = client.update_worker_task(WorkerTask(task_id=task_id))

amq_client.wait_for_complete(timeout=timeout)

if amq_client.timed_out:
logger.error(f"Plan did not complete within {timeout} seconds")
return
with event_bus_client:
event_bus_client.subscribe_to_topics(task_id, on_event=store_finished_event)
updated = client.update_worker_task(WorkerTask(task_id=task_id))

event_bus_client.wait_for_complete(timeout=timeout)

if event_bus_client.timed_out:
logger.error(f"Plan did not complete within {timeout} seconds")
return

process_event_after_finished(finished_event.pop(), logger)
pprint(updated.dict())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, message: str) -> None:
_Event = WorkerEvent | ProgressEvent | DataEvent


class AmqClient:
class EventBusClient:
app: MessagingTemplate
complete: threading.Event
timed_out: bool | None
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from blueapi.worker import Task, TrackableTask, WorkerState

from .amq import BlueskyRemoteError
from .event_bus_client import BlueskyRemoteError

T = TypeVar("T")

Expand Down
11 changes: 1 addition & 10 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
from types import ModuleType, UnionType
from typing import Any, Generic, TypeVar, Union, get_args, get_origin, get_type_hints

from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop
from bluesky.run_engine import RunEngine
from pydantic import create_model
from pydantic.fields import FieldInfo, ModelField

from blueapi.config import EnvironmentConfig, SourceKind
from blueapi.utils import (
BlueapiPlanModelConfig,
connect_ophyd_async_devices,
load_module_all,
)

Expand Down Expand Up @@ -45,7 +44,6 @@ class BlueskyContext:
plans: dict[str, Plan] = field(default_factory=dict)
devices: dict[str, Device] = field(default_factory=dict)
plan_functions: dict[str, PlanGenerator] = field(default_factory=dict)
sim: bool = field(default=False)

_reference_cache: dict[type, type] = field(default_factory=dict)

Expand Down Expand Up @@ -78,13 +76,6 @@ def with_config(self, config: EnvironmentConfig) -> None:
elif source.kind is SourceKind.DODAL:
self.with_dodal_module(mod)

call_in_bluesky_event_loop(
connect_ophyd_async_devices(
self.devices.values(),
self.sim,
)
)

def with_plan_module(self, module: ModuleType) -> None:
"""
Register all functions in the module supplied as plans.
Expand Down
4 changes: 1 addition & 3 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ def setup_handler(

handler = Handler(
config,
context=BlueskyContext(
sim=False,
),
context=BlueskyContext(),
)
handler.start()

Expand Down
2 changes: 0 additions & 2 deletions src/blueapi/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from .base_model import BlueapiBaseModel, BlueapiModelConfig, BlueapiPlanModelConfig
from .invalid_config_error import InvalidConfigError
from .modules import load_module_all
from .ophyd_async_connect import connect_ophyd_async_devices
from .serialization import serialize
from .thread_exception import handle_all_exceptions

Expand All @@ -14,5 +13,4 @@
"BlueapiModelConfig",
"BlueapiPlanModelConfig",
"InvalidConfigError",
"connect_ophyd_async_devices",
]
54 changes: 0 additions & 54 deletions src/blueapi/utils/ophyd_async_connect.py

This file was deleted.

77 changes: 0 additions & 77 deletions tests/utils/test_ophyd_async_connect.py

This file was deleted.

0 comments on commit e591bc6

Please sign in to comment.