-
Notifications
You must be signed in to change notification settings - Fork 5
/
core.rs
206 lines (182 loc) · 6.84 KB
/
core.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
//! Provides handles and receivers for updating data via a clock or a file change.
use std::fmt::Debug;
use std::fs::File;
use std::io;
use std::io::Read;
use std::time::Duration;
use fs2::FileExt;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
use tracing::{debug, error, instrument};
use uuid::Uuid;
/// An `EventUuid` helpful for logging.
#[derive(Debug, Clone)]
pub struct EventUuid(pub String);
/// An `Event` type limited to `Clock` or `File` type events for now.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Event {
/// Represents a `Clock` signal, i.e. some period of time has passed.
Clock(EventUuid),
/// Represents a `File` signal, i.e. a local file has changed.
File(EventUuid, String),
/// For potentially adding future fields.
#[non_exhaustive]
Unknown,
}
/// Gives guidance for a reasonable refresh rate for most applications
#[derive(Debug, Clone)]
pub enum RefreshRate {
/// 15 seconds is a reasonable refresh rate for most applications
FifteenSeconds,
/// Thirty second refresh rate
ThirtySeconds,
/// One minute refresh rate
OneMinute,
/// Warning: Setting refresh rates that are very low risks overwhelming the policy set source.
/// Users should be cautious when setting refresh rates less than the default and ensure
/// that their policy set source (the disk IO, AVP throttle limit, or anything else)
/// can handle the rate
Other(Duration),
}
impl RefreshRate {
/// Get refresh rate as Duration
pub fn value(&self) -> Duration {
match *self {
Self::FifteenSeconds => Duration::from_secs(15),
Self::ThirtySeconds => Duration::from_secs(30),
Self::OneMinute => Duration::from_secs(60),
Self::Other(d) => d,
}
}
}
/// `clock_ticker_task` will create a background thread that will send notification to a broadcast
/// channel periodically. The output will be a handle to this thread and the receiver of these
/// events.
#[instrument]
pub fn clock_ticker_task(refresh_rate: RefreshRate) -> (JoinHandle<()>, Receiver<Event>) {
let (sender, receiver) = broadcast::channel(10);
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(refresh_rate.value()).await;
let event = Event::Clock(EventUuid(Uuid::new_v4().to_string()));
match sender.send(event.clone()) {
Ok(_) => {
debug!("Successfully broadcast clock ticker event: event={event:?}");
}
Err(error) => {
error!(
"Failed to broadcast clock ticker event: event={event:?} : error={error:?}"
);
}
}
}
});
(handle, receiver)
}
/// `file_inspector_task` will create a background thread that will send a notification when the given file
/// has changed to a broadcast channel periodically. The output will be a handle to this thread and the
/// receiver of these events.
///
/// The mechanism for detecting change within the file is a standard `SHA-256` digest.
#[instrument]
pub fn file_inspector_task(
refresh_rate: RefreshRate,
path: String,
) -> (JoinHandle<()>, Receiver<Event>) {
/// The `FileChangeInspector` tells the authority when policies on disk have changed.
#[derive(Debug)]
struct FileChangeInspector {
/// The path to the file that is being monitored
file_path: String,
/// Defines the sha 256 of the file.
hash: Option<String>,
}
impl FileChangeInspector {
/// Creates a new instance of the `FileChangeInspector`
pub fn new(file_path: String) -> Self {
Self {
// This is the path to the file to monitor for changes.
file_path,
hash: None,
}
}
/// `changed` returns true if the file has changed.
/// It will return true on the first call after creating a `io::Error` instance.
#[instrument(skip(self), ret, err)]
pub fn changed(&mut self) -> Result<bool, io::Error> {
let mut file_data = String::new();
{
let mut file = File::open(self.file_path.clone())?;
file.lock_shared()?;
file.read_to_string(&mut file_data)?;
file.unlock()?;
}
let calculated_hash = sha256::digest(file_data);
if Some(calculated_hash.clone()) == self.hash {
debug!("Authorization data file has not changed");
return Ok(false);
}
self.hash = Some(calculated_hash);
debug!("Authorization data file has changed: hash={:?}", self.hash);
Ok(true)
}
}
let (sender, receiver) = broadcast::channel(10);
let mut inspector = FileChangeInspector::new(path.clone());
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(refresh_rate.value()).await;
match inspector.changed() {
Ok(true) => {
let event = Event::File(EventUuid(Uuid::new_v4().to_string()), path.clone());
match sender.send(event.clone()) {
Ok(_) => {
debug!("Successfully notificated authorization data file has changed: event={event:?}");
}
Err(error) => {
error!(
"Failed to notificate authorization data file has changed: event={event:?}: error={error:?}"
);
}
}
}
Err(e) => {
error!("Error using file: {e}");
return;
}
_ => {}
}
}
});
(handle, receiver)
}
#[cfg(test)]
mod test {
use std::time::Duration;
use tempfile::NamedTempFile;
use crate::public::events::core::{clock_ticker_task, file_inspector_task, Event, RefreshRate};
#[tokio::test]
async fn validate_send_receive() {
let (handle, mut receiver) =
clock_ticker_task(RefreshRate::Other(Duration::from_millis(1)));
assert!(receiver.recv().await.is_ok());
handle.abort();
}
#[tokio::test]
async fn validate_send_receive_file_inspector() {
let temp_file = NamedTempFile::new().unwrap();
let path = temp_file.path().to_str().unwrap().to_string();
let (_, mut receiver) =
file_inspector_task(RefreshRate::Other(Duration::from_millis(1)), path.clone());
match receiver.recv().await.unwrap() {
Event::File(_, recv_path) => {
assert_eq!(path, recv_path);
}
err => {
panic!("{err:?}");
}
}
}
}