Skip to content

Commit

Permalink
Move util functions from i22 branch of ophyd_async that may be genera…
Browse files Browse the repository at this point in the history
…lly useful
  • Loading branch information
DiamondJoseph committed Oct 12, 2023
1 parent 512edd2 commit 6fdea74
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 2 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ description = "Common Diamond specific Bluesky plans and functions"
dependencies = [
"blueapi",
"ophyd",
"ophyd_async",
"scanspec"
] # Add project dependencies here, e.g. ["click", "numpy"]
dynamic = ["version"]
Expand Down
11 changes: 10 additions & 1 deletion src/dls_bluesky_core/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from .types import MsgGenerator, PlanGenerator
from .coordination import group_uuid
from .maths import step_to_num, in_micros
from .scanspecs import get_duration
from .types import MsgGenerator, PlanGenerator, ScanAxis


__all__ = [
"get_duration",
"group_uuid",
"in_micros",
"MsgGenerator",
"PlanGenerator",
"ScanAxis",
"step_to_num"
]
14 changes: 14 additions & 0 deletions src/dls_bluesky_core/core/coordination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import uuid


def group_uuid(name: str) -> str:
"""
Returns a unique but human-readable string, to assist debugging orchestrated groups.
Args:
name (str): A human readable name
Returns:
readable_uid (str): name appended with a unique string
"""
return f"{name}-{str(uuid.uuid4())[:6]}"

Check warning on line 14 in src/dls_bluesky_core/core/coordination.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/coordination.py#L14

Added line #L14 was not covered by tests
47 changes: 47 additions & 0 deletions src/dls_bluesky_core/core/maths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Tuple

import numpy as np


def step_to_num(start: float, stop: float, step: float) -> Tuple[float, float, int]:
"""
Standard handling for converting from start, stop, step to start, stop, num
Forces step to be same direction as length
Includes a final point if it is within 1% of the end point (consistent with GDA)
Args:
start (float):
Start of length, will be returned unchanged
stop (float):
End of length, if length/step does not divide cleanly will be returned
extended up to 1% of step, or else truncated.
step (float):
Length of a step along the line formed from start to stop.
If stop < start, will be coerced to be backwards.
Returns:
start, truncated_stop, num = Tuple[float, float, int]
start will be returned unchanged
truncated_stop = start + num * step
num is the maximal number of steps that could fit into the length.
"""
# Make step be the right direction
step = abs(step) if stop > start else -abs(step)

Check warning on line 30 in src/dls_bluesky_core/core/maths.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/maths.py#L30

Added line #L30 was not covered by tests
# If stop is within 1% of a step then include it
num = int((stop - start) / step + 0.01)
return start, start + num * step, num

Check warning on line 33 in src/dls_bluesky_core/core/maths.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/maths.py#L32-L33

Added lines #L32 - L33 were not covered by tests


def in_micros(t: float) -> int:
"""
Converts between units of microT and units of T.
For example, from microseconds to seconds.
Args:
t (float): A time in microseconds, or other measurement in units of microU
Returns:
t (int): A time in seconds rounded up to the nearest whole second,
or other measurement in units of U, rounded up to the nearest whole U.
"""
return np.ceil(t / 1e6)

Check warning on line 47 in src/dls_bluesky_core/core/maths.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/maths.py#L47

Added line #L47 was not covered by tests
32 changes: 32 additions & 0 deletions src/dls_bluesky_core/core/scanspecs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import List, Optional

import numpy as np
from scanspec.core import Frames
from scanspec.specs import DURATION


def get_duration(frames: List[Frames]) -> Optional[float]:
"""
Returns the duration of a number of ScanSpec frames, if known and consistent.
Args:
frames (List[Frames]): A number of Frame objects
Raises:
ValueError: If any frames do not have a Duration defined.
Returns:
duration (float): if all frames have a consistent duration
None: otherwise
"""
for fs in frames:
if DURATION in fs.axes():
durations = fs.midpoints[DURATION]
first_duration = durations[0]
if np.all(durations == first_duration):

Check warning on line 27 in src/dls_bluesky_core/core/scanspecs.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/scanspecs.py#L23-L27

Added lines #L23 - L27 were not covered by tests
# Constant duration, return it
return first_duration

Check warning on line 29 in src/dls_bluesky_core/core/scanspecs.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/scanspecs.py#L29

Added line #L29 was not covered by tests
else:
return None
raise ValueError("Duration not specified in Spec")

Check warning on line 32 in src/dls_bluesky_core/core/scanspecs.py

View check run for this annotation

Codecov / codecov/patch

src/dls_bluesky_core/core/scanspecs.py#L31-L32

Added lines #L31 - L32 were not covered by tests
5 changes: 4 additions & 1 deletion src/dls_bluesky_core/core/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import Any, Callable, Generator
from typing import Any, Callable, Generator, Union

from bluesky import Msg
from ophyd_async.core import Device
from scanspec.specs import DURATION

# 'A true "plan", usually the output of a generator function'
MsgGenerator = Generator[Msg, Any, None]
# 'A function that generates a plan'
PlanGenerator = Callable[..., MsgGenerator]
ScanAxis = Union[Device, DURATION]

0 comments on commit 6fdea74

Please sign in to comment.