Skip to content

Commit

Permalink
Merge branch 'main' into 366-add-cli-commands-to-inspectrestart-the-e…
Browse files Browse the repository at this point in the history
…nvironment
  • Loading branch information
stan-dot authored May 17, 2024
2 parents 59b0786 + 5950772 commit c61b420
Show file tree
Hide file tree
Showing 20 changed files with 31 additions and 857 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ RUN pip install .
# The runtime stage copies the built venv into a slim runtime container
FROM python:${PYTHON_VERSION}-slim as runtime
# Add apt-get system dependecies for runtime here if needed
RUN apt-get update && apt-get install -y --no-install-recommends \
# Git required for installing packages at runtime
git \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /venv/ /venv/
COPY ./container-startup.sh /container-startup.sh
ENV PATH=/venv/bin:$PATH
Expand Down
5 changes: 4 additions & 1 deletion helm/blueapi/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ spec:
{{- include "blueapi.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- if .Values.restartOnConfigChange }}
checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
{{- end }}
{{- with .Values.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
Expand Down
2 changes: 2 additions & 0 deletions helm/blueapi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ affinity: {}

hostNetwork: false # May be needed for talking to arcane protocols such as EPICS

restartOnConfigChange: true

listener:
enabled: true
resources: {}
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ dependencies = [
"aiohttp",
"PyYAML",
"click<8.1.4",
"fastapi[all]<0.99",
"fastapi[all]<0.99", # Later versions use a newer openapi schema, which is incompatible with swagger see https://github.com/swagger-api/swagger-codegen/issues/10446
"uvicorn",
"requests",
"dls-bluesky-core", #requires ophyd-async
"dls-dodal<1.21",
"dls-dodal>=1.24.0",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand All @@ -44,7 +44,7 @@ dev = [
"pytest-cov",
"pytest-asyncio",
"ruff",
"sphinx-autobuild",
"sphinx-autobuild==2024.2.4", # Later versions have a clash with fastapi<0.99, remove pin when fastapi is a higher version
"sphinx-copybutton",
"sphinx-click",
"sphinx-design",
Expand Down
21 changes: 12 additions & 9 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from requests.exceptions import ConnectionError

from blueapi import __version__
from blueapi.cli.amq import AmqClient, BlueskyRemoteError

from blueapi.cli.event_bus_client import EventBusClient
from blueapi.config import ApplicationConfig, ConfigLoader
from blueapi.core import DataEvent
from blueapi.messaging import MessageContext
Expand Down Expand Up @@ -136,7 +137,9 @@ def listen_to_events(obj: dict) -> None:
"""Listen to events output by blueapi"""
config: ApplicationConfig = obj["config"]
if config.stomp is not None:
amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp))
event_bus_client = EventBusClient(
StompMessagingTemplate.autoconfigured(config.stomp)
)
else:
raise RuntimeError("Message bus needs to be configured")

Expand All @@ -151,8 +154,8 @@ def on_event(
"Subscribing to all bluesky events from "
f"{config.stomp.host}:{config.stomp.port}"
)
with amq_client:
amq_client.subscribe_to_all_events(on_event)
with event_bus_client:
event_bus_client.subscribe_to_all_events(on_event)
input("Press enter to exit")


Expand Down Expand Up @@ -182,7 +185,7 @@ def run_plan(
raise RuntimeError(
"Cannot run plans without Stomp configuration to track progress"
)
amq_client = AmqClient(_message_template)
event_bus_client = EventBusClient(_message_template)
finished_event: deque[WorkerEvent] = deque()

def store_finished_event(event: WorkerEvent) -> None:
Expand All @@ -195,13 +198,13 @@ def store_finished_event(event: WorkerEvent) -> None:
resp = client.create_task(task)
task_id = resp.task_id

with amq_client:
amq_client.subscribe_to_topics(task_id, on_event=store_finished_event)
with event_bus_client:
event_bus_client.subscribe_to_topics(task_id, on_event=store_finished_event)
updated = client.update_worker_task(WorkerTask(task_id=task_id))

amq_client.wait_for_complete(timeout=timeout)
event_bus_client.wait_for_complete(timeout=timeout)

if amq_client.timed_out:
if event_bus_client.timed_out:
logger.error(f"Plan did not complete within {timeout} seconds")
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, message: str) -> None:
_Event = WorkerEvent | ProgressEvent | DataEvent


class AmqClient:
class EventBusClient:
app: MessagingTemplate
complete: threading.Event
timed_out: bool | None
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from blueapi.worker import Task, TrackableTask, WorkerState

from .amq import BlueskyRemoteError
from .event_bus_client import BlueskyRemoteError

T = TypeVar("T")

Expand Down
7 changes: 0 additions & 7 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ class StompConfig(BaseModel):
auth: BasicAuthentication | None = None


class DataWritingConfig(BlueapiBaseModel):
visit_service_url: str | None = None # e.g. "http://localhost:8088/api"
visit_directory: Path = Path("/tmp/0-0")
group_name: str = "example"


class WorkerEventConfig(BlueapiBaseModel):
"""
Config for event broadcasting via the message bus
Expand All @@ -78,7 +72,6 @@ class EnvironmentConfig(BlueapiBaseModel):
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.plans"),
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"),
]
data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig)
events: WorkerEventConfig = Field(default_factory=WorkerEventConfig)


Expand Down
25 changes: 2 additions & 23 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
import functools
import logging
from collections.abc import Callable, Sequence
from collections.abc import Callable
from dataclasses import dataclass, field
from importlib import import_module
from inspect import Parameter, signature
from types import ModuleType, UnionType
from typing import Any, Generic, TypeVar, Union, get_args, get_origin, get_type_hints

from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop
from bluesky.run_engine import RunEngine
from pydantic import create_model
from pydantic.fields import FieldInfo, ModelField

from blueapi.config import EnvironmentConfig, SourceKind
from blueapi.utils import (
BlueapiPlanModelConfig,
connect_ophyd_async_devices,
load_module_all,
)

from .bluesky_types import (
BLUESKY_PROTOCOLS,
Device,
HasName,
MsgGenerator,
Plan,
PlanGenerator,
PlanWrapper,
is_bluesky_compatible_device,
is_bluesky_plan_generator,
)
Expand All @@ -45,22 +41,12 @@ class BlueskyContext:
run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
)
plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list)
plans: dict[str, Plan] = field(default_factory=dict)
devices: dict[str, Device] = field(default_factory=dict)
plan_functions: dict[str, PlanGenerator] = field(default_factory=dict)
sim: bool = field(default=False)

_reference_cache: dict[type, type] = field(default_factory=dict)

def wrap(self, plan: MsgGenerator) -> MsgGenerator:
wrapped_plan = functools.reduce(
lambda wrapped, next_wrapper: next_wrapper(wrapped),
self.plan_wrappers,
plan,
)
yield from wrapped_plan

def find_device(self, addr: str | list[str]) -> Device | None:
"""
Find a device in this context, allows for recursive search.
Expand Down Expand Up @@ -90,13 +76,6 @@ def with_config(self, config: EnvironmentConfig) -> None:
elif source.kind is SourceKind.DODAL:
self.with_dodal_module(mod)

call_in_bluesky_event_loop(
connect_ophyd_async_devices(
self.devices.values(),
self.sim,
)
)

def with_plan_module(self, module: ModuleType) -> None:
"""
Register all functions in the module supplied as plans.
Expand Down
Empty file.
127 changes: 0 additions & 127 deletions src/blueapi/data_management/visit_directory_provider.py

This file was deleted.

41 changes: 0 additions & 41 deletions src/blueapi/preprocessors/attach_metadata.py

This file was deleted.

Loading

0 comments on commit c61b420

Please sign in to comment.