diff --git a/src/action.rs b/src/action.rs index b5b13a8..a17f53d 100644 --- a/src/action.rs +++ b/src/action.rs @@ -4,6 +4,7 @@ mod publisher; mod types; mod subscribers; mod commands; +mod routine; pub use command::*; pub use commands::*; @@ -11,3 +12,4 @@ pub use subscriber::*; pub use subscribers::*; pub use publisher::*; pub use types::*; +pub use routine::*; diff --git a/src/action/command.rs b/src/action/command.rs index c84938e..f9ae575 100644 --- a/src/action/command.rs +++ b/src/action/command.rs @@ -1,9 +1,10 @@ -use crate::io::IOEvent; +use crate::io::IOType; +use crate::errors::ErrorType; -pub type CommandType = Box; +pub type CommandType = Box>; /// Abstraction for single atomic output operation -pub trait Command { - fn execute(&self) -> Option; +pub trait Command { + fn execute(&self, value: Option) -> Result, ErrorType>; } diff --git a/src/action/commands.rs b/src/action/commands.rs index e19dc6b..ffd60ce 100644 --- a/src/action/commands.rs +++ b/src/action/commands.rs @@ -1,3 +1,5 @@ mod notifier; +mod gpio; pub use notifier::*; +pub use gpio::*; \ No newline at end of file diff --git a/src/action/commands/gpio.rs b/src/action/commands/gpio.rs new file mode 100644 index 0000000..0486add --- /dev/null +++ b/src/action/commands/gpio.rs @@ -0,0 +1,233 @@ +use crate::action::{Command, IOCommand}; +use crate::io::{DeferredDevice, IOType, DeviceTraits, IODirection}; +use crate::errors::{Error, ErrorKind, ErrorType}; + +pub struct GPIOCommand { + func: IOCommand, +} + +impl GPIOCommand { + pub fn new(func: IOCommand, device: Option) -> Self { + if let Some(device) = device { + check_alignment(&func, device.clone()).unwrap(); + } + + Self { func } + } + + pub fn direction(&self) -> IODirection { + match self.func { + IOCommand::Input(_) => IODirection::Input, + IOCommand::Output(_) => IODirection::Output, + } + } +} + +impl Command for GPIOCommand { + /// Execute internally stored function. + /// + /// # Returns + /// If internal function is `IOCommand::Input`, then the value that is read from device is returned. + /// Otherwise, if `IOCommand::Output`, then `None` is returned. + fn execute(&self, value: Option) -> Result, ErrorType> { + match self.func { + IOCommand::Input(inner) => { + // throw warning for unused value + if let Some(_) = value { unused_value() } + + let read_value = inner(); + + Ok(Some(read_value)) + + }, + IOCommand::Output(inner) => { + let unwrapped_value = value.expect("No value was passed to write..."); + let _ = inner(unwrapped_value); // TODO: handle bad result + + Ok(None) + }, + } + } +} + +/// Panic if command and device are not aligned +pub fn check_alignment(command: &IOCommand, device: DeferredDevice) -> Result<(), ErrorType> { + let aligned = command.direction() == device.direction(); + match aligned { + true => Ok(()), + false => Err(misconfigured_error()) + } +} + +/// Generate an error for when command type does not match device type +pub fn misconfigured_error() -> ErrorType { + Error::new(ErrorKind::CommandError, "Misconfigured device! Device and command type do not match.") +} + +/// Print a warning on console stderr +fn unused_value() { + const MSG: &str = "Unused value passed when reading input..."; + eprintln!("{}", MSG); +} + +#[cfg(test)] +mod tests { + use crate::action::{IOCommand, check_alignment, GPIOCommand, Command}; + use crate::helpers::Deferrable; + use crate::io::{DeferredDevice, Device, DeviceType, GenericOutput, IdType, IODirection, GenericInput, IOType}; + use crate::storage::OwnedLog; + + const REGISTER_DEFAULT: IOType = IOType::PosInt8(255); + static mut REGISTER: IOType = REGISTER_DEFAULT; + + unsafe fn reset_register() { + REGISTER = REGISTER_DEFAULT; + } + + unsafe fn set_register(val: IOType) { + REGISTER = val; + } + + fn make_device(direction: &IODirection) -> DeferredDevice { + let name = ""; + let id = IdType::default(); + let log = OwnedLog::new(id, None).deferred(); + + let device = match direction { + IODirection::Input => { + DeviceType::Input(GenericInput::new(String::from(name), id, None, Some(log))) + } + IODirection::Output => { + DeviceType::Output(GenericOutput::new(String::from(name), id, None, Some(log))) + } + }; + device.deferred() + } + + #[test] + fn test_check_alignment() { + { + let direction = IODirection::Input; + let device = make_device(&direction); + + let command = IOCommand::Input(move || IOType::Float(0.0)); + + let result = check_alignment(&command, device); + match result { + Ok(_) => assert!(true), + Err(_) => assert!(false) + } + } + { + let direction = IODirection::Output; + let device = make_device(&direction); + + let command = IOCommand::Output(move |_| Ok(())); + + let result = check_alignment(&command, device); + match result { + Ok(_) => assert!(true), + Err(_) => assert!(false) + } + } + + + { + let direction = IODirection::Output; + let device = make_device(&direction); + + let command = IOCommand::Input(move || IOType::Float(0.0)); + + let result = check_alignment(&command, device); + match result { + Ok(_) => assert!(false), + Err(_) => assert!(true) + } + } + { + let direction = IODirection::Input; + let device = make_device(&direction); + + let command = IOCommand::Output(move |_| Ok(())); + + let result = check_alignment(&command, device); + match result { + Ok(_) => assert!(false), + Err(_) => assert!(true) + } + } + } + + #[test] + #[should_panic] + /// Assert that program panics when device and IOCommand are misaligned + /// This test case specifically uses an input device and an output command. + fn test_alignment_i_o() { + let direction = IODirection::Input; + let device = make_device(&direction); + + let command = IOCommand::Output(move |_| Ok(())); + + GPIOCommand::new(command, Some(device)); + } + + #[test] + #[should_panic] + /// Assert that program panics when device and IOCommand are misaligned + /// This test case specifically uses an output device and an input command. + fn test_alignment_o_i() { + let direction = IODirection::Output; + let device = make_device(&direction); + + let command = IOCommand::Input(move || IOType::default()); + + GPIOCommand::new(command, Some(device)); + } + + #[test] + fn test_execute() { + { + unsafe { reset_register(); } + + let func = IOCommand::Input(move || unsafe { + REGISTER + }); + let command = GPIOCommand::new(func, None); + + match command.execute(None) { + Ok(tentative) => unsafe { + match tentative { + Some(inner) => assert_eq!(REGISTER, inner), + None => assert!(false) + } + }, + Err(_) => assert!(false) + } + + } + { + unsafe { reset_register(); } + + let func = IOCommand::Output(move |val| unsafe { + set_register(val); + Ok(()) + }); + let command = GPIOCommand::new(func, None); + let value = IOType::Binary(true); + + unsafe { assert_ne!(REGISTER, value); } + + match command.execute(Some(value)) { + Ok(tentative) => { + match tentative { + Some(_) => assert!(false), + None => () + } + }, + Err(_) => assert!(false) + } + + unsafe { assert_eq!(REGISTER, value); } + } + } +} diff --git a/src/action/commands/notifier.rs b/src/action/commands/notifier.rs index cda3de9..33634e5 100644 --- a/src/action/commands/notifier.rs +++ b/src/action/commands/notifier.rs @@ -1,7 +1,8 @@ use std::sync::{Arc, Mutex}; use crate::action::{Command, CommandType}; +use crate::errors::ErrorType; use crate::helpers::{Deferrable, Deferred}; -use crate::io::IOEvent; +use crate::io::{IOEvent, IOType}; /// Simple command for printing a message to stdout pub struct SimpleNotifier { @@ -12,20 +13,20 @@ impl SimpleNotifier { pub fn new(msg: String) -> Self { Self { msg } } - pub fn command(msg: String) -> CommandType { + pub fn command(msg: String) -> CommandType { Box::new(Self::new(msg)) } } -impl Command for SimpleNotifier { - fn execute(&self) -> Option { +impl Command for SimpleNotifier { + fn execute(&self, _value: Option) -> Result, ErrorType> { println!("{}", self.msg); - None + Ok(None) } } impl Deferrable for SimpleNotifier { - type Inner = CommandType; + type Inner = CommandType; fn deferred(self) -> Deferred { Arc::new(Mutex::new(Box::new(self))) } diff --git a/src/action/routine.rs b/src/action/routine.rs new file mode 100644 index 0000000..15ca0d2 --- /dev/null +++ b/src/action/routine.rs @@ -0,0 +1,137 @@ +use std::sync::{Arc, Mutex, Weak}; +use chrono::{DateTime, Utc}; +use crate::action::{GPIOCommand, Command}; +use crate::errors::ErrorType; +use crate::helpers::Deferred; +use crate::io::{IOEvent, IOType, DeviceMetadata}; +use crate::storage::{HasLog, OwnedLog}; + +/// A `Command` that should be executed at a scheduled time *outside* of the normal event loop. +/// +/// Typically these should exclusively be `Output` events, such as completing a time bound operation. +/// +/// # Example +/// The primary use case is turning off a pump or other output after a predetermined period of time. +/// The normal event loop will execute the first action, but to avoid blocking the thread, a `Routine` +/// should be scheduled. +pub struct Routine { + /// Scheduled time to execute function + timestamp: DateTime, + + /// Copy of owning device metadata + /// A copy is used to avoid locking issues since scheduled commands might be time critical. + metadata: DeviceMetadata, + + /// Value to pass to `GPIOCommand` + value: IOType, + + /// Weak reference to log for originating device + log: Weak>, + + command: GPIOCommand, +} + +impl Routine { + pub fn new( + timestamp: DateTime, + metadata: DeviceMetadata, + value: IOType, + log: Deferred, + command: GPIOCommand) -> Self + { + let log = Arc::downgrade(&log); + Self { timestamp, metadata, value, log, command } + } + /// Main polling function + /// + /// Checks scheduled time, then executes command. `IOEvent` is automatically added to device log. + /// + /// # Returns + /// bool based on if execution was successful or not. This value should be used to drop `Routine` from + /// external store. + pub fn attempt(&self) -> bool { + let now = Utc::now(); + if now >= self.timestamp { + let result = self.execute(Some(self.value)); + match result { + Ok(event) => { + let event = event.unwrap(); + let _ = self.add_to_log(event); + return true + } + Err(e) => { + eprintln!("{}", e); + } + }; + }; + + // return false by default + false + } +} + +impl Command for Routine { + fn execute(&self, value: Option) -> Result, ErrorType> { + match self.command.execute(value) { + Ok(_) => { + let event = IOEvent::generate(&self.metadata, self.timestamp, value.unwrap()); + Ok(Some(event)) + } + Err(e) => Err(e) + } + } +} + +impl HasLog for Routine { + fn log(&self) -> Option> { + Some(self.log.upgrade().unwrap()) + } +} + +#[cfg(test)] +mod tests { + use chrono::{Duration, Utc}; + use crate::action::{GPIOCommand, IOCommand, Routine}; + use crate::helpers::Deferrable; + use crate::io::{DeviceMetadata, IOType}; + use crate::storage::{MappedCollection, OwnedLog}; + + const REGISTER_DEFAULT: IOType = IOType::Binary(false); + static mut REGISTER: IOType = REGISTER_DEFAULT; + + unsafe fn reset_register() { + REGISTER = REGISTER_DEFAULT; + } + + unsafe fn set_register(val: IOType) { + REGISTER = val; + } + + #[test] + fn test_attempt() { + unsafe { reset_register(); } + let metadata = DeviceMetadata::default(); + + let log = OwnedLog::new(metadata.id, None).deferred(); + + let func = IOCommand::Output(move |val| unsafe { + set_register(val); + Ok(()) + }); + let command = GPIOCommand::new(func, None); + + let timestamp = Utc::now() + Duration::microseconds(5); + let value = IOType::Binary(true); + let routine = Routine::new(timestamp, metadata, value, log.clone(), command); + + unsafe { assert_ne!(REGISTER, value); } + + while Utc::now() < timestamp { + assert_eq!(false, routine.attempt()); + } + + assert!(routine.attempt()); + unsafe { assert_eq!(REGISTER, value); } + assert_eq!(log.try_lock().unwrap().length(), 1); + } +} \ No newline at end of file diff --git a/src/action/subscriber.rs b/src/action/subscriber.rs index 39546ea..e3658a0 100644 --- a/src/action/subscriber.rs +++ b/src/action/subscriber.rs @@ -18,7 +18,7 @@ pub trait SubscriberStrategy { fn name(&self) -> String; /// Primary method to evaluate incoming data /// Returned IOEvent should be logged - fn evaluate(&mut self, data: &IOEvent) -> Option; + fn evaluate(&mut self, data: &IOEvent); fn publisher(&self) -> &Option>; fn add_publisher(&mut self, publisher: Deferred); diff --git a/src/action/subscribers/pid.rs b/src/action/subscribers/pid.rs index 3a90c1c..054ff57 100644 --- a/src/action/subscribers/pid.rs +++ b/src/action/subscribers/pid.rs @@ -22,7 +22,7 @@ impl SubscriberStrategy for PIDMonitor { fn name(&self) -> String { self.name.clone() } - fn evaluate(&mut self, _data: &IOEvent) -> Option { + fn evaluate(&mut self, _data: &IOEvent) { todo!() // maintain PID } diff --git a/src/action/subscribers/threshold.rs b/src/action/subscribers/threshold.rs index f9178b4..30c79d9 100644 --- a/src/action/subscribers/threshold.rs +++ b/src/action/subscribers/threshold.rs @@ -55,16 +55,14 @@ impl SubscriberStrategy for ThresholdNotifier { self.name.clone() } - fn evaluate(&mut self, event: &IOEvent) -> Option { - let value = event.data.value; + fn evaluate(&mut self, data: &IOEvent) { + let value = data.data.value; let exceeded = match &self.trigger { &Comparison::GT => value >= self.threshold, &Comparison::LT => value <= self.threshold, }; if exceeded { - (self.factory)(value, self.threshold).execute() - } else { - None + let _ = (self.factory)(value, self.threshold).execute(None); } } diff --git a/src/action/types.rs b/src/action/types.rs index 6a267fe..6492450 100644 --- a/src/action/types.rs +++ b/src/action/types.rs @@ -1,7 +1,21 @@ //! Type aliases for functions and closures to assist `ActionBuilder`. //! These aliases allow for strongly structuring the dynamic initialization of subscriber/command instances. use crate::action::CommandType; -use crate::io::IOType; +use crate::io::{IODirection, IOEvent, IOType}; // Command Factories -pub type BaseCommandFactory = fn(IOType, IOType) -> CommandType; \ No newline at end of file +#[derive(Copy, Clone)] +pub enum IOCommand { + Input(fn() -> IOType), + Output(fn(IOType) -> Result<(), ()>), +} +impl IOCommand { + pub fn direction(&self) -> IODirection { + match self { + IOCommand::Input(_) => IODirection::Input, + IOCommand::Output(_) => IODirection::Output, + } + } +} + +pub type BaseCommandFactory = fn(IOType, IOType) -> CommandType; \ No newline at end of file diff --git a/src/builders/action.rs b/src/builders/action.rs index f57377b..9262d76 100644 --- a/src/builders/action.rs +++ b/src/builders/action.rs @@ -1,7 +1,7 @@ use std::ops::DerefMut; use crate::action::{BaseCommandFactory, Comparison, Publisher, PublisherInstance, ThresholdNotifier}; -use crate::errors::{ErrorKind, Error, Result}; +use crate::errors::{ErrorKind, Error, ErrorType}; use crate::helpers::{Deferrable, Deferred}; use crate::io::{DeferredDevice, DeviceType, IOType, DeviceWrapper}; @@ -23,7 +23,7 @@ impl ActionBuilder { /// `Err` is returned if passed device is not input. /// # Args /// - device: Device to add pub/subs. Should be Input - pub fn new(device: DeferredDevice) -> Result { + pub fn new(device: DeferredDevice) -> Result { if device.is_output() { return Err(Error::new(ErrorKind::DeviceError, "Passed device is output. Expected input.")) } diff --git a/src/builders/device_log.rs b/src/builders/device_log.rs index 7739458..e7f46d9 100644 --- a/src/builders/device_log.rs +++ b/src/builders/device_log.rs @@ -1,3 +1,5 @@ +use std::ops::DerefMut; +use crate::action::{IOCommand, GPIOCommand}; use crate::helpers::{Deferrable, Deferred}; use crate::io::{ DeferredDevice, Device, DeviceType, GenericInput, GenericOutput, IODirection, IOKind, IdType, @@ -9,6 +11,7 @@ use std::sync::{Arc, Mutex, Weak}; pub struct DeviceLogBuilder { device: DeferredDevice, log: Deferred, + command: IOCommand, } impl DeviceLogBuilder { @@ -17,30 +20,81 @@ impl DeviceLogBuilder { id: &IdType, kind: &Option, direction: &IODirection, + command: &IOCommand, settings: Option>, ) -> Self { - let log: Deferred = Arc::new(Mutex::new(OwnedLog::new(*id, settings))); + + check_command_alignment(command, direction, name); + + let log: Deferred; + let device = match direction { IODirection::Output => { - let output = GenericOutput::new(name.to_string(), *id, *kind, log.clone()); + let mut output = GenericOutput::new( + name.to_string(), + *id, + *kind, + None, + ); + log = output.init_log(settings); DeviceType::Output(output) - } + }, IODirection::Input => { - let input = GenericInput::new(name.to_string(), *id, *kind, log.clone()); + let mut input = GenericInput::new( + name.to_string(), + *id, + *kind, + None, + ); + log = input.init_log(settings); DeviceType::Input(input) - } + }, }; + // wrap device let wrapped = device.deferred(); + + // set log owner let downgraded: Weak> = Arc::downgrade(&wrapped.clone()); log.lock().unwrap().set_owner(downgraded); + Self { device: wrapped, log, + command: *command } } pub fn get(&self) -> (DeferredDevice, Deferred) { (self.device.clone(), self.log.clone()) } + + /// Create a `GPIOCommand` from `command` field. + /// + /// # Notes + /// Alignment of command and device type is checked in `::new()` by `check_command_alignment()` + pub fn setup_command(&self) { + let gpio = GPIOCommand::new(self.command, Some(self.device.clone())); + + let mut binding = self.device.lock().unwrap(); + let device = binding.deref_mut(); + + match device { + DeviceType::Input(inner) => { + inner.add_command(gpio) + }, + DeviceType::Output(inner) => { + inner.add_command(gpio) + } + } + } } + +/// Check that `DeviceType` and `IOCommand` align +/// +/// Program panics and dies if direction is misaligned. +fn check_command_alignment(command: &IOCommand, direction: &IODirection, name: &str) { + if command.direction() != *direction { + panic!("IOCommand type and `IODirection do not align for {}", name); + } +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index 70e5003..4dfb8d4 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,7 +1,7 @@ use std::error::Error as _Error; use std::fmt; -pub type Result = std::result::Result>; +pub type ErrorType = Box; #[derive(Debug)] pub enum ErrorKind { @@ -11,7 +11,9 @@ pub enum ErrorKind { SerializationError, - DeviceError, + DeviceError, // error originating from device implementation + + CommandError, // error originating from command implementation } #[derive(Debug)] @@ -21,7 +23,7 @@ pub struct Error { } impl Error { - pub fn new(kind: ErrorKind, msg: &str) -> Box { + pub fn new(kind: ErrorKind, msg: &str) -> ErrorType { let message = String::from(msg); Box::new(Error { kind, message }) } @@ -35,6 +37,7 @@ impl fmt::Display for Error { ErrorKind::ContainerNotEmpty => "Container is not empty", ErrorKind::SerializationError => "Error during serialization", ErrorKind::DeviceError => "Wrong type of device passed", + ErrorKind::CommandError => "Error in command implementation", }; write!(f, "{}: {}", pretext, self.message) diff --git a/src/helpers.rs b/src/helpers.rs index c0d2490..c22462a 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use crate::errors::Result; +use crate::errors::ErrorType; /// Return a writable `File` from a given path. /// File does not exist, then an attempt is made to create the file. @@ -19,7 +19,7 @@ pub fn writable_or_create(path: String) -> File { /// Check a sequence of `Result` /// This used to check the returned outputs of recursive or parallel operations. /// This does not crash the program but instead prints any errors via `dbg!`. -pub fn check_results(results: &[Result]) -> Result<()> { +pub fn check_results(results: &[Result]) -> Result<(), ErrorType> { for result in results { match result { Err(e) => dbg!(e), diff --git a/src/io/device.rs b/src/io/device.rs index d7445ca..c96535e 100644 --- a/src/io/device.rs +++ b/src/io/device.rs @@ -1,10 +1,13 @@ //! Provide Low-level Device Functionality use std::fmt::Formatter; -use chrono::{DateTime, Utc}; -use crate::helpers::Deferred; -use crate::io::metadata::DeviceMetadata; -use crate::io::{IODirection, IOKind, IdType, IOType, IOEvent}; -use crate::storage::OwnedLog; +use std::sync::Arc; +use chrono::Utc; +use crate::action::GPIOCommand; +use crate::errors::*; +use crate::helpers::{Deferrable, Deferred}; +use crate::io::{IODirection, IOKind, IdType, IOType, IOEvent, DeviceMetadata}; +use crate::settings::Settings; +use crate::storage::{HasLog, OwnedLog}; /// Defines a minimum interface for interacting with GPIO devices. /// @@ -12,13 +15,15 @@ use crate::storage::OwnedLog; /// Additionally, an accessor, `metadata()` is defined to provide for the facade methods to access /// device name, id, direction, and kind. Therefore, implementing structs shall implement a field /// `metadata` that is mutably accessed through the reciprocal getter method. -pub trait Device { +pub trait Device: HasLog { + /// Creates a new instance of the device with the given parameters. + /// /// `name`: name of device. /// `id`: device ID. /// `kind`: kind of I/O device. Optional argument. /// `log`: Optional deferred owned log for the device. - fn new(name: String, id: IdType, kind: Option, log: Deferred) -> Self + fn new(name: String, id: IdType, kind: Option, log: Option>) -> Self where Self: Sized; @@ -46,8 +51,35 @@ pub trait Device { self.metadata().kind } - /// Generate an `IOEvent` instance from provided value or `::rx()` - fn generate_event(&self, dt: DateTime, value: Option) -> IOEvent; + /// Generate an `IOEvent` instance from provided value + /// + /// This is used by internal `command` for building events from given data. + /// Input devices pass read value; output devices pass write value. + /// + /// # Notes + /// Utc time is generated within this function. This allows each call to be more accurately + /// recorded instead of using a single time when polling. Accurate record keeping is more + /// valuable than a slight hit to performance. + /// + /// Additionally, internally generating timestamp adds a layer of separation between + /// device trait objects and any of it's owners (i.e.: `PollGroup`). + fn generate_event(&self, value: IOType) -> IOEvent { + let dt = Utc::now(); + IOEvent::generate(self.metadata(), dt, value) + } + + /// Setter for `command` field + fn add_command(&mut self, command: GPIOCommand); + + /// Setter for `log` field + fn add_log(&mut self, log: Deferred); + + /// Initialize, set, and return log. + fn init_log(&mut self, settings: Option>) -> Deferred { + let log = OwnedLog::new(self.id(), settings).deferred(); + self.add_log(log.clone()); + log + } } impl std::fmt::Debug for dyn Device { @@ -63,3 +95,6 @@ impl std::fmt::Debug for dyn Device { } } +pub fn no_internal_closure() -> Box { + Error::new(ErrorKind::CommandError, "Device has no internal closure") +} diff --git a/src/io/event.rs b/src/io/event.rs index 3a2abe5..dba848f 100644 --- a/src/io/event.rs +++ b/src/io/event.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::io::types::{IOData, IOType, IdTraits}; -use crate::io::{Device, IODirection, IdType}; +use crate::io::{IODirection, IdType, DeviceMetadata}; /// Encapsulates `IOData` alongside of timestamp and device data #[derive(Debug, Serialize, Deserialize, Clone, Copy)] @@ -32,12 +32,11 @@ impl IOEvent { /// ``` /// /// ``` - pub fn generate(device: &(impl Device + ?Sized), timestamp: DateTime, value: IOType) -> Self { - let direction = device.direction(); - let info = device.metadata(); - let id = info.id; + pub fn generate(metadata: &DeviceMetadata, timestamp: DateTime, value: IOType) -> Self { + let direction = metadata.direction; + let id = metadata.id; let data = IOData { - kind: info.kind.clone(), + kind: metadata.kind, value, }; IOEvent { diff --git a/src/io/input.rs b/src/io/input.rs index 9beca81..e56ee87 100644 --- a/src/io/input.rs +++ b/src/io/input.rs @@ -1,17 +1,17 @@ -use crate::errors; +use crate::action::{Command, GPIOCommand, Publisher, PublisherInstance}; +use crate::errors::ErrorType; use crate::helpers::{Deferrable, Deferred}; -use crate::io::types::{IOType, DeviceType}; -use crate::io::{Device, DeviceMetadata, IdType, IODirection, IOEvent, IOKind}; -use crate::storage::{MappedCollection, OwnedLog}; -use chrono::{DateTime, Utc}; +use crate::io::types::DeviceType; +use crate::io::{Device, DeviceMetadata, IdType, IODirection, IOEvent, IOKind, no_internal_closure}; +use crate::storage::{HasLog, OwnedLog}; use std::sync::{Arc, Mutex}; -use crate::action::{Publisher, PublisherInstance}; #[derive(Default)] pub struct GenericInput { metadata: DeviceMetadata, - pub log: Deferred, + log: Option>, publisher: Option>, + command: Option, } impl Deferrable for GenericInput { @@ -27,12 +27,11 @@ impl Device for GenericInput { /// Creates a mock sensor which returns a value /// /// # Arguments - /// /// * `name`: arbitrary name of sensor /// * `id`: arbitrary, numeric ID to differentiate from other sensors /// /// returns: MockPhSensor - fn new(name: String, id: IdType, kind: Option, log: Deferred) -> Self + fn new(name: String, id: IdType, kind: Option, log: Option>) -> Self where Self: Sized, { @@ -40,11 +39,13 @@ impl Device for GenericInput { let metadata: DeviceMetadata = DeviceMetadata::new(name, id, kind, IODirection::Input); let publisher = None; + let command = None; - GenericInput { + Self { metadata, log, publisher, + command, } } @@ -52,33 +53,50 @@ impl Device for GenericInput { &self.metadata } - /// Generate an `IOEvent` instance from provided value or `::rx()` - fn generate_event(&self, dt: DateTime, value: Option) -> IOEvent { - IOEvent::generate(self, dt, value.unwrap_or_else(move || self.rx())) + fn add_command(&mut self, command: GPIOCommand) { + self.command = Some(command); + } + + fn add_log(&mut self, log: Deferred) { + self.log = Some(log) } } impl GenericInput { /// Return a mock value - pub fn rx(&self) -> IOType { - IOType::Float(1.2) + pub fn rx(&self) -> Result { + // Execute GPIO command + let read_value = if let Some(command) = &self.command { + let result = command.execute(None).unwrap(); + result.unwrap() + } else { return Err(no_internal_closure()) }; + + Ok(self.generate_event(read_value)) + } + + /// Propagate `IOEvent` to all subscribers. + /// + /// No error is raised when there is no associated publisher. + fn propagate(&mut self, event: &IOEvent) { + if let Some(publisher) = &self.publisher { + publisher.lock().unwrap().notify(&event); + }; } /// Get IOEvent, add to log, and propagate to publisher/subscribers + /// /// Primary interface method during polling. - pub fn read(&mut self, time: DateTime) -> errors::Result { - // get IOEvent - let event = self.generate_event(time, None); - - // propagate to publisher/subscribers - match &self.publisher { - Some(publisher) => publisher.lock().unwrap().notify(&event), - _ => () - }; + /// + /// # Notes + /// This method will fail if there is no associated log + pub fn read(&mut self) -> Result { + + let event = self.rx().expect("Error returned by `rx()`"); + + self.propagate(&event); + + self.add_to_log(event); - // add to log - let mut binding = self.log.lock().unwrap(); - binding.push(time, event)?; Ok(event) } @@ -99,34 +117,49 @@ impl GenericInput { } } +impl HasLog for GenericInput { + fn log(&self) -> Option> { + self.log.clone() + } +} + // Testing #[cfg(test)] mod tests { - use chrono::Utc; - use crate::action::PublisherInstance; + use crate::action::{GPIOCommand, IOCommand, PublisherInstance}; use crate::helpers::Deferrable; use crate::io::{Device, GenericInput, IOType}; + use crate::storage::MappedCollection; const DUMMY_OUTPUT: IOType = IOType::Float(1.2); + const COMMAND: IOCommand = IOCommand::Input(move || DUMMY_OUTPUT); #[test] fn test_rx() { - let input = GenericInput::default(); - assert_eq!(input.rx(), DUMMY_OUTPUT); + let mut input = GenericInput::default(); + + input.command = Some(GPIOCommand::new(COMMAND, None)); + + let event = input.rx().unwrap(); + assert_eq!(event.data.value, DUMMY_OUTPUT); } #[test] fn test_read() { let mut input = GenericInput::default(); + let log = input.init_log(None); + + input.command = Some(GPIOCommand::new(COMMAND, None)); + + assert_eq!(log.try_lock().unwrap().length(), 0); - let time = Utc::now(); - let event = input.read(time).unwrap(); + let event = input.read().unwrap(); assert_eq!(event.data.value, DUMMY_OUTPUT); - assert_eq!(event.timestamp, time); assert_eq!(event.data.kind, input.kind()); - // TODO: attach log and assert that IOEvent has been added to log + // assert that event was added to log + assert_eq!(log.try_lock().unwrap().length(), 1); } /// Test `::add_publisher()` and `::has_publisher()` diff --git a/src/io/output.rs b/src/io/output.rs index c99bf60..039e3bc 100644 --- a/src/io/output.rs +++ b/src/io/output.rs @@ -1,30 +1,18 @@ -use crate::errors; +use crate::action::{Command, GPIOCommand}; +use crate::errors::ErrorType; use crate::helpers::{Deferrable, Deferred}; -use crate::io::{Device, IOType, DeviceType}; -use crate::io::{ - DeviceMetadata, IODirection, IOEvent, IOKind, IdType, -}; -use crate::storage::{MappedCollection, OwnedLog}; +use crate::io::{DeviceMetadata, IODirection, IOEvent, IOKind, IdType, Device, IOType, DeviceType, no_internal_closure}; +use crate::storage::{HasLog, OwnedLog}; use std::sync::{Arc, Mutex}; -use chrono::{DateTime, Utc}; +#[derive(Default)] pub struct GenericOutput { metadata: DeviceMetadata, // cached state state: IOType, - pub log: Deferred, -} -impl Default for GenericOutput { - /// Overwrite default value for `IODirection` in `DeviceMetadata` - fn default() -> Self { - let mut metadata = DeviceMetadata::default(); - metadata.direction = IODirection::Output; - - let state = IOType::default(); - let log = Arc::new(Mutex::new(OwnedLog::default())); - Self { metadata, state, log } - } + log: Option>, + command: Option, } impl Deferrable for GenericOutput { @@ -45,47 +33,57 @@ impl Device for GenericOutput { /// * `id`: arbitrary, numeric ID to differentiate from other devices /// /// returns: GenericOutput - fn new(name: String, id: IdType, kind: Option, log: Deferred) -> Self + fn new(name: String, id: IdType, kind: Option, log: Option>) -> Self where Self: Sized, { let kind = kind.unwrap_or_default(); let state = IOType::default(); - let metadata: DeviceMetadata = DeviceMetadata::new(name, id, kind, IODirection::Input); + let metadata: DeviceMetadata = DeviceMetadata::new(name, id, kind, IODirection::Output); + + let command = None; - GenericOutput { metadata, state, log } + Self { metadata, state, log, command } } fn metadata(&self) -> &DeviceMetadata { &self.metadata } - /// Generate an `IOEvent` instance from provided value or `::tx()` - fn generate_event(&self, dt: DateTime, value: Option) -> IOEvent { - IOEvent::generate(self, dt, value.unwrap()) + fn add_command(&mut self, command: GPIOCommand) { + self.command = Some(command); + } + + fn add_log(&mut self, log: Deferred) { + self.log = Some(log) } } impl GenericOutput { /// Return a mock value - pub fn tx(&self, value: &IOType) -> IOEvent { - /* low-level functionality goes here */ - self.generate_event(Utc::now(), Some(*value)) + pub fn tx(&self, value: IOType) -> Result { + // Execute GPIO command + if let Some(command) = &self.command { + command.execute(Some(value)).unwrap(); + } else { return Err(no_internal_closure()) }; + + Ok(self.generate_event(value)) } /// Primary interface method during polling. + /// /// Calls `tx()`, updates cached state, and saves to log. - pub fn write(&mut self, value: &IOType) -> errors::Result { - let event = self.tx(value); + /// + /// # Notes + /// This method will fail if there is no associated log + pub fn write(&mut self, value: IOType) -> Result { + let event = self.tx(value).expect("Error returned by `tx()`"); // update cached state self.state = event.data.value; - // add to log - self.log - .lock() - .unwrap() - .push(event.timestamp, event.clone())?; + self.add_to_log(event); + Ok(event) } @@ -96,42 +94,61 @@ impl GenericOutput { } } +impl HasLog for GenericOutput { + fn log(&self) -> Option> { + self.log.clone() + } +} + #[cfg(test)] mod tests { + use crate::action::{GPIOCommand, IOCommand}; use crate::io::{Device, GenericOutput, IOType}; + use crate::storage::MappedCollection; + + /// Dummy output command for testing. + /// Accepts value and returns `Ok(())` + const COMMAND: IOCommand = IOCommand::Output(move |_| Ok(())); #[test] fn test_tx() { - let value = IOType::Binary(true); - let output = GenericOutput::default(); + let mut output = GenericOutput::default(); + output.command = Some(GPIOCommand::new(COMMAND, None)); - let new = output.tx(&value); + let value = IOType::Binary(true); + let event = output.tx(value).expect("Unknown error occurred in `tx()`"); - assert_eq!(value, new.data.value); - assert_eq!(output.kind(), new.data.kind); - assert_eq!(output.direction(), new.direction); + assert_eq!(value, event.data.value); + assert_eq!(output.kind(), event.data.kind); + assert_eq!(output.direction(), event.direction); } #[test] /// Test that `tx()` was called, cached state was updated, and IOEvent added to log. fn test_write() { - let value = IOType::Binary(true); let mut output = GenericOutput::default(); + let log = output.init_log(None); + + assert_eq!(log.try_lock().unwrap().length(), 0); + + let value = IOType::Binary(true); + output.command = Some(GPIOCommand::new(COMMAND, None)); // check `state` before `::write()` assert_ne!(value, *output.state()); - let new = output.write(&value).expect("Unknown error returned by `::write()`"); + let event = output.write(value).expect("Unknown error returned by `::write()`"); // check state after `::write()` assert_eq!(value, *output.state()); // check returned `IOEvent` - assert_eq!(value, new.data.value); - assert_eq!(output.kind(), new.data.kind); - assert_eq!(output.direction(), new.direction); + assert_eq!(value, event.data.value); + assert_eq!(output.kind(), event.data.kind); + assert_eq!(output.direction(), event.direction); - // TODO: attach log and assert that event was added to log + // assert that event was added to log + assert_eq!(log.try_lock().unwrap().length(), 1); } } \ No newline at end of file diff --git a/src/io/types.rs b/src/io/types.rs index 469ed7d..9f9b8cb 100644 --- a/src/io/types.rs +++ b/src/io/types.rs @@ -149,6 +149,7 @@ pub trait DeviceTraits { fn name(&self) -> String; fn id(&self) -> IdType; fn kind(&self) -> IOKind; + fn direction(&self) -> IODirection; } pub enum DeviceType { @@ -197,6 +198,13 @@ impl DeviceTraits for DeviceType { Self::Input(inner) => inner.kind(), } } + + fn direction(&self) -> IODirection { + match self { + Self::Output(inner) => inner.direction(), + Self::Input(inner) => inner.direction(), + } + } } pub type DeferredDevice = Deferred; @@ -212,15 +220,19 @@ impl DeviceWrapper for DeferredDevice { } impl DeviceTraits for DeferredDevice { fn name(&self) -> String { - self.lock().unwrap().name() + self.try_lock().unwrap().name() } fn id(&self) -> IdType { - self.lock().unwrap().id() + self.try_lock().unwrap().id() } fn kind(&self) -> IOKind { - self.lock().unwrap().kind() + self.try_lock().unwrap().kind() + } + + fn direction(&self) -> IODirection { + self.try_lock().unwrap().direction() } } diff --git a/src/main.rs b/src/main.rs index 4857e22..6fdc078 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,9 +12,9 @@ mod units; use std::sync::Arc; -use crate::action::{BaseCommandFactory, Comparison, SimpleNotifier}; +use crate::action::{BaseCommandFactory, Comparison, SimpleNotifier, IOCommand}; use crate::builders::ActionBuilder; -use crate::errors::Result; +use crate::errors::ErrorType; use crate::io::{IODirection, IOKind, IOType}; use crate::settings::Settings; use crate::storage::{Persistent, PollGroup}; @@ -35,21 +35,24 @@ fn init(name: &str) -> PollGroup { group } -fn main() -> Result<()> { +fn setup_poller() -> PollGroup { let mut poller = init("main"); + let config = vec![ - ("test name", 0, IOKind::PH, IODirection::Input), - ("second sensor", 1, IOKind::Flow, IODirection::Input), + ("test name", 0, IOKind::PH, IODirection::Input, IOCommand::Input(move || IOType::Float(1.2))), + ("second sensor", 1, IOKind::Flow, IODirection::Input, IOCommand::Input(move || IOType::Float(0.5))), ]; poller.add_devices(&config).unwrap(); + poller +} - // build subscribers/commands +fn build_subscribers(poller: &mut PollGroup) { println!("\nBuilding subscribers ..."); for (id, input) in poller.inputs.iter() { println!("\n- Setting up builder ..."); - let builder = ActionBuilder::new(input.clone()); + let mut builder = ActionBuilder::new(input.clone()).unwrap(); println!("- Initializing subscriber ..."); @@ -58,24 +61,42 @@ fn main() -> Result<()> { let trigger = Comparison::GT; let factory: BaseCommandFactory = |value, threshold| SimpleNotifier::command(format!("{} exceeded {}", value, threshold)); - builder?.add_threshold(&name, threshold, trigger, factory); + builder.add_threshold(&name, threshold, trigger, factory); } println!("\n... Finished building\n"); +} + +fn poll(poller: &mut PollGroup) -> Result<(), ErrorType> { + match poller.poll() { + Ok(_) => match poller.save(&None) { + Ok(_) => (), + Err(t) => { + return Err(t); + } + }, + _ => (), + }; + Ok(()) +} + +fn attempt_scheduled(poller: &mut PollGroup) { + poller.check_scheduled() +} + +fn main() -> Result<(), ErrorType> { + let mut poller = setup_poller(); + build_subscribers(&mut poller); + // main event loop println!("... Beginning polling ...\n"); loop { - let polled = poller.poll(); - match polled { - Ok(_) => match poller.save(&None) { - Ok(_) => (), - Err(t) => { - return Err(t); - } - }, - _ => (), - }; + + poll(&mut poller).expect("Error occurred during polling"); + + attempt_scheduled(&mut poller); + std::thread::sleep(FREQUENCY); } } diff --git a/src/storage/collection.rs b/src/storage/collection.rs index b2cda5d..d39d9a4 100644 --- a/src/storage/collection.rs +++ b/src/storage/collection.rs @@ -1,4 +1,4 @@ -use crate::errors; +use crate::errors::ErrorType; use crate::io::IdTraits; /// Define a basic interface to interact with underlying data. @@ -10,7 +10,7 @@ use crate::io::IdTraits; pub trait MappedCollection { /// Add a key-value pair to the collection and return a boolean indicating if the addition was successful. /// If the key already existed, then `false` is returned. - fn push(&mut self, key: K, data: T) -> errors::Result<&mut T>; + fn push(&mut self, key: K, data: T) -> Result<&mut T, ErrorType>; /// Access object by key /// Since key might not exist, an option is returned. diff --git a/src/storage/container.rs b/src/storage/container.rs index 7f3b960..62ec2aa 100644 --- a/src/storage/container.rs +++ b/src/storage/container.rs @@ -18,7 +18,7 @@ /// store a collection of objects of a specific type `T`, and identified by a specific key type `K`. The relationship /// between `Containerized` and `Container` is that `Containerized` defines how the `Container` should be created /// and used for a specific type, while `Container` actually holds the collection of objects. -use crate::errors::{Error, ErrorKind, Result}; +use crate::errors::{Error, ErrorKind, ErrorType}; use crate::io::IdTraits; use crate::storage::collection::MappedCollection; use serde::{Deserialize, Serialize}; @@ -61,7 +61,7 @@ impl MappedCollection for Container { /// Using `entry` method on the inner HashMap to check if the key already exists in the HashMap /// - If the key already exists, the returned value is `std::collections::hash_map::Entry::Occupied`, which returns false. /// - If the key does not exist, the returned value is `std::collections::hash_map::Entry::Vacant`, which inserts the key-value pair into the HashMap and returns true. - fn push(&mut self, key: K, data: T) -> Result<&mut T> { + fn push(&mut self, key: K, data: T) -> Result<&mut T, ErrorType> { match self.inner.entry(key) { std::collections::hash_map::Entry::Occupied(_) => { Err(Error::new(ErrorKind::ContainerError, "Key already exists")) diff --git a/src/storage/grouping.rs b/src/storage/grouping.rs index 80ac946..d1e738a 100644 --- a/src/storage/grouping.rs +++ b/src/storage/grouping.rs @@ -1,10 +1,10 @@ use std::ops::DerefMut; use chrono::{DateTime, Duration, Utc}; use std::sync::Arc; -use crate::action::PublisherInstance; +use crate::action::{IOCommand, PublisherInstance, Routine}; use crate::builders::DeviceLogBuilder; use crate::helpers::Deferred; -use crate::errors::Result; +use crate::errors::ErrorType; use crate::helpers::check_results; use crate::io::{IdType, IOKind, DeviceContainer, IOEvent, DeviceType, IODirection}; use crate::settings::Settings; @@ -29,20 +29,21 @@ pub struct PollGroup { pub logs: LogContainer, pub inputs: DeviceContainer, pub publishers: Vec>, + pub scheduled: Vec, } impl PollGroup { /// Iterate through container once. Call `get_event()` on each value. /// Update according to the lowest rate. - pub fn poll(&mut self) -> std::result::Result>, ()> { - let mut results: Vec> = Vec::new(); + pub fn poll(&mut self) -> Result>, ()> { + let mut results: Vec> = Vec::new(); let next_execution = self.last_execution + self.settings.interval; if next_execution <= Utc::now() { for (_, input) in self.inputs.iter_mut() { - let mut device = input.lock().unwrap(); + let mut device = input.try_lock().unwrap(); if let DeviceType::Input(inner) = device.deref_mut() { - let result = inner.read(next_execution); + let result = inner.read(); results.push(result); } } @@ -53,6 +54,19 @@ impl PollGroup { } } + pub fn check_scheduled(&mut self) { + let mut executed = Vec::default(); + for (index, routine) in self.scheduled.iter().enumerate() { + if routine.attempt() { + executed.push(index); + } + } + // remove completed + for index in executed { + self.scheduled.remove(index); + } + } + /// Constructor for `Poller` struct. /// Initialized empty containers. pub fn new(name: &str, settings: Option>) -> Self { @@ -62,6 +76,7 @@ impl PollGroup { let inputs = >::default(); let logs = Vec::default(); let publishers = Vec::default(); + let scheduled = Vec::default(); Self { _name: String::from(name), @@ -69,19 +84,30 @@ impl PollGroup { last_execution, logs, inputs, - publishers + publishers, + scheduled, } } /// Build device interface and log. /// /// Add device to store - pub fn build_device(&mut self, name: &str, id: &IdType, kind: &Option, direction: &IODirection) -> Result> { + pub fn build_device( + &mut self, + name: &str, + id: &IdType, + kind: &Option, + direction: &IODirection, + command: &IOCommand, + ) -> Result, ErrorType> { // variable allowed to go out-of-scope because `poller` owns reference let settings = Some(self.settings.clone()); - let builder = DeviceLogBuilder::new(name, id, kind, direction, settings); + let builder = DeviceLogBuilder::new(name, id, kind, direction, command, settings); + builder.setup_command(); + let (device, log) = builder.get(); + self.logs.push(log); match direction { @@ -98,13 +124,13 @@ impl PollGroup { Ok(device) } - /// Builds multiple input objects and respective `OwnedLog` containers. + /// Builds multiple input objects and their respective `OwnedLog` containers. /// # Args: /// Single array should be any sequence of tuples containing a name literal, an `IdType`, and an `IOKind` - pub fn add_devices(&mut self, arr: &[(&str, IdType, IOKind, IODirection)]) -> Result<()> { + pub fn add_devices(&mut self, arr: &[(&str, IdType, IOKind, IODirection, IOCommand)]) -> Result<(), ErrorType> { let mut results = Vec::default(); - for (name, id, kind, direction) in arr.iter().to_owned() { - let result = self.build_device(name, id, &Some(*kind), direction); + for (name, id, kind, direction, command) in arr.iter().to_owned() { + let result = self.build_device(name, id, &Some(*kind), direction, command); results.push(result); }; check_results(&results) @@ -119,7 +145,7 @@ impl PollGroup { /// # Notes /// This works because each log container should have it's own name upon initialization /// from hardcoded input devices. - fn load_logs(&self, path: &Option) -> Result<()> { + fn load_logs(&self, path: &Option) -> Result<(), ErrorType> { let mut results = Vec::new(); for log in self.logs.iter() { let result = log.lock().unwrap().load(path); @@ -132,7 +158,7 @@ impl PollGroup { /// # Notes /// This works because each log container should have it's own name upon initialization /// from hardcoded input devices. - fn save_logs(&self, path: &Option) -> Result<()> { + fn save_logs(&self, path: &Option) -> Result<(), ErrorType> { let mut results = Vec::new(); for log in self.logs.iter() { let result = log.lock().unwrap().save(path); @@ -145,12 +171,12 @@ impl PollGroup { /// Only save and load log data since PollGroup is statically initialized /// If `&None` is given to either methods, then current directory is used. impl Persistent for PollGroup { - fn save(&self, path: &Option) -> Result<()> { + fn save(&self, path: &Option) -> Result<(), ErrorType> { let results = &[self.save_logs(path)]; check_results(results) } - fn load(&mut self, path: &Option) -> Result<()> { + fn load(&mut self, path: &Option) -> Result<(), ErrorType> { let results = &[self.load_logs(path)]; check_results(results) } diff --git a/src/storage/logging.rs b/src/storage/logging.rs index 8ad3385..333b0b4 100644 --- a/src/storage/logging.rs +++ b/src/storage/logging.rs @@ -7,8 +7,8 @@ use std::io::{BufReader, BufWriter}; use std::ops::Deref; use std::sync::{Arc, Mutex, Weak}; -use crate::errors::{Error, ErrorKind, Result}; -use crate::helpers::{writable_or_create, Deferred}; +use crate::errors::{Error, ErrorKind, ErrorType}; +use crate::helpers::{writable_or_create, Deferred, Deferrable}; use crate::io::{IOEvent, IdType, DeviceType, DeferredDevice, DeviceTraits}; use crate::settings::Settings; use crate::storage::{Container, MappedCollection, Persistent}; @@ -21,6 +21,16 @@ pub type LogContainer = Vec>; const FILETYPE: &str = ".json"; +pub trait HasLog { + fn log(&self) -> Option>; + + fn add_to_log(&self, event: IOEvent) { + let log = self.log().expect("No log is associated"); + log.try_lock().unwrap() + .push(event.timestamp, event).expect("Unknown error when adding event to log"); + } +} + // Encapsulates a `LogType` alongside a weak reference to a `Device` #[derive(Serialize, Deserialize, Default)] pub struct OwnedLog { @@ -75,10 +85,17 @@ impl OwnedLog { pub fn iter(&self) -> Iter, IOEvent> { self.log.iter() } + + pub fn orphan(&self) -> bool { + match self.owner { + Some(_) => false, + None => true, + } + } } impl MappedCollection> for OwnedLog { - fn push(&mut self, key: DateTime, data: IOEvent) -> Result<&mut IOEvent> { + fn push(&mut self, key: DateTime, data: IOEvent) -> Result<&mut IOEvent, ErrorType> { self.log.push(key, data) } @@ -101,7 +118,7 @@ impl MappedCollection> for OwnedLog { // Implement save/load operations for `LogType` impl Persistent for OwnedLog { - fn save(&self, path: &Option) -> Result<()> { + fn save(&self, path: &Option) -> Result<(), ErrorType> { if self.log.is_empty() { Err(Error::new( ErrorKind::ContainerEmpty, @@ -123,7 +140,7 @@ impl Persistent for OwnedLog { } } - fn load(&mut self, path: &Option) -> Result<()> { + fn load(&mut self, path: &Option) -> Result<(), ErrorType> { if self.log.is_empty() { let file = File::open(self.full_path(path).deref())?; let reader = BufReader::new(file); @@ -148,6 +165,13 @@ impl Persistent for OwnedLog { } } +impl Deferrable for OwnedLog { + type Inner = OwnedLog; + fn deferred(self) -> Deferred { + Arc::new(Mutex::new(self)) + } +} + impl std::fmt::Debug for OwnedLog { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( @@ -158,3 +182,71 @@ impl std::fmt::Debug for OwnedLog { ) } } + +// Testing +#[cfg(test)] +mod tests { + use crate::builders::DeviceLogBuilder; + use crate::helpers::Deferred; + use crate::io::{Device, IOKind, IdType, DeviceType, IODirection, IOType}; + use crate::storage::{MappedCollection, OwnedLog, Persistent}; + use std::path::Path; + use std::time::Duration; + use std::{fs, thread}; + use std::ops::Deref; + use crate::action::IOCommand; + + fn add_to_log(device: &Deferred, log: &Deferred, count: usize) { + for _ in 0..count { + let binding = device.lock().unwrap(); + let event = match binding.deref() { + DeviceType::Input(inner) => inner.generate_event(IOType::default()), + DeviceType::Output(inner) => inner.generate_event(IOType::default()), + }; + log.lock().unwrap().push(event.timestamp, event).unwrap(); + thread::sleep(Duration::from_nanos(1)); // add delay so that we don't finish too quickly + } + } + + #[test] + fn test_load_save() { + const SENSOR_NAME: &str = "test"; + const ID: IdType = 32; + const COUNT: usize = 10; + const COMMAND: IOCommand = IOCommand::Input(move || IOType::default()); + + /* NOTE: More complex `IOEvent` objects *could* be checked, but we are trusting `serde`. + These tests only count the number of `IOEvent`'s added. */ + + let filename; + // test save + { + let builder = DeviceLogBuilder::new(SENSOR_NAME, &ID, &Some(IOKind::Flow), + &IODirection::Input, &COMMAND, None); + let (device, log) = builder.get(); + add_to_log(&device, &log, COUNT); + let _log = log.lock().unwrap(); + _log.save(&None).unwrap(); + + // save filename for later + filename = _log.filename(); + // check that file exists + assert!(Path::new(&filename).exists()); + }; + + // test load + // build back up then load + { + let builder = DeviceLogBuilder::new(SENSOR_NAME, &ID, &Some(IOKind::Flow), + &IODirection::Input, &COMMAND, None); + let (_device, log) = builder.get(); + let mut _log = log.lock().unwrap(); + _log.load(&None).unwrap(); + + // check count of `IOEvent` + assert_eq!(COUNT, _log.length() as usize); + }; + + fs::remove_file(filename).unwrap(); + } +} \ No newline at end of file diff --git a/src/storage/persistent.rs b/src/storage/persistent.rs index cc925b8..fd4c658 100644 --- a/src/storage/persistent.rs +++ b/src/storage/persistent.rs @@ -1,10 +1,10 @@ -use crate::errors::Result; +use crate::errors::ErrorType; // trait that expresses an interface to save or load from disk pub trait Persistent { // save data to disk - fn save(&self, path: &Option) -> Result<()>; + fn save(&self, path: &Option) -> Result<(), ErrorType>; // load from disk - fn load(&mut self, path: &Option) -> Result<()>; + fn load(&mut self, path: &Option) -> Result<(), ErrorType>; } diff --git a/tests/builder_tests.rs b/tests/builder_tests.rs index 3fb4ede..3ef2946 100644 --- a/tests/builder_tests.rs +++ b/tests/builder_tests.rs @@ -1,8 +1,8 @@ use std::ops::Deref; -use sensd::action::{BaseCommandFactory, Comparison, SimpleNotifier}; +use sensd::action::{BaseCommandFactory, Comparison, IOCommand, SimpleNotifier}; use sensd::helpers::*; -use sensd::builders::ActionBuilder; -use sensd::io::{DeviceType, GenericInput, IOType}; +use sensd::builders::{ActionBuilder, DeviceLogBuilder}; +use sensd::io::{DeviceType, GenericInput, IdType, IODirection, IOKind, IOType, DeviceTraits}; #[test] fn test_action_builder() { @@ -28,6 +28,23 @@ fn test_action_builder() { } #[test] -fn test_pub_sub_builder() { - unimplemented!() +fn test_device_log_builder() { + const NAME: &str = "device name"; + const ID: IdType = 0; + const DIRECTION: IODirection = IODirection::Input; + const KIND: IOKind = IOKind::Unassigned; + + let command = IOCommand::Input(move || IOType::default()); + let builder = DeviceLogBuilder::new( + NAME, + &ID, + &Some(KIND), + &DIRECTION, + &command, + None + ); + let (device, log) = builder.get(); + + assert_eq!(false, log.lock().unwrap().orphan()); + assert!(log.lock().unwrap().filename().contains(&device.lock().unwrap().name())); } \ No newline at end of file diff --git a/tests/grouping_test.rs b/tests/grouping_test.rs index 6caaf3f..fc60992 100644 --- a/tests/grouping_test.rs +++ b/tests/grouping_test.rs @@ -1,16 +1,18 @@ use chrono::Duration; -use sensd::io::{IODirection, IOKind}; +use sensd::io::{IODirection, IOKind, IOType}; use sensd::settings::Settings; use sensd::storage::PollGroup; use std::sync::Arc; +use sensd::action::IOCommand; #[test] fn test_add_device() { let mut poller: PollGroup = PollGroup::new("main", None); + let command = IOCommand::Input(move || IOType::default()); let config = vec![ - ("test name", 0, IOKind::PH, IODirection::Input), - ("second sensor", 1, IOKind::EC, IODirection::Input), + ( "test name", 0, IOKind::PH, IODirection::Input, command.clone(), ), + ( "second sensor", 1, IOKind::EC, IODirection::Input, command.clone(), ), ]; poller.add_devices(&config).unwrap(); @@ -23,9 +25,10 @@ fn test_add_to_log() { settings.interval = Duration::nanoseconds(1); let mut poller: PollGroup = PollGroup::new("main", Some(Arc::new(settings))); + let command = IOCommand::Input(move || IOType::default()); let config = vec![ - ("test name", 0, IOKind::AmbientTemperature, IODirection::Input), - ("second sensor", 1, IOKind::Color, IODirection::Input), + ("test name", 0, IOKind::AmbientTemperature, IODirection::Input, command.clone()), + ("second sensor", 1, IOKind::Color, IODirection::Input, command.clone()), ]; poller.add_devices(&config).unwrap(); diff --git a/tests/ownedlog_test.rs b/tests/ownedlog_test.rs deleted file mode 100644 index b7943ec..0000000 --- a/tests/ownedlog_test.rs +++ /dev/null @@ -1,62 +0,0 @@ -use chrono::Utc; -use sensd::builders::DeviceLogBuilder; -use sensd::helpers::Deferred; -use sensd::io::{Device, IOKind, IdType, DeviceType, IODirection}; -use sensd::storage::{MappedCollection, OwnedLog, Persistent}; -use std::path::Path; -use std::time::Duration; -use std::{fs, thread}; -use std::ops::Deref; - -fn add_to_log(device: &Deferred, log: &Deferred, count: usize) { - for _ in 0..count { - let binding = device.lock().unwrap(); - let event = match binding.deref() { - DeviceType::Input(inner) => inner.generate_event(Utc::now(), None), - DeviceType::Output(inner) => inner.generate_event(Utc::now(), None), - }; - log.lock().unwrap().push(event.timestamp, event).unwrap(); - thread::sleep(Duration::from_nanos(1)); // add delay so that we don't finish too quickly - } -} - -#[test] -fn test_load_save() { - const SENSOR_NAME: &str = "test"; - const ID: IdType = 32; - const COUNT: usize = 10; - - /* NOTE: More complex `IOEvent` objects *could* be checked, but we are trusting `serde`. - These tests only count the number of `IOEvent`'s added. */ - - let filename; - // test save - { - let builder = DeviceLogBuilder::new(SENSOR_NAME, &ID, &Some(IOKind::Flow), - &IODirection::Input, None); - let (device, log) = builder.get(); - add_to_log(&device, &log, COUNT); - let _log = log.lock().unwrap(); - _log.save(&None).unwrap(); - - // save filename for later - filename = _log.filename(); - // check that file exists - assert!(Path::new(&filename).exists()); - }; - - // test load - // build back up then load - { - let builder = DeviceLogBuilder::new(SENSOR_NAME, &ID, &Some(IOKind::Flow), - &IODirection::Input, None); - let (_device, log) = builder.get(); - let mut _log = log.lock().unwrap(); - _log.load(&None).unwrap(); - - // check count of `IOEvent` - assert_eq!(COUNT, _log.length() as usize); - }; - - fs::remove_file(filename).unwrap(); -}