Skip to content

Commit

Permalink
Modify attach_metadata processor to only yield from, not yield (#324)
Browse files Browse the repository at this point in the history
Relates to: bluesky/bluesky#1625

---------

Co-authored-by: Callum Forrester <callum.forrester@diamond.ac.uk>
  • Loading branch information
rosesyrett and callumforrester committed Oct 27, 2023
1 parent c7d4776 commit 9724c0c
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 552 deletions.
2 changes: 1 addition & 1 deletion .github/actions/install_requirements/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ inputs:
required: true
python_version:
description: Python version to install
default: "3.x"
default: "3.9"

runs:
using: composite
Expand Down
5 changes: 4 additions & 1 deletion config/bl38p.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ env:
- kind: planFunctions
module: dls_bluesky_core.plans
- kind: planFunctions
module: dls_bluesky_core.stubs
module: dls_bluesky_core.stubs
data_writing:
visit_directory: /dls/p38/data/2023/cm33874-1
group_name: BL38P
2 changes: 0 additions & 2 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from pydantic.fields import FieldInfo, ModelField

from blueapi.config import EnvironmentConfig, SourceKind
from blueapi.data_management.gda_directory_provider import VisitDirectoryProvider
from blueapi.utils import (
BlueapiPlanModelConfig,
connect_ophyd_async_devices,
Expand Down Expand Up @@ -64,7 +63,6 @@ class BlueskyContext:
plans: Dict[str, Plan] = field(default_factory=dict)
devices: Dict[str, Device] = field(default_factory=dict)
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict)
directory_provider: Optional[VisitDirectoryProvider] = field(default=None)
sim: bool = field(default=False)

_reference_cache: Dict[Type, Type] = field(default_factory=dict)
Expand Down
Empty file.
128 changes: 0 additions & 128 deletions src/blueapi/data_management/gda_directory_provider.py

This file was deleted.

52 changes: 9 additions & 43 deletions src/blueapi/preprocessors/attach_metadata.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.utils import make_decorator
from ophyd_async.core import DirectoryProvider

from blueapi.core import MsgGenerator
from blueapi.data_management.gda_directory_provider import VisitDirectoryProvider

DATA_SESSION = "data_session"
DATA_GROUPS = "data_groups"


def attach_metadata(
plan: MsgGenerator,
provider: VisitDirectoryProvider,
provider: DirectoryProvider,
) -> MsgGenerator:
"""
Attach data session metadata to the runs within a plan and make it correlate
with an ophyd-async DirectoryProvider.
This wrapper is meant to ensure (on a best-effort basis) that detectors write
their data to the same place for a given run, and that their writings are
tied together in the run via the data_session metadata keyword in the run
start document.
The wrapper groups data by staging and bundles it with runs as best it can.
Since staging is inherently decoupled from runs this is done on a best-effort
basis. In the following sequence of messages:
|stage|, stage, |open_run|, close_run, unstage, unstage, |stage|, stage,
|open_run|, close_run, unstage, unstage
A new group is created at each |stage| and bundled into the start document
at each |open_run|.
This calls the directory provider and ensures the start document contains
the correct data session.
Args:
plan: The plan to preprocess
Expand All @@ -41,32 +29,10 @@ def attach_metadata(
Yields:
Iterator[Msg]: Plan messages
"""

group_in_progress = False

for message in plan:
# If the first stage in a series of stages is detected,
# update the directory provider and create a new group.
if (message.command == "stage") and (not group_in_progress):
yield from bps.wait_for([provider.update])
group_in_progress = True
# Mark if detectors are being unstaged so that the start
# of the next sequence of stages is detectable.
elif message.command == "unstage":
group_in_progress = False

# If a run is being opened, attempt to bundle the information
# on any existing group into the start document.
if message.command == "open_run":
# Handle the case where we're opening a run but no detectors
# have been staged yet. Common for nested runs.
if not group_in_progress:
yield from bps.wait_for([provider.update])
directory_info = provider()
message.kwargs[DATA_SESSION] = directory_info.filename_prefix

# This is a preprocessor so we yield the original message.
yield message
directory_info = provider()
yield from bpp.inject_md_wrapper(
plan, md={DATA_SESSION: directory_info.filename_prefix}
)


attach_metadata_decorator = make_decorator(attach_metadata)
24 changes: 5 additions & 19 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import logging
from typing import Mapping, Optional

from ophyd_async.core import StaticDirectoryProvider

from blueapi.config import ApplicationConfig
from blueapi.core import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.data_management.gda_directory_provider import (
LocalVisitServiceClient,
VisitDirectoryProvider,
VisitServiceClient,
VisitServiceClientBase,
)
from blueapi.messaging import StompMessagingTemplate
from blueapi.messaging.base import MessagingTemplate
from blueapi.preprocessors.attach_metadata import attach_metadata
Expand Down Expand Up @@ -92,18 +88,9 @@ def setup_handler(
plan_wrappers = []

if config:
visit_service_client: VisitServiceClientBase
if config.env.data_writing.visit_service_url is not None:
visit_service_client = VisitServiceClient(
config.env.data_writing.visit_service_url
)
else:
visit_service_client = LocalVisitServiceClient()

provider = VisitDirectoryProvider(
data_group_name=config.env.data_writing.group_name,
data_directory=config.env.data_writing.visit_directory,
client=visit_service_client,
provider = StaticDirectoryProvider(
filename_prefix=f"{config.env.data_writing.group_name}-blueapi",
directory_path=str(config.env.data_writing.visit_directory),
)

# Make all dodal devices created by the context use provider if they can
Expand All @@ -125,7 +112,6 @@ def setup_handler(
config,
context=BlueskyContext(
plan_wrappers=plan_wrappers,
directory_provider=provider,
sim=False,
),
)
Expand Down
Empty file removed tests/data_writing/__init__.py
Empty file.
66 changes: 0 additions & 66 deletions tests/data_writing/test_gda_directory_provider.py

This file was deleted.

Loading

0 comments on commit 9724c0c

Please sign in to comment.