Skip to content

Commit

Permalink
Formatting correctings for linting test
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtrafford committed Oct 8, 2024
1 parent 4e41f04 commit 79d1086
Showing 1 changed file with 34 additions and 46 deletions.
80 changes: 34 additions & 46 deletions src/ophyd_async/epics/pmac/_pmacTrajectory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from typing import Any

import numpy as np
import numpy.typing as npt
Expand All @@ -7,11 +8,8 @@
from velocity_profile import velocityprofile as vp

from ophyd_async.core import (
AsyncStatus,
TriggerLogic,
WatchableAsyncStatus,
WatcherUpdate,
observe_value,
FlyerController,
wait_for_value,
)
from ophyd_async.epics import motor
from ophyd_async.epics.pmac import Pmac
Expand All @@ -20,17 +18,16 @@


class PmacTrajInfo(BaseModel):
spec: Spec[motor.Motor] = Field()
spec: Spec = Field(default=None)


class PmacTrajectoryTriggerLogic(TriggerLogic[PmacTrajInfo]):
class PmacTrajectoryTriggerLogic(FlyerController[PmacTrajInfo]):
"""Device that moves a PMAC Motor record"""

def __init__(self, pmac: Pmac) -> None:
# Make a dict of which motors are for which cs axis
self.pmac = pmac

@AsyncStatus.wrap
async def prepare(self, value: PmacTrajInfo):
# initialise use_axis values to False
for i in range(len("ABCUVWXYZ")):
Expand All @@ -46,9 +43,10 @@ async def prepare(self, value: PmacTrajInfo):
cs_ports = set()
positions: dict[int, npt.NDArray[np.float64]] = {}
velocities: dict[int, npt.NDArray[np.float64]] = {}
time_array: npt.NDArray[np.float64] = []
cs_axes: dict[motor.Motor, int] = {}

time_array: npt.NDArray[np.float64] = np.empty(
2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64
)
# Which Axes are in use?
scan_axes = chunk.axes()
for axis in scan_axes:
Expand All @@ -62,9 +60,6 @@ async def prepare(self, value: PmacTrajInfo):
velocities[cs_index] = np.empty(
2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64
)
time_array = np.empty(
2 * scan_size + (len(gaps) * 4) + 1, dtype=np.float64
)
cs_ports.add(cs_port)
cs_axes[axis] = cs_index
assert len(cs_ports) == 1, "Motors in more than one CS"
Expand Down Expand Up @@ -104,8 +99,8 @@ async def prepare(self, value: PmacTrajInfo):
if gap < scan_size:
# Create Position, velocity and time arrays for the gap
pos_gap, vel_gap, time_gap = await get_gap_profile(chunk, gap)
len_gap = len(time_gap)
for axis in scan_axes:
len_gap = len(time_gap)
if axis != "DURATION":
positions[cs_axes[axis]][
profile_gap : profile_gap + len_gap
Expand Down Expand Up @@ -164,49 +159,38 @@ async def prepare(self, value: PmacTrajInfo):

for axis in scan_axes:
if axis != "DURATION":
self.pmac.profile_cs_name.set(cs_port)
self.pmac.points_to_build.set(profile_length)
self.pmac.use_axis[cs_axes[axis] + 1].set(True)
self.pmac.positions[cs_axes[axis] + 1].set(
await self.pmac.profile_cs_name.set(cs_port)
await self.pmac.points_to_build.set(profile_length)
await self.pmac.use_axis[cs_axes[axis] + 1].set(True)
await self.pmac.positions[cs_axes[axis] + 1].set(
positions[cs_axes[axis]][:profile_length],
)
self.pmac.velocities[cs_axes[axis] + 1].set(
await self.pmac.velocities[cs_axes[axis] + 1].set(
velocities[cs_axes[axis]][:profile_length]
)
else:
self.pmac.time_array.set(time_array[:profile_length])
await self.pmac.time_array.set(time_array[:profile_length])

# MOVE TO START
for axis in scan_axes:
if axis != "DURATION":
await axis.set(self.initial_pos[cs_axes[axis]])

# Set PMAC to use Velocity Array
self.pmac.profile_calc_vel.set(False)
self.pmac.build_profile.set(True)
await self.pmac.profile_calc_vel.set(False)
await self.pmac.build_profile.set(True)
self._fly_start = time.monotonic()

@AsyncStatus.wrap
async def kickoff(self):
self.status = self.pmac.execute_profile.set(1, timeout=self.scantime + 10)
self.status = await self.pmac.execute_profile.set(
True, timeout=self.scantime + 10
)

async def stop(self):
await self.pmac.profile_abort.set(1)
await self.pmac.profile_abort.set(True)

@WatchableAsyncStatus.wrap
async def complete(self):
async for percent in observe_value(self.scan_percent):
yield WatcherUpdate(
name=self.name,
current=percent,
initial=0,
target=100,
unit="%",
precision=0,
time_elapsed=time.monotonic() - self._fly_start,
)
if percent >= 100:
break
await wait_for_value(self.status, False, timeout=self.scantime)

async def get_cs_info(self, motor: motor.Motor) -> tuple[str, int]:
output_link = await motor.output_link.get_value()
Expand All @@ -220,7 +204,7 @@ async def get_cs_info(self, motor: motor.Motor) -> tuple[str, int]:
def _calculate_gaps(self, chunk: Frames[motor.Motor]):
inds = np.argwhere(chunk.gap)
if len(inds) == 0:
return len(chunk)
return [len(chunk)]
else:
return inds

Expand All @@ -229,18 +213,18 @@ async def ramp_up_velocity_pos(
motor: motor.Motor,
start_velocity: float,
end_velocity: float,
ramp_time: int = None,
min_ramp_time: int = None,
ramp_time: float = 0,
min_ramp_time: float = 0,
):
# For the given motor return the displacement and time taken to get from one given
# velocity to another
acceleration_time = await motor.acceleration_time.get_value()
max_velocity = await motor.max_velocity.get_value()
accl = max_velocity / acceleration_time
delta_v = abs(end_velocity - start_velocity)
if ramp_time is None:
if ramp_time == 0:
ramp_time = delta_v / accl
if min_ramp_time is not None:
if min_ramp_time:
ramp_time = max(ramp_time, min_ramp_time)
disp = 0.5 * (start_velocity + end_velocity) * ramp_time
return [disp, ramp_time]
Expand Down Expand Up @@ -380,8 +364,8 @@ async def profile_between_points(


async def point_velocities(
chunk: Frames[motor.Motor], index: int, entry: bool = True
) -> dict[str, float]:
chunk: Frames[Any], index: int, entry: bool = True
) -> dict[motor.Motor, float]:
"""Find the velocities of each axis over the entry/exit of current point"""
velocities = {}
for axis in chunk.axes():
Expand Down Expand Up @@ -419,7 +403,11 @@ async def calculate_profile_from_velocities(
time_arrays: dict[motor.Motor, npt.NDArray[np.float64]],
velocity_arrays: dict[motor.Motor, npt.NDArray[np.float64]],
current_positions: dict[motor.Motor, npt.NDArray[np.float64]],
) -> list[dict[motor.Motor, npt.NDArray[np.float64]]]:
) -> tuple[
dict[motor.Motor, npt.NDArray[np.float64]],
dict[motor.Motor, npt.NDArray[np.float64]],
list[int],
]:
# at this point we have time/velocity arrays with 2-4 values for each
# axis. Each time represents a (instantaneous) change in acceleration.
# We want to translate this into a move profile (time/position).
Expand Down

0 comments on commit 79d1086

Please sign in to comment.