Move util functions from i22 branch of ophyd_async that may be generally useful (#10)

* Move util functions from i22 branch of ophyd_async that may be generally useful

* Add tests to boost test coverage back up

* Changes to step_to_num to match expected behaviour

* Update tests checking Annotations

* Linting

* test for group_uuid

* Move i22 generic plans (but do not expose to BlueAPI)

* Invert in_micros logic to actually convert to micros

* Respond to review comments:

- Rename get_constant_duration to make its intended function clearer; always return None instead of throwing
- in_micros now throws an exception when given a negative time
- Add tests for intended behaviour of get_constant_duration

* Handle Spec Product not raising on multiple Frames in axis

* Remove scanspec_fly until ScanSpecFlyable is recreated

* Rename ScanAxis to ScannableAxis

* Revert unrelated nothing change

* Remove container build from CI as it is not application code

* Update src/dls_bluesky_core/core/maths.py

* Update src/dls_bluesky_core/core/maths.py

Co-authored-by: Tom C (DLS) <101418278+coretl@users.noreply.github.com>

* linting

* Update src/dls_bluesky_core/core/maths.py

* Add test for negative step with positive span

* lint

* Added stream name to bps.collect in fly_and_collect

* Move inject method for type checked default arguments from blueapi

* Move inject method for type checked default arguments from blueapi

* Add test for fly_and_collect stub

* Remove dependency on BlueAPI

* Add pytest-asyncio as dependency

* Ignoring untyped function definition for core.coordination.inject for now

* lint

* Change test structure and add docstring to fly_and_collect

* Modify docstring

---------

Co-authored-by: Tom C (DLS) <101418278+coretl@users.noreply.github.com>
Co-authored-by: Rose Yemelyanova <rose.yemelyanova@diamond.ac.uk>
3 people committed Oct 23, 2023
1 parent 512edd2 commit fc6625a
Showing 14 changed files with 414 additions and 94 deletions.
88 changes: 0 additions & 88 deletions .github/workflows/code.yml
@@ -102,94 +102,6 @@ jobs:
# If more than one module in src/ replace with module name to test
run: python -m $(ls src | head -1) --version

container:
needs: [lint, dist, test]
runs-on: ubuntu-latest

permissions:
contents: read
packages: write

env:
TEST_TAG: "testing"

steps:
- name: Checkout
uses: actions/checkout@v3

# image names must be all lower case
- name: Generate image repo name
run: echo IMAGE_REPOSITORY=ghcr.io/$(tr '[:upper:]' '[:lower:]' <<< "${{ github.repository }}") >> $GITHUB_ENV

- name: Download wheel and lockfiles
uses: actions/download-artifact@v3
with:
path: artifacts/

- name: Log in to GitHub Docker Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v2

- name: Build and export to Docker local cache
uses: docker/build-push-action@v4
with:
# Note build-args, context, file, and target must all match between this
# step and the later build-push-action, otherwise the second build-push-action
# will attempt to build the image again
build-args: |
PIP_OPTIONS=-r lockfiles/requirements.txt dist/*.whl
context: artifacts/
file: ./Dockerfile
target: runtime
load: true
tags: ${{ env.TEST_TAG }}
# If you have a long docker build (2+ minutes), uncomment the
# following to turn on caching. For short build times this
# makes it a little slower
#cache-from: type=gha
#cache-to: type=gha,mode=max

- name: Test cli works in cached runtime image
run: docker run docker.io/library/${{ env.TEST_TAG }} --version

- name: Create tags for publishing image
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.IMAGE_REPOSITORY }}
tags: |
type=ref,event=tag
type=raw,value=latest, enable=${{ github.ref_type == 'tag' }}
# type=edge,branch=main
# Add line above to generate image for every commit to given branch,
# and uncomment the end of if clause in next step

- name: Push cached image to container registry
if: github.ref_type == 'tag' # || github.ref_name == 'main'
uses: docker/build-push-action@v3
# This does not build the image again, it will find the image in the
# Docker cache and publish it
with:
# Note build-args, context, file, and target must all match between this
# step and the previous build-push-action, otherwise this step will
# attempt to build the image again
build-args: |
PIP_OPTIONS=-r lockfiles/requirements.txt dist/*.whl
context: artifacts/
file: ./Dockerfile
target: runtime
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

release:
# upload to PyPI and make a release on every tag
needs: [lint, dist, test]
6 changes: 4 additions & 2 deletions pyproject.toml
@@ -13,9 +13,9 @@ classifiers = [
]
description = "Common Diamond specific Bluesky plans and functions"
dependencies = [
"blueapi",
"ophyd",
"scanspec"
"ophyd_async @ git+https://github.com/bluesky/ophyd-async.git",
"scanspec",
] # Add project dependencies here, e.g. ["click", "numpy"]
dynamic = ["version"]
license.file = "LICENSE"
@@ -32,6 +32,7 @@ dev = [
"pre-commit",
"pydata-sphinx-theme>=0.12",
"pytest",
"pytest-asyncio",
"pytest-cov",
"sphinx-autobuild",
"sphinx-copybutton",
@@ -84,6 +85,7 @@ addopts = """
filterwarnings = "error"
# Doctest python code in docs, python code in src docstrings, test functions in tests
testpaths = "docs src tests"
asyncio_mode = "auto"

[tool.coverage.run]
data_file = "/tmp/dls_bluesky_core.coverage"
11 changes: 10 additions & 1 deletion src/dls_bluesky_core/core/__init__.py
@@ -1,6 +1,15 @@
from .types import MsgGenerator, PlanGenerator
from .coordination import group_uuid, inject
from .maths import in_micros, step_to_num
from .scanspecs import get_constant_duration
from .types import MsgGenerator, PlanGenerator, ScannableAxis

__all__ = [
"get_constant_duration",
"group_uuid",
"inject",
"in_micros",
"MsgGenerator",
"PlanGenerator",
"ScannableAxis",
"step_to_num",
]
32 changes: 32 additions & 0 deletions src/dls_bluesky_core/core/coordination.py
@@ -0,0 +1,32 @@
import uuid


def group_uuid(name: str) -> str:
"""
Returns a unique but human-readable string, to assist debugging orchestrated groups.
Args:
name (str): A human readable name
Returns:
readable_uid (str): name appended with a unique string
"""
return f"{name}-{str(uuid.uuid4())[:6]}"


def inject(name: str): # type: ignore
"""
Function to mark a default argument of a plan method as a reference to a device
that is stored in the Blueapi context.
Bypasses mypy linting, returning name as Any and therefore valid as a default
argument.
Args:
name (str): Name of a device to be fetched from the Blueapi context
Returns:
Any: name but without typing checking, valid as any default type
"""

return name
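
A minimal usage sketch for the two helpers above (not part of the diff; the "det" device name and the abs_set move are made-up examples, and inject only resolves to a real device when the plan runs under blueapi):

import bluesky.plan_stubs as bps
from ophyd_async.core import Device

from dls_bluesky_core.core import MsgGenerator, group_uuid, inject


def move_and_wait(det: Device = inject("det")) -> MsgGenerator:
    # group_uuid gives a readable-but-unique group id, e.g. "move-det-1a2b3c"
    group = group_uuid("move-det")
    yield from bps.abs_set(det, 1.0, group=group)
    yield from bps.wait(group=group)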
50 changes: 50 additions & 0 deletions src/dls_bluesky_core/core/maths.py
@@ -0,0 +1,50 @@
from typing import Tuple

import numpy as np


def step_to_num(start: float, stop: float, step: float) -> Tuple[float, float, int]:
"""
Standard handling for converting from start, stop, step to start, stop, num
Forces step to be same direction as length
Includes a final point if it is within 1% of the end point (consistent with GDA)
Args:
start (float):
Start of length, will be returned unchanged
stop (float):
End of length, if length/step does not divide cleanly will be returned
extended up to 1% of step, or else truncated.
step (float):
Length of a step along the line formed from start to stop.
If stop < start, will be coerced to be backwards.
Returns:
start, adjusted_stop, num = Tuple[float, float, int]
start will be returned unchanged
adjusted_stop = start + (num - 1) * step
num is one more than the maximal number of whole steps that fit into the length,
i.e. the number of points.
"""
# Make step be the right direction
step = abs(step) if stop >= start else -abs(step)
# If stop is within 1% of a step then include it
steps = int((stop - start) / step + 0.01)
return start, start + steps * step, steps + 1 # include 1st point


def in_micros(t: float) -> int:
"""
Converts a positive number of seconds to the equivalent
number of microseconds.
Args:
t (float): A time in seconds
Raises:
ValueError: if t < 0
Returns:
t (int): A time in microseconds, rounded up to the nearest whole microsecond.
"""
if t < 0:
raise ValueError(f"Expected a positive time in seconds, got {t!r}")
return int(np.ceil(t * 1e6))
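
A short sketch of the documented behaviour, with values worked through from the code above (the inputs are arbitrary examples):

from dls_bluesky_core.core import in_micros, step_to_num

# 0.25 divides the span exactly, so the end point is kept: 5 points from 0.0 to 1.0
assert step_to_num(0.0, 1.0, 0.25) == (0.0, 1.0, 5)
# 3 does not divide 10 cleanly, so the stop is truncated to the last whole step
assert step_to_num(0, 10, 3) == (0, 9, 4)
# the step is coerced to point from start towards stop
assert step_to_num(5, 0, 1) == (5, 0, 6)
# seconds are rounded up to the next whole microsecond; negative times raise ValueError
assert in_micros(1.5e-6) == 2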
33 changes: 33 additions & 0 deletions src/dls_bluesky_core/core/scanspecs.py
@@ -0,0 +1,33 @@
from typing import List, Optional

import numpy as np
from scanspec.core import Frames
from scanspec.specs import DURATION


def get_constant_duration(frames: List[Frames]) -> Optional[float]:
"""
Returns the duration of a number of ScanSpec frames, if known and consistent.
Args:
frames (List[Frames]): A number of Frame objects
Returns:
duration (float): if all frames have a consistent duration
None: otherwise
"""
duration_frame = [
f for f in frames if DURATION in f.axes() and len(f.midpoints[DURATION])
]
if len(duration_frame) != 1:
# Either no frame has DURATION axis,
# the frame with a DURATION axis has 0 points,
# or multiple frames have DURATION axis
return None
durations = duration_frame[0].midpoints[DURATION]
first_duration = durations[0]
if np.any(durations != first_duration):
# Not all durations are the same
return None
return first_duration
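
A small sketch of how the helper above behaves, building Frames stacks directly (the 0.1 s duration and the five-point ramp are arbitrary example values, and this assumes the scanspec Frames constructor accepts a bare midpoints mapping):

import numpy as np
from scanspec.core import Frames
from scanspec.specs import DURATION

from dls_bluesky_core.core import get_constant_duration

# every point shares the same 0.1 s duration, so that duration is returned
constant = Frames({DURATION: np.full(5, 0.1)})
assert get_constant_duration([constant]) == 0.1

# durations differ between points, so the helper returns None
ramped = Frames({DURATION: np.linspace(0.1, 0.5, 5)})
assert get_constant_duration([ramped]) is None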
5 changes: 4 additions & 1 deletion src/dls_bluesky_core/core/types.py
@@ -1,8 +1,11 @@
from typing import Any, Callable, Generator
from typing import Any, Callable, Generator, Union

from bluesky import Msg
from ophyd_async.core import Device
from scanspec.specs import DURATION

# 'A true "plan", usually the output of a generator function'
MsgGenerator = Generator[Msg, Any, None]
# 'A function that generates a plan'
PlanGenerator = Callable[..., MsgGenerator]
ScannableAxis = Union[Device, DURATION]
60 changes: 60 additions & 0 deletions src/dls_bluesky_core/stubs/flyables.py
@@ -0,0 +1,60 @@
import bluesky.plan_stubs as bps
from bluesky.protocols import Flyable

from dls_bluesky_core.core import MsgGenerator, group_uuid


def fly_and_collect(
flyer: Flyable,
flush_period: float = 0.5,
checkpoint_every_collect: bool = False,
stream_name: str = "primary",
) -> MsgGenerator:
"""Fly and collect a flyer, waiting for collect to finish with a period.
flyer.kickoff and complete are called, which starts the fly scanning process.
bps.wait is called, which finishes after each flush period and then repeats, until
complete finishes. At this point, bps.collect is called to gather the documents
produced.
For some flyers, this plan will need to be called in succession in order to, for
example, set up a flyer to send triggers multiple times and collect data. For such
a use case, this plan can be setup to checkpoint for each collect.
Note: this plan must be wrapped with calls to open and close run, and the flyer
must implement the Collectable protocol. See tests/stubs/test_flyables for an
example.
Args:
flyer (Flyable, Collectable): ophyd-async device which implements Flyable and
Collectable.
flush_period (float): How often to check if flyer.complete has finished.
Defaults to 0.5
checkpoint_every_collect (bool): whether or not to checkpoint after
flyer.collect has been called. Defaults to
False.
stream_name (str): name of the stream to collect from. Defaults to "primary".
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""
yield from bps.kickoff(flyer)
complete_group = group_uuid("complete")
yield from bps.complete(flyer, group=complete_group)
done = False
while not done:
try:
yield from bps.wait(group=complete_group, timeout=flush_period)
except TimeoutError:
pass
else:
done = True
yield from bps.collect(
flyer, stream=True, return_payload=False, name=stream_name
)
if checkpoint_every_collect:
yield from bps.checkpoint()
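
As the docstring notes, the stub must run between open_run and close_run; a minimal wrapper is sketched below (the flyer argument stands in for any ophyd-async device implementing both Flyable and Collectable):

import bluesky.preprocessors as bpp

from dls_bluesky_core.core import MsgGenerator
from dls_bluesky_core.stubs.flyables import fly_and_collect


def fly_plan(flyer) -> MsgGenerator:
    # run_wrapper emits the open_run/close_run messages the stub relies on
    yield from bpp.run_wrapper(
        fly_and_collect(flyer, flush_period=0.5, checkpoint_every_collect=True)
    )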
24 changes: 24 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,24 @@
import asyncio

import pytest
from bluesky.run_engine import RunEngine, TransitionError


@pytest.fixture(scope="function")
def RE(request):
loop = asyncio.new_event_loop()
loop.set_debug(True)
RE = RunEngine({}, call_returns_result=True, loop=loop)

def clean_event_loop():
if RE.state not in ("idle", "panicked"):
try:
RE.halt()
except TransitionError:
pass
loop.call_soon_threadsafe(loop.stop)
RE._th.join()
loop.close()

request.addfinalizer(clean_event_loop)
return RE
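
A trivial example of a test exercising the RE fixture above (the plan is a placeholder, not part of the diff):

import bluesky.plan_stubs as bps


def test_run_engine_fixture_executes_a_plan(RE):
    def plan():
        # a single no-op message is enough to drive the fixture's event loop
        yield from bps.null()

    RE(plan())
    assert RE.state == "idle"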
12 changes: 12 additions & 0 deletions tests/core/test_coordination.py
@@ -0,0 +1,12 @@
import uuid

import pytest

from dls_bluesky_core.core.coordination import group_uuid


@pytest.mark.parametrize("group", ["foo", "bar", "baz", str(uuid.uuid4())])
def test_group_uid(group: str):
gid = group_uuid(group)
assert gid.startswith(f"{group}-")
assert not gid.endswith(f"{group}-")