From bbe220db44f71e301647068973c27cbfdd1aafb8 Mon Sep 17 00:00:00 2001 From: Alexander Clausen Date: Tue, 16 Aug 2022 14:42:28 +0200 Subject: [PATCH] Misc. enhancements, fixes, and cleanups - Added `libertem_dectris.headers` submodule that exports header classes - Added constructors for `libertem_dectris.Frame` and `libertem_dectris.FrameStack` objects from Python, mostly useful for testing - Added binding to random port for the simulator - Properly parametrize with zmq endpoint URI - Fix many clippy complaints --- Cargo.lock | 64 ++++++++++ Cargo.toml | 1 + README.md | 37 +++++- examples/testchunked.py | 2 +- examples/testflame.py | 2 +- examples/testserialize.py | 2 +- examples/testtooslow.py | 2 +- examples/testuser.py | 2 +- src/common.rs | 259 +++++++++++++++++++++++++++++++++----- src/dectris_py.rs | 245 ++++++++++++++++++++++++------------ src/main.rs | 28 ++--- 11 files changed, 506 insertions(+), 138 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93f5d4a..79b6eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,6 +175,17 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" +[[package]] +name = "getrandom" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -239,6 +250,7 @@ dependencies = [ "serde", "serde_json", "spin_sleep", + "uuid", "zmq", ] @@ -331,6 +343,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -443,6 +461,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -567,12 +615,28 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112" +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 549c36e..c4691c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ pyo3-log = "0.6.0" serde = { version = "1.0.143", features = ["derive"] } serde_json = "1.0.83" spin_sleep = "1.1.1" +uuid = { version = "1.1.2", features = ["v4", "fast-rng"] } zmq = { version = "0.9.2", features = ["vendored"] } [profile.release] diff --git a/README.md b/README.md index 3d3be02..e87beff 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ ec.setDetectorConfig('ntrigger', nimages) result = ec.sendDetectorCommand('arm') sequence_id = result['sequence id'] -frames = libertem_dectris.FrameChunkedIterator() +frames = libertem_dectris.FrameChunkedIterator(uri="tcp://localhost:9999") # start to receive data for the given series # (can be called multiple times on the same `FrameChunkedIterator` instance) frames.start(series=sequence_id) @@ -48,3 +48,38 @@ try: finally: frames.close() # clean up background thread etc. ``` + +## Changelog + +### v0.2.0 (unreleased) + +- Added `libertem_dectris.headers` submodule that exports header classes +- Added ways to create `libertem_dectris.Frame` and `libertem_dectris.FrameStack` + objects from Python, mostly useful for testing +- Added binding to random port for the simulator +- Properly parametrize with zmq endpoint URI +- Fix many clippy complaints + +### v0.1.0 + +Initial release! + +## Development + +This package is using [pyo3](https://pyo3.rs/) with +[maturin](https://maturin.rs/) to create the Python bindings. First, make sure +`maturin` is installed in your Python environment: + +```bash +(venv) $ pip install maturin +``` + +Then, after each change to the rust code, run `maturin develop -r` to build and +install a new version of the wheel. + +## Release + +- update changelog above +- bump version in Cargo.toml if not already bumped, and push +- create a release from the GitHub UI, creating a new tag vX.Y.Z +- done! diff --git a/examples/testchunked.py b/examples/testchunked.py index a95afd4..53e2a6b 100644 --- a/examples/testchunked.py +++ b/examples/testchunked.py @@ -7,7 +7,7 @@ @click.command() @click.argument('nimages', type=int) def main(nimages: int): - frames = libertem_dectris.FrameChunkedIterator() + frames = libertem_dectris.FrameChunkedIterator(uri="tcp://127.0.0.1:9999") ec = DEigerClient('localhost', 8910) diff --git a/examples/testflame.py b/examples/testflame.py index d9b81b4..e40c6c0 100644 --- a/examples/testflame.py +++ b/examples/testflame.py @@ -5,7 +5,7 @@ if __name__ == "__main__": ec = DEigerClient("localhost", 8910) - frames = libertem_dectris.FrameChunkedIterator() + frames = libertem_dectris.FrameChunkedIterator(uri="tcp://127.0.0.1:9999") frames.start() diff --git a/examples/testserialize.py b/examples/testserialize.py index d80bfd6..1d10586 100644 --- a/examples/testserialize.py +++ b/examples/testserialize.py @@ -1,7 +1,7 @@ import libertem_dectris if __name__ == "__main__": - frames = libertem_dectris.FrameChunkedIterator() + frames = libertem_dectris.FrameChunkedIterator(uri="tcp://127.0.0.1:9999") frames.start() frame_stack = frames.get_next_stack(max_size=32) diff --git a/examples/testtooslow.py b/examples/testtooslow.py index d6ad5e9..c6d1790 100644 --- a/examples/testtooslow.py +++ b/examples/testtooslow.py @@ -2,7 +2,7 @@ import libertem_dectris if __name__ == "__main__": - frames = libertem_dectris.FrameIterator() + frames = libertem_dectris.FrameIterator(uri="tcp://127.0.0.1:9999") frames.start(series=14) diff --git a/examples/testuser.py b/examples/testuser.py index 2c96974..063bbe6 100644 --- a/examples/testuser.py +++ b/examples/testuser.py @@ -6,7 +6,7 @@ @click.command() @click.argument('nimages', type=int) def main(nimages: int): - frames = libertem_dectris.FrameIterator() + frames = libertem_dectris.FrameIterator(uri="tcp://127.0.0.1:9999") ec = DEigerClient('localhost', 8910) diff --git a/src/common.rs b/src/common.rs index d36e9a1..48acc5e 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,19 +1,36 @@ +#![allow(clippy::borrow_deref_ref)] + use std::fs; +use log::{debug, info}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use pyo3::prelude::*; use serde_json::json; -use zmq::{Context, Socket, SocketType::PUSH}; +use uuid::Uuid; +use zmq::{Context, Message, Socket, SocketEvent, SocketType::PUSH}; #[derive(Serialize, Deserialize, Debug, Clone)] +#[pyclass] pub struct DHeader { pub htype: String, pub header_detail: String, pub series: u64, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[pymethods] +impl DHeader { + #[new] + fn new(series: u64) -> Self { + DHeader { + htype: "dheader-1.0".to_string(), + header_detail: "basic".to_string(), + series, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[pyclass] pub enum TriggerMode { #[serde(rename = "exte")] @@ -54,7 +71,8 @@ impl DetectorConfig { } } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[pyclass] pub struct DImage { pub htype: String, @@ -68,9 +86,21 @@ pub struct DImage { pub hash: String, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[pymethods] +impl DImage { + #[new] + fn new(frame: u64, series: u64, hash: &str) -> Self { + DImage { + htype: "dimage-1.0".to_string(), + series, + frame, + hash: hash.to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[pyclass] -#[derive(PartialEq)] pub enum PixelType { #[serde(rename = "uint8")] Uint8, @@ -80,7 +110,8 @@ pub enum PixelType { Uint32, } -#[derive(PartialEq, Serialize, Deserialize, Debug, Clone)] +#[derive(PartialEq, Eq, Serialize, Deserialize, Debug, Clone)] +#[pyclass] pub struct DImageD { pub htype: String, pub shape: Vec, @@ -89,23 +120,62 @@ pub struct DImageD { pub encoding: String, // [bs][[-]lz4][<|>] } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[pymethods] +impl DImageD { + #[new] + fn new(shape: Vec, type_: PixelType, encoding: &str) -> Self { + DImageD { + htype: "dimage_d-1.0".to_string(), + shape, + type_, + encoding: encoding.to_string(), + } + } +} + +/// "footer" sent for each frame. all times in nanoseconds +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[pyclass] pub struct DConfig { - pub htype: String, + pub htype: String, // constant dconfig-1.0 pub start_time: u64, pub stop_time: u64, pub real_time: u64, } +#[pymethods] +impl DConfig { + #[new] + fn new(start_time: u64, stop_time: u64, real_time: u64) -> Self { + DConfig { + htype: "dconfig-1.0".to_string(), + start_time, + stop_time, + real_time, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] +#[pyclass] pub struct DSeriesEnd { pub htype: String, pub series: u64, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[pymethods] +impl DSeriesEnd { + #[new] + fn new(series: u64) -> Self { + DSeriesEnd { + htype: "dseries_end-1.0".to_string(), + series, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[pyclass] -#[derive(PartialEq)] pub struct FrameData { pub dimage: DImage, pub dimaged: DImageD, @@ -156,8 +226,7 @@ impl DumpRecordFile { } pub fn read_size(&self, offset: usize) -> usize { - let size = i64::from_le_bytes(self.mmap[offset..offset + 8].try_into().unwrap()) as usize; - size + i64::from_le_bytes(self.mmap[offset..offset + 8].try_into().unwrap()) as usize } pub fn read_msg_raw(&self, offset: usize) -> (&[u8], usize) { @@ -188,18 +257,23 @@ impl DumpRecordFile { current_offset += size + 8; } - return None; + None } pub fn get_cursor(&self) -> RecordCursor { - return RecordCursor::new(self); + RecordCursor::new(self) } fn get_size(&self) -> usize { - return self.mmap.len(); + self.mmap.len() } } +pub struct CursorPos { + pub current_offset: usize, + pub current_msg_index: usize, +} + pub struct RecordCursor { file: DumpRecordFile, current_offset: usize, @@ -217,6 +291,18 @@ impl RecordCursor { } } + pub fn set_pos(&mut self, pos: CursorPos) { + self.current_msg_index = pos.current_msg_index; + self.current_offset = pos.current_offset; + } + + pub fn get_pos(&self) -> CursorPos { + CursorPos { + current_offset: self.current_offset, + current_msg_index: self.current_msg_index, + } + } + /// seek such that `index` is the next message that will be read pub fn seek_to_msg_idx(&mut self, index: usize) { self.current_offset = 0; @@ -238,23 +324,23 @@ impl RecordCursor { let (msg, size) = self.file.read_msg_raw(self.current_offset); self.current_offset += size + 8; self.current_msg_index += 1; - return msg; + msg } - pub fn read_and_deserialize<'de, T>(&mut self) -> Result + pub fn read_and_deserialize(&mut self) -> Result where T: DeserializeOwned, { - let msg = self.read_raw_msg().clone(); - return serde_json::from_slice::(&msg); + let msg = self.read_raw_msg(); + serde_json::from_slice::(msg) } pub fn peek_size(&self) -> usize { - return self.file.read_size(self.current_offset); + self.file.read_size(self.current_offset) } pub fn is_at_end(&self) -> bool { - return self.current_offset == self.file.get_size(); + self.current_offset == self.file.get_size() } pub fn get_msg_idx(&self) -> usize { @@ -277,26 +363,83 @@ impl From for SendError { } } +fn monitor_thread(ctx: Context, endpoint: &str, name: &str) { + let socket = ctx.socket(zmq::PAIR).unwrap(); + socket.connect(endpoint).unwrap(); + + let mut msg: Message = Message::new(); + + loop { + // two parts: + // first part: "number and value" + socket.recv(&mut msg, 0).unwrap(); + + let event = u16::from_ne_bytes(msg[0..2].try_into().unwrap()); + let socket_event: SocketEvent = SocketEvent::from_raw(event); + + // second part: affected endpoint as string + socket.recv(&mut msg, 0).unwrap(); + + let endpoint = String::from_utf8_lossy(&msg); + + info!("monitoring {name}: {socket_event:?} @ {endpoint}"); + + if socket_event == SocketEvent::MONITOR_STOPPED { + break; + } + } +} + +pub fn setup_monitor(ctx: Context, name: String, socket: &Socket) { + // set up monitoring: + let monitor_uuid = Uuid::new_v4(); + let monitor_endpoint = format!("inproc://monitor-{monitor_uuid}"); + socket + .monitor(&monitor_endpoint, zmq::SocketEvent::ALL as i32) + .unwrap(); + + std::thread::Builder::new() + .name(format!("sender-monitor-{monitor_uuid}")) + .spawn(move || { + monitor_thread(ctx, &monitor_endpoint, &name); + }) + .expect("should be able to start monitor thread"); +} + pub struct FrameSender { socket: Socket, cursor: RecordCursor, detector_config: DetectorConfig, series: u64, nimages: u64, + uri: String, } impl FrameSender { - pub fn new(uri: &str, filename: &str) -> Self { + pub fn new(uri: &str, filename: &str, random_port: bool) -> Self { let ctx = Context::new(); let socket = ctx .socket(PUSH) .expect("context should be able to create a socket"); - socket - .bind(uri) - .expect("should be possible to bind the zmq socket"); + + if random_port { + let new_uri = format!("{uri}:*"); + socket.bind(&new_uri).unwrap_or_else(|_| { + panic!("should be possible to bind the zmq socket at {new_uri}") + }); + } else { + socket + .bind(uri) + .unwrap_or_else(|_| panic!("should be possible to bind the zmq socket at {uri}")); + } + + setup_monitor(ctx, "FrameSender".to_string(), &socket); + + let canonical_uri = socket.get_last_endpoint().unwrap().unwrap(); + socket .set_sndhwm(4 * 256) - .expect("should be possible to set sndhwn"); + .expect("should be possible to set sndhwm"); let file = DumpRecordFile::new(filename); @@ -305,13 +448,13 @@ impl FrameSender { cursor.seek_to_first_header_of_type("dheader-1.0"); let dheader_raw = cursor.read_raw_msg(); - let dheader: DHeader = serde_json::from_slice(&dheader_raw) + let dheader: DHeader = serde_json::from_slice(dheader_raw) .expect("json should match our serialization schema"); - println!("{dheader:?}"); + debug!("{dheader:?}"); let detector_config: DetectorConfig = cursor.read_and_deserialize().unwrap(); - println!("{detector_config:?}"); + debug!("{detector_config:?}"); let nimages = detector_config.get_num_images(); let series = dheader.series; @@ -322,9 +465,14 @@ impl FrameSender { series, nimages, detector_config, + uri: canonical_uri, } } + pub fn get_uri(&self) -> &str { + &self.uri + } + pub fn get_detector_config(&self) -> &DetectorConfig { &self.detector_config } @@ -366,23 +514,66 @@ impl FrameSender { Ok(()) } - fn send_msg_at_cursor(&mut self) { + /// Send the message from the current cursor position. + /// If a timeout occurs, the cursor is rewound to the old + /// position and a retry can be attempted + fn send_msg_at_cursor(&mut self) -> Result<(), SendError> { let socket = &self.socket; let cursor = &mut self.cursor; + let old_pos = cursor.get_pos(); + let m = cursor.read_raw_msg(); - socket.send(m, 0).unwrap(); + match socket.send(m, 0) { + Ok(_) => {} + Err(zmq::Error::EAGAIN) => { + cursor.set_pos(old_pos); + return Err(SendError::Timeout); + } + Err(_) => return Err(SendError::Other), + } + + Ok(()) } - pub fn send_headers(&mut self) { + fn send_msg_at_cursor_retry(&mut self, callback: &CB) -> Result<(), SendError> + where + CB: Fn() -> Option<()>, + { + loop { + match self.send_msg_at_cursor() { + Ok(_) => return Ok(()), + Err(SendError::Timeout) => { + if let Some(()) = callback() { + continue; + } else { + return Err(SendError::Timeout); + } + } + e @ Err(_) => return e, + } + } + } + + pub fn send_headers(&mut self, idle_callback: CB) -> Result<(), SendError> + where + CB: Fn() -> Option<()>, + { + // milliseconds + self.socket.set_sndtimeo(100)?; + let cursor = &mut self.cursor; cursor.seek_to_first_header_of_type("dheader-1.0"); // dheader - self.send_msg_at_cursor(); + self.send_msg_at_cursor_retry(&idle_callback)?; // detector config - self.send_msg_at_cursor(); + self.send_msg_at_cursor_retry(&idle_callback)?; + + self.socket.set_sndtimeo(-1)?; + + Ok(()) } pub fn send_frames(&mut self) { diff --git a/src/dectris_py.rs b/src/dectris_py.rs index f383798..647851a 100644 --- a/src/dectris_py.rs +++ b/src/dectris_py.rs @@ -1,3 +1,5 @@ +#![allow(clippy::borrow_deref_ref)] + use std::{ convert::Infallible, fmt::Display, @@ -6,8 +8,8 @@ use std::{ }; use crate::common::{ - self, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DetectorConfig, FrameData, FrameSender, - PixelType, TriggerMode, + self, setup_monitor, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DetectorConfig, FrameData, + FrameSender, PixelType, TriggerMode, }; use bincode::serialize; @@ -26,15 +28,28 @@ fn libertem_dectris(py: Python, m: &PyModule) -> PyResult<()> { // the GIL // pyo3_log::init(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); - m.add_class::().unwrap(); + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add("TimeoutError", py.get_type::())?; + + register_header_module(py, m)?; + Ok(()) +} + +fn register_header_module(py: Python<'_>, parent_module: &PyModule) -> PyResult<()> { + let headers_module = PyModule::new(py, "headers")?; + headers_module.add_class::()?; + headers_module.add_class::()?; + headers_module.add_class::()?; + headers_module.add_class::()?; + headers_module.add_class::()?; + parent_module.add_submodule(headers_module)?; Ok(()) } @@ -50,25 +65,33 @@ impl Frame { frame: frame.clone(), } } - - fn with_data(frame: FrameData) -> Self { - Frame { frame } - } } #[pymethods] impl Frame { + #[new] + fn new(data: &PyBytes, dimage: &DImage, dimaged: &DImageD, dconfig: &DConfig) -> Self { + let frame_data: FrameData = FrameData { + dimage: dimage.clone(), + dimaged: dimaged.clone(), + image_data: data.as_bytes().into(), + dconfig: dconfig.clone(), + }; + + Frame { frame: frame_data } + } + fn __repr__(&self) -> String { let frame = &self.frame; let series = frame.dimage.series; let shape = &frame.dimaged.shape; let idx = frame.dimage.frame; - return format!(""); + format!("") } fn get_image_data(slf: PyRef, py: Python) -> Py { - let bytes: &PyBytes = PyBytes::new(py, &slf.frame.image_data.as_slice()); - return bytes.into(); + let bytes: &PyBytes = PyBytes::new(py, slf.frame.image_data.as_slice()); + bytes.into() } fn get_series_id(slf: PyRef) -> u64 { @@ -84,35 +107,47 @@ impl Frame { } fn get_pixel_type(slf: PyRef) -> String { - return match &slf.frame.dimaged.type_ { + match &slf.frame.dimaged.type_ { PixelType::Uint8 => "uint8".to_string(), PixelType::Uint16 => "uint16".to_string(), PixelType::Uint32 => "uint32".to_string(), - }; + } } fn get_encoding(slf: PyRef) -> String { slf.frame.dimaged.encoding.clone() } + /// return endianess in numpy notation + fn get_endianess(slf: PyRef) -> String { + let last_char = slf.frame.dimaged.encoding.chars().last(); + last_char.expect("encoding should be non-empty").into() + } + fn get_shape(slf: PyRef) -> Vec { slf.frame.dimaged.shape.clone() } } +impl From for FrameData { + fn from(frame: Frame) -> Self { + frame.frame + } +} + pub enum ControlMsg { StopThread, StartAcquisition { series: u64 }, } -#[derive(PartialEq)] +#[derive(PartialEq, Eq)] pub enum ResultMsg { Error { msg: String }, // generic error response, might need to specialize later Frame { frame: FrameData }, End, } -#[derive(PartialEq)] +#[derive(PartialEq, Eq)] pub enum ReceiverStatus { Idle, Running, @@ -184,7 +219,20 @@ enum AcquisitionError { impl Display for AcquisitionError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "acquisition has stopped") + match self { + AcquisitionError::ZmqError { err } => { + write!(f, "zmq error {err}") + } + AcquisitionError::Cancelled => { + write!(f, "acquisition cancelled") + } + AcquisitionError::SeriesMismatch => { + write!(f, "series mismatch") + } + AcquisitionError::Disconnected => { + write!(f, "other end has disconnected") + } + } } } @@ -193,12 +241,8 @@ fn check_for_control(control_channel: &Receiver) -> Result<(), Acqui Ok(ControlMsg::StartAcquisition { series: _ }) => { panic!("received StartAcquisition while an acquisition was already running"); } - Ok(ControlMsg::StopThread) => { - return Err(AcquisitionError::Cancelled); - } - Err(TryRecvError::Disconnected) => { - return Err(AcquisitionError::Cancelled); - } + Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled), + Err(TryRecvError::Disconnected) => Err(AcquisitionError::Cancelled), Err(TryRecvError::Empty) => Ok(()), } } @@ -215,10 +259,10 @@ fn acquisition( loop { if last_control_check.elapsed() > Duration::from_millis(300) { last_control_check = Instant::now(); - check_for_control(&to_thread_r)?; + check_for_control(to_thread_r)?; } - let frame = recv_frame(&socket, &to_thread_r)?; + let frame = recv_frame(socket, to_thread_r)?; if frame.dimage.series != series { return Err(AcquisitionError::SeriesMismatch); @@ -254,43 +298,46 @@ fn acquisition( } /// convert `AcquisitionError`s to messages on `from_threads_s` -fn background_thread_wrap(to_thread_r: &Receiver, from_thread_s: &Sender) { - match background_thread(to_thread_r, from_thread_s) { - Ok(()) => {} - Err(err) => { - from_thread_s - .send(ResultMsg::Error { - msg: err.to_string(), - }) - .unwrap(); - return; - } +fn background_thread_wrap( + to_thread_r: &Receiver, + from_thread_s: &Sender, + uri: String, +) { + if let Err(err) = background_thread(to_thread_r, from_thread_s, uri) { + from_thread_s + .send(ResultMsg::Error { + msg: err.to_string(), + }) + .unwrap(); } } fn background_thread( to_thread_r: &Receiver, from_thread_s: &Sender, + uri: String, ) -> Result<(), AcquisitionError> { let ctx = zmq::Context::new(); let socket = ctx.socket(zmq::PULL).unwrap(); socket.set_rcvtimeo(1000).unwrap(); - socket.connect("tcp://127.0.0.1:9999").unwrap(); + socket.connect(&uri).unwrap(); socket.set_rcvhwm(4 * 256).unwrap(); + setup_monitor(ctx, "DectrisReceiver".to_string(), &socket); + loop { // control: main threads tells us to quit let control = to_thread_r.recv_timeout(Duration::from_millis(100)); match control { Ok(ControlMsg::StartAcquisition { series }) => { let mut msg: Message = Message::new(); - recv_part(&mut msg, &socket, &to_thread_r)?; + recv_part(&mut msg, &socket, to_thread_r)?; // panic in case of any other header type: let dheader: DHeader = serde_json::from_str(msg.as_str().unwrap()).unwrap(); debug!("dheader: {dheader:?}"); // second message: the header itself - recv_part(&mut msg, &socket, &to_thread_r)?; + recv_part(&mut msg, &socket, to_thread_r)?; let detector_config: DetectorConfig = serde_json::from_str(msg.as_str().unwrap()).unwrap(); @@ -328,17 +375,20 @@ impl Display for ReceiverError { } impl DectrisReceiver { - pub fn new() -> Self { + pub fn new(uri: &str) -> Self { let (to_thread_s, to_thread_r) = unbounded(); let (from_thread_s, from_thread_r) = unbounded(); let builder = std::thread::Builder::new(); + let uri = uri.to_string(); DectrisReceiver { bg_thread: Some( builder .name("bg_thread".to_string()) - .spawn(move || background_thread_wrap(&to_thread_r, &from_thread_s)) + .spawn(move || { + background_thread_wrap(&to_thread_r, &from_thread_s, uri.to_string()) + }) .expect("failed to start background thread"), ), from_thread: from_thread_r, @@ -347,7 +397,7 @@ impl DectrisReceiver { } } - pub fn next(&mut self) -> ResultMsg { + pub fn recv(&mut self) -> ResultMsg { let result_msg = self .from_thread .recv() @@ -355,7 +405,7 @@ impl DectrisReceiver { if result_msg == ResultMsg::End { self.status = ReceiverStatus::Idle; } - return result_msg; + result_msg } pub fn next_timeout(&mut self, timeout: Duration) -> Option { @@ -366,13 +416,13 @@ impl DectrisReceiver { if result == ResultMsg::End { self.status = ReceiverStatus::Idle; } - return Some(result); + Some(result) } Err(e) => match e { RecvTimeoutError::Disconnected => { panic!("background thread should be running") } - RecvTimeoutError::Timeout => return None, + RecvTimeoutError::Timeout => None, }, } } @@ -403,6 +453,12 @@ impl DectrisReceiver { } } +impl Default for DectrisReceiver { + fn default() -> Self { + Self::new("tcp://127.0.0.1:9999") + } +} + #[pyclass] pub struct FrameIterator { receiver: DectrisReceiver, @@ -411,16 +467,16 @@ pub struct FrameIterator { #[pymethods] impl FrameIterator { #[new] - fn new() -> Self { + fn new(uri: &str) -> Self { FrameIterator { - receiver: DectrisReceiver::new(), + receiver: DectrisReceiver::new(uri), } } fn start(mut slf: PyRefMut, series: u64) -> PyResult<()> { slf.receiver .start(series) - .map_err(|err| return exceptions::PyRuntimeError::new_err(err.msg)) + .map_err(|err| exceptions::PyRuntimeError::new_err(err.msg)) } fn close(mut slf: PyRefMut) { @@ -493,15 +549,24 @@ impl FrameStack { FrameStack::empty() } + /// create a frame stack from a list of frames + /// NOTE: this probably doesn't perform well, and is only meant for testing + #[classmethod] + fn from_frame_list(_cls: &PyType, frames: Vec) -> Self { + FrameStack { + frames: frames.into_iter().map(|f| f.into()).collect(), + } + } + #[classmethod] fn deserialize(_cls: &PyType, serialized: &PyBytes) -> Self { let data = serialized.as_bytes(); - FrameStack::with_data(bincode::deserialize(&data).unwrap()) + FrameStack::with_data(bincode::deserialize(data).unwrap()) } fn serialize(slf: PyRef, py: Python) -> PyResult> { let bytes: &PyBytes = PyBytes::new(py, serialize(&slf.frames).unwrap().as_slice()); - return Ok(bytes.into()); + Ok(bytes.into()) } fn __len__(slf: PyRef) -> usize { @@ -525,16 +590,16 @@ struct FrameChunkedIterator { #[pymethods] impl FrameChunkedIterator { #[new] - fn new() -> Self { + fn new(uri: &str) -> Self { FrameChunkedIterator { - receiver: DectrisReceiver::new(), + receiver: DectrisReceiver::new(uri), } } fn start(mut slf: PyRefMut, series: u64) -> PyResult<()> { slf.receiver .start(series) - .map_err(|err| return exceptions::PyRuntimeError::new_err(err.msg)) + .map_err(|err| exceptions::PyRuntimeError::new_err(err.msg)) } fn close(mut slf: PyRefMut) { @@ -542,7 +607,7 @@ impl FrameChunkedIterator { } fn is_running(slf: PyRef) -> bool { - return slf.receiver.status == ReceiverStatus::Running; + slf.receiver.status == ReceiverStatus::Running } fn get_next_stack( @@ -568,7 +633,7 @@ impl FrameChunkedIterator { let recv_result = py.allow_threads(|| { let next: Result, Infallible> = Ok(recv.next_timeout(Duration::from_millis(100))); - return next; + next })?; match recv_result { @@ -608,19 +673,37 @@ struct DectrisSim { #[pymethods] impl DectrisSim { #[new] - fn new(uri: &str, filename: &str, dwelltime: Option) -> Self { - return DectrisSim { - frame_sender: FrameSender::new(uri, filename), + fn new(uri: &str, filename: &str, dwelltime: Option, random_port: bool) -> Self { + DectrisSim { + frame_sender: FrameSender::new(uri, filename, random_port), dwelltime, - }; + } + } + + fn get_uri(slf: PyRef) -> String { + slf.frame_sender.get_uri().to_string() } fn get_detector_config(slf: PyRef) -> DetectorConfig { slf.frame_sender.get_detector_config().clone() } - fn send_headers(mut slf: PyRefMut) { - slf.frame_sender.send_headers(); + fn send_headers(mut slf: PyRefMut, py: Python) -> PyResult<()> { + let sender = &mut slf.frame_sender; + py.allow_threads(|| { + if let Err(e) = sender.send_headers(|| { + let gil = Python::acquire_gil(); + if let Err(e) = gil.python().check_signals() { + eprintln!("got python error {e:?}, breaking"); + return None; + } + Some(()) + }) { + let msg = format!("failed to send headers: {e:?}"); + return Err(exceptions::PyRuntimeError::new_err(msg)); + } + Ok(()) + }) } /// send `nframes`, if given, or all frames in the acquisition, from the @@ -634,29 +717,25 @@ impl DectrisSim { Some(n) => n, }; + let dwelltime = &slf.dwelltime.clone(); + let sender = &mut slf.frame_sender; + for frame_idx in 0..effective_nframes { - // can't `allow_threads` because the zmq socket is not Send - //py.allow_threads(|| { - match slf.frame_sender.send_frame() { - Err(common::SendError::Timeout) => { - return Err(TimeoutError::new_err( - "timeout while sending frames".to_string(), - )); - } - Err(_) => { - return Err(exceptions::PyRuntimeError::new_err( - "error while sending frames".to_string(), - )); - } - Ok(_) => {} // Ok(()) - } - //})?; + py.allow_threads(|| match sender.send_frame() { + Err(common::SendError::Timeout) => Err(TimeoutError::new_err( + "timeout while sending frames".to_string(), + )), + Err(_) => Err(exceptions::PyRuntimeError::new_err( + "error while sending frames".to_string(), + )), + Ok(_) => Ok(()), + })?; // dwelltime // FIXME: for continuous mode, u64 might not be enough for elapsed time, // so maybe it's better to carry around a "budget" that can be negative // if a frame hasn't been sent out in time etc. - if let Some(dt) = slf.dwelltime { + if let Some(dt) = dwelltime { let elapsed_us = start_time.elapsed().as_micros() as u64; let target_time_us = (frame_idx + 1) * dt; if elapsed_us < target_time_us { diff --git a/src/main.rs b/src/main.rs index c1ae72b..cf6ada8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,7 +57,7 @@ fn action_cat(cli: &Cli, start_idx: usize, end_idx: usize) { while cursor.get_msg_idx() <= end_idx { let msg = cursor.read_raw_msg(); let length = (msg.len() as i64).to_le_bytes(); - std::io::stdout().write(&length).unwrap(); + std::io::stdout().write_all(&length).unwrap(); std::io::stdout().write_all(msg).unwrap(); } } @@ -102,19 +102,19 @@ fn get_msg_type(maybe_value: &Option) -> String { } fn get_summary(filename: &str) -> HashMap { - let file = DumpRecordFile::new(&filename); + let file = DumpRecordFile::new(filename); let mut cursor = file.get_cursor(); let mut msg_map = HashMap::::new(); while !cursor.is_at_end() { let raw_msg = cursor.read_raw_msg(); - let value = try_parse(&raw_msg); + let value = try_parse(raw_msg); let msg_type = get_msg_type(&value); msg_map.entry(msg_type).and_modify(|e| *e += 1).or_insert(1); } - return msg_map; + msg_map } fn inspect_print_summary(filename: &str) { @@ -162,7 +162,7 @@ fn action_inspect(cli: &Cli, head: Option, summary: bool) { fn write_raw_msg(msg: &[u8]) { let length = (msg.len() as i64).to_le_bytes(); - io::stdout().write(&length).unwrap(); + io::stdout().write_all(&length).unwrap(); io::stdout().write_all(msg).unwrap(); } @@ -172,7 +172,7 @@ where { let binding = serde_json::to_string(&value).expect("serialization should not fail"); let msg_raw = binding.as_bytes(); - write_raw_msg(&msg_raw); + write_raw_msg(msg_raw); } fn action_repeat(cli: &Cli, repetitions: usize) { @@ -182,15 +182,13 @@ fn action_repeat(cli: &Cli, repetitions: usize) { cursor.seek_to_first_header_of_type("dheader-1.0"); let dheader = cursor.read_raw_msg(); - write_raw_msg(&dheader); + write_raw_msg(dheader); // detector config let detector_config_msg = cursor.read_raw_msg(); let _detector_config: DetectorConfig = serde_json::from_slice(detector_config_msg).unwrap(); let mut detector_config_value: serde_json::Value = - serde_json::from_slice::(detector_config_msg) - .unwrap() - .to_owned(); + serde_json::from_slice::(detector_config_msg).unwrap(); // XXX the heaer may lie about the number of images: let summary = get_summary(&cli.filename); @@ -225,14 +223,14 @@ fn action_repeat(cli: &Cli, repetitions: usize) { write_serializable(&dimage); let dimaged = rep_cursor.read_raw_msg(); - write_raw_msg(&dimaged); + write_raw_msg(dimaged); let image = rep_cursor.read_raw_msg(); - write_raw_msg(&image); + write_raw_msg(image); // NOTE: we don't fake the timestamps (yet) let config = rep_cursor.read_raw_msg(); - write_raw_msg(&config); + write_raw_msg(config); idx += 1; } @@ -240,8 +238,8 @@ fn action_repeat(cli: &Cli, repetitions: usize) { } fn action_sim(filename: &str, uri: &str) { - let mut sender = FrameSender::new(&uri, &filename); - sender.send_headers(); + let mut sender = FrameSender::new(uri, filename, false); + sender.send_headers(|| Some(())).unwrap(); sender.send_frames(); sender.send_footer(); }