Commit 3d115b0

moved all the dataset functionality into _hdf_ioc.py
evalott100 committed Jun 14, 2024
1 parent 2aa4d46 commit 3d115b0
Showing 4 changed files with 110 additions and 102 deletions.
65 changes: 58 additions & 7 deletions src/pandablocks_ioc/_hdf_ioc.py
@@ -3,7 +3,7 @@
import os
from asyncio import CancelledError
from collections import deque
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from importlib.util import find_spec
from pathlib import Path
@@ -23,6 +23,7 @@
from softioc.pythonSoftIoc import RecordWrapper

from ._pvi import PviGroup, add_automatic_pvi_info, add_data_capture_pvi_info
from ._tables import ReadOnlyPvaTable
from ._types import ONAM_STR, ZNAM_STR, EpicsName

HDFReceived = Union[ReadyData, StartData, FrameData, EndData]
@@ -310,6 +311,55 @@ def handle_data(self, data: HDFReceived):
)


@dataclass
class Dataset:
name: str
capture: str


class DatasetNameCache:
def __init__(self, datasets: Dict[str, Dataset], datasets_record_name: EpicsName):
self.datasets = datasets

self._datasets_table_record = ReadOnlyPvaTable(
datasets_record_name, ["Name", "Type"]
)
self._datasets_table_record.set_rows(
["Name", "Type"], [[], []], length=300, default_data_type=str
)

def hdf_writer_names(self):
"""Formats the current dataset names for use in the HDFWriter"""

hdf_names: Dict[str, Dict[str, str]] = {}
for record_name, dataset in self.datasets.items():
if not dataset.name or dataset.capture == "No":
continue

field_name = record_name.replace(":", ".")

hdf_names[field_name] = hdf_name = {}

hdf_name[dataset.capture.split(" ")[-1]] = dataset.name
# Suffix -min and -max if both are present
if "Min Max" in dataset.capture:
hdf_name["Min"] = f"{dataset.name}-min"
hdf_name["Max"] = f"{dataset.name}-min"
return hdf_names

def update_datasets_record(self):
dataset_name_list = [
dataset.name
for dataset in self.datasets.values()
if dataset.name and dataset.capture != "No"
]
self._datasets_table_record.update_row("Name", dataset_name_list)
self._datasets_table_record.update_row(
"Type",
["float64"] * len(dataset_name_list),
)


class HDF5RecordController:
"""Class to create and control the records that handle HDF5 processing"""

@@ -334,17 +384,18 @@ class HDF5RecordController:
def __init__(
self,
client: AsyncioClient,
dataset_name_cache: Dict[str, Dict[str, str]],
datasets_record_updater: Callable,
dataset_cache: Dict[str, Dataset],
record_prefix: str,
):
if find_spec("h5py") is None:
logging.warning("No HDF5 support detected - skipping creating HDF5 records")
return

self._client = client
self.dataset_name_cache = dataset_name_cache
self.datasets_record_updater = datasets_record_updater
_datasets_record_name = EpicsName(
HDF5RecordController.DATA_PREFIX + ":DATASETS"
)
self._datasets = DatasetNameCache(dataset_cache, _datasets_record_name)

path_length = os.pathconf("/", "PC_PATH_MAX")
filename_length = os.pathconf("/", "PC_NAME_MAX")
@@ -662,7 +713,7 @@ async def _handle_hdf5_data(self) -> None:

# Update `DATA:DATASETS` to match the names of the datasets
# in the HDF5 file
self.datasets_record_updater()
self._datasets.update_datasets_record()

buffer = HDF5Buffer(
capture_mode,
@@ -671,7 +722,7 @@
self._status_message_record.set,
self._num_received_record.set,
number_captured_setter_pipeline,
deepcopy(self.dataset_name_cache),
self._datasets.hdf_writer_names(),
)
flush_period: float = self._flush_period_record.get()
async for data in self._client.data(
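For reference, the mapping that the new DatasetNameCache.hdf_writer_names() hands to the HDF writer can be illustrated with a small standalone sketch. The Dataset dataclass and the loop mirror the code added above; the example record names, dataset names and printed output are illustrative assumptions, not taken from the repository or its tests.

from dataclasses import dataclass
from typing import Dict


@dataclass
class Dataset:
    name: str
    capture: str


def hdf_writer_names(datasets: Dict[str, Dataset]) -> Dict[str, Dict[str, str]]:
    """Standalone mirror of DatasetNameCache.hdf_writer_names, for illustration."""
    hdf_names: Dict[str, Dict[str, str]] = {}
    for record_name, dataset in datasets.items():
        # Skip unnamed datasets and those whose capture mode is "No"
        if not dataset.name or dataset.capture == "No":
            continue
        field_name = record_name.replace(":", ".")
        hdf_names[field_name] = hdf_name = {}
        # The last word of the capture mode ("Value", "Min Max Mean" -> "Mean")
        # keys the primary dataset name
        hdf_name[dataset.capture.split(" ")[-1]] = dataset.name
        # Suffix -min and -max when both are captured
        if "Min Max" in dataset.capture:
            hdf_name["Min"] = f"{dataset.name}-min"
            hdf_name["Max"] = f"{dataset.name}-max"
    return hdf_names


example = {
    "COUNTER1:OUT": Dataset("counter_one", "Min Max Mean"),
    "COUNTER2:OUT": Dataset("", "Value"),  # unnamed, so skipped
}
print(hdf_writer_names(example))
# {'COUNTER1.OUT': {'Mean': 'counter_one',
#                   'Min': 'counter_one-min', 'Max': 'counter_one-max'}}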
53 changes: 27 additions & 26 deletions src/pandablocks_ioc/_tables.py
@@ -114,38 +114,39 @@ def __init__(
},
)

def add_row(
def set_rows(
self,
row_name: str,
initial_value: List,
datatype: Type = str,
row_names: List[str],
initial_values: List[List],
length: Optional[int] = None,
default_data_type: Optional[Type] = None,
):
full_name = EpicsName(self.epics_table_name + ":" + row_name)
pva_row_name = row_name.replace(":", "_").lower()
length = length or len(initial_value)
initial_value_np = np.array(initial_value, dtype=datatype)

field_record: RecordWrapper = builder.WaveformIn(
full_name,
DESC="", # Description not provided yet
initial_value=initial_value_np,
length=length,
)
for idx, (row_name, initial_value) in enumerate(zip(row_names, initial_values)):
full_name = EpicsName(self.epics_table_name + ":" + row_name)
pva_row_name = row_name.replace(":", "_").lower()
dtype = type(initial_value[0]) if initial_value else default_data_type
initial_value_np = np.array(initial_value, dtype=dtype)

field_pva_info = {
"+type": "plain",
"+channel": "VAL",
"+trigger": "",
}
field_record: RecordWrapper = builder.WaveformIn(
full_name,
DESC="", # Description not provided yet
initial_value=initial_value_np,
length=length or len(initial_value),
)

pva_info = {f"value.{pva_row_name.lower()}": field_pva_info}
field_pva_info = {
"+type": "plain",
"+channel": "VAL",
"+trigger": "*" if idx == len(row_names) - 1 else "",
}

field_record.add_info(
"Q:group",
{self.pva_table_name: pva_info},
)
self.rows[row_name] = field_record
pva_info = {f"value.{pva_row_name.lower()}": field_pva_info}

field_record.add_info(
"Q:group",
{self.pva_table_name: pva_info},
)
self.rows[row_name] = field_record

def update_row(self, row_name: str, new_value: List):
new_value_np = np.array(new_value)
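The set_rows replacement for add_row builds every row in one pass and puts the PVA "*" trigger only on the final row, where the old add_row left the trigger empty for every row. Below is a minimal standalone sketch of the per-row Q:group info, using a hypothetical helper build_pva_infos that mirrors the loop above (it is not part of the package):

from typing import Dict, List


def build_pva_infos(row_names: List[str]) -> Dict[str, Dict[str, str]]:
    """Hypothetical helper mirroring the per-row PVA info assembled by set_rows."""
    infos: Dict[str, Dict[str, str]] = {}
    for idx, row_name in enumerate(row_names):
        pva_row_name = row_name.replace(":", "_").lower()
        infos[f"value.{pva_row_name}"] = {
            "+type": "plain",
            "+channel": "VAL",
            # Only the last row carries the "*" trigger, mirroring set_rows above
            "+trigger": "*" if idx == len(row_names) - 1 else "",
        }
    return infos


print(build_pva_infos(["Name", "Type"]))
# {'value.name': {'+type': 'plain', '+channel': 'VAL', '+trigger': ''},
#  'value.type': {'+type': 'plain', '+channel': 'VAL', '+trigger': '*'}}

Having only the last row carry the "*" trigger suggests the table is meant to be published once all rows have been written, which is presumably why the rows are now created together in one call rather than one at a time.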
86 changes: 21 additions & 65 deletions src/pandablocks_ioc/ioc.py
@@ -41,15 +41,15 @@
from softioc.pythonSoftIoc import RecordWrapper

from ._connection_status import ConnectionStatus, Statuses
from ._hdf_ioc import HDF5RecordController
from ._hdf_ioc import Dataset, HDF5RecordController
from ._pvi import (
Pvi,
PviGroup,
add_automatic_pvi_info,
add_pcap_arm_pvi_info,
add_positions_table_row,
)
from ._tables import ReadOnlyPvaTable, TableRecordWrapper, TableUpdater
from ._tables import TableRecordWrapper, TableUpdater
from ._types import (
ONAM_STR,
OUT_RECORD_FUNCTIONS,
@@ -504,48 +504,6 @@ def validate(self, record: RecordWrapper, new_val: str):
return False


class DatasetNameCache:
def __init__(self):
# A dictionary of record name to capture type to HDF dataset name
# e.g {"COUNTER1.Out": {"Max": "SOME_OTHER_DATASET_NAME"}}
self.cache: Dict[str, Dict[str, str]] = {}
self._record_name_to_dataset_name: Dict[EpicsName, str] = {}

_datasets_record_name = EpicsName(
HDF5RecordController.DATA_PREFIX + ":DATASETS"
)
self._datasets_table = ReadOnlyPvaTable(_datasets_record_name, ["Name", "Type"])
self._datasets_table.add_row("Name", [], datatype=str, length=300)
self._datasets_table.add_row("Type", [], datatype=str, length=300)

def update_cache(
self, record_name: EpicsName, dataset_name: str, capture_mode: str
):
field_name = record_name.replace(":", ".")
# Throw away all the old settings
self.cache[field_name] = capture_names = {}
# Only consider explicitly named datasets
if dataset_name and capture_mode != "No":
# Keep a reference to just the dataset name for `DATA:DATASETS`
self._record_name_to_dataset_name[record_name] = dataset_name
capture_names[capture_mode.split(" ")[-1]] = dataset_name
# Suffix -min and -max if both are present
if "Min Max" in capture_mode:
capture_names["Min"] = f"{dataset_name}-min"
capture_names["Max"] = f"{dataset_name}-min"

else:
self._record_name_to_dataset_name.pop(record_name, None)

def update_dataset_name_to_type(self):
dataset_name_list = list(self._record_name_to_dataset_name.values())
self._datasets_table.update_row("Name", dataset_name_list)
self._datasets_table.update_row(
"Type",
["float64"] * len(dataset_name_list),
)


class IocRecordFactory:
"""Class to handle creating PythonSoftIOC records for a given field defined in
a PandA"""
Expand Down Expand Up @@ -585,7 +543,7 @@ def __init__(
# All records should be blocking
builder.SetBlocking(True)

self._dataset_name_cache = DatasetNameCache()
self._dataset_cache: Dict[str, Dataset] = {}

def _process_labels(
self, labels: List[str], record_value: ScalarRecordValue
@@ -915,8 +873,7 @@ def _make_pos_out(
capture_record_updater: _RecordUpdater

def capture_record_on_update(new_capture_mode):
self._dataset_name_cache.update_cache(
record_name,
self._dataset_cache[record_name] = Dataset(
record_dict[dataset_record_name].record.get(),
labels[new_capture_mode],
)
@@ -944,20 +901,20 @@ def capture_record_on_update(new_capture_mode):
labels if labels else None,
)

def dataset_record_on_update(new_dataset_name):
self._dataset_cache[record_name] = Dataset(
new_dataset_name,
labels[record_dict[capture_record_name].record.get()],
)

record_dict[dataset_record_name] = self._create_record_info(
dataset_record_name,
"Used to adjust the dataset name to one more scientifically relevant",
builder.stringOut,
str,
PviGroup.CAPTURE,
initial_value="",
on_update=lambda new_dataset_name: (
self._dataset_name_cache.update_cache(
record_name,
new_dataset_name,
labels[record_dict[capture_record_name].record.get()],
)
),
on_update=dataset_record_on_update,
)

offset_record_name = EpicsName(record_name + ":OFFSET")
@@ -1081,27 +1038,27 @@ def _make_ext_out(
labels, capture_index = self._process_labels(
field_info.capture_labels, values[capture_record_name]
)

def dataset_record_on_update(new_dataset_name):
self._dataset_cache[record_name] = Dataset(
new_dataset_name,
labels[record_dict[capture_record_name].record.get()],
)

record_dict[dataset_record_name] = self._create_record_info(
dataset_record_name,
"Used to adjust the dataset name to one more scientifically relevant",
builder.stringOut,
str,
PviGroup.OUTPUTS,
initial_value="",
on_update=lambda new_dataset_name: (
self._dataset_name_cache.update_cache(
record_name,
new_dataset_name,
labels[record_dict[capture_record_name].record.get()],
)
),
on_update=dataset_record_on_update,
)

capture_record_updater: _RecordUpdater

def capture_record_on_update(new_capture_mode):
self._dataset_name_cache.update_cache(
record_name,
self._dataset_cache[record_name] = Dataset(
record_dict[dataset_record_name].record.get(),
labels[new_capture_mode],
)
@@ -1890,8 +1847,7 @@ def create_block_records(

HDF5RecordController(
self._client,
self._dataset_name_cache.cache,
self._dataset_name_cache.update_dataset_name_to_type,
self._dataset_cache,
self._record_prefix,
)

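With the cache moved, ioc.py no longer formats HDF dataset names itself: the two on_update callbacks simply write Dataset entries into a plain dict that HDF5RecordController consumes. A rough sketch of the effect on that cache, reusing the Dataset stand-in from the sketch after the _hdf_ioc.py diff (record names, dataset names and capture modes are illustrative):

dataset_cache: Dict[str, Dataset] = {}

# Effect of dataset_record_on_update("counter_one") for COUNTER1:OUT while
# its capture record reads "Min Max Mean"
dataset_cache["COUNTER1:OUT"] = Dataset("counter_one", "Min Max Mean")

# Effect of capture_record_on_update(...) later switching capture to "No":
# the entry stays in the cache, but hdf_writer_names() and
# update_datasets_record() both skip it
dataset_cache["COUNTER1:OUT"] = Dataset("counter_one", "No")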
8 changes: 4 additions & 4 deletions tests/test_hdf_ioc.py
@@ -38,6 +38,7 @@
)
from pandablocks_ioc._hdf_ioc import (
CaptureMode,
Dataset,
HDF5Buffer,
HDF5RecordController,
NumCapturedSetter,
@@ -229,12 +230,11 @@ async def hdf5_controller(
test_prefix, hdf5_test_prefix = new_random_hdf5_prefix

dataset_name_cache = {
"COUNTER1.OUT": {"Value": "some_other_dataset_name"},
# these datasets haven't been overwritten, they should be the default
"COUNTER1:OUT": Dataset("some_other_dataset_name", "Value"),
}

hdf5_controller = HDF5RecordController(
AsyncioClient("localhost"), dataset_name_cache, lambda: None, test_prefix
AsyncioClient("localhost"), dataset_name_cache, test_prefix
)

# When using tests w/o CA, need to manually set _directory_exists to 1
@@ -254,7 +254,7 @@ def subprocess_func(
async def wrapper():
builder.SetDeviceName(namespace_prefix)
client = MockedAsyncioClient(standard_responses)
HDF5RecordController(client, {}, lambda: None, namespace_prefix)
HDF5RecordController(client, {}, namespace_prefix)
dispatcher = asyncio_dispatcher.AsyncioDispatcher()
builder.LoadDatabase()
softioc.iocInit(dispatcher)