Skip to content

Commit

Permalink
Plumb actions into ingress
Browse files Browse the repository at this point in the history
  • Loading branch information
RobbieMcKinstry committed Nov 26, 2024
1 parent cce8680 commit bb1c850
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 38 deletions.
25 changes: 8 additions & 17 deletions src/adapters/engines/chi.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use tokio::time::Instant;
use crate::pipeline::TimeoutBehavior;
use crate::{
metrics::ResponseStatusCode, pipeline::{StageConfig, StageDetails}, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group}
metrics::ResponseStatusCode,
pipeline::{StageConfig, StageDetails},
stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group},
};
use crate::pipeline::TimeoutBehavior;
use tokio::time::Instant;

use super::{Action, DecisionEngine};

Expand All @@ -11,7 +13,7 @@ use super::{Action, DecisionEngine};
pub struct ChiSquareEngine {
table: ContingencyTable<5, ResponseStatusCode>,
stages: StageConfig,
start_time: Instant
start_time: Instant,
}

impl ChiSquareEngine {
Expand Down Expand Up @@ -69,26 +71,15 @@ impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> for ChiSquare
// If we've timed out, but there's no significant failure, then
// we advance the stage.
(true, TimeoutBehavior::Advance) => {
let details = self.advance();
let details = self.advance();
match details {
Some(details) => Some(Action::RampTo(details.canary_traffic())),
None => Some(Action::Promote),
}
},
}
// Otherwise, we keep observing.
(false, TimeoutBehavior::Advance) => None,
}

/*
// Now, we check to see if this stage has timed out yet.
stage.is_timed_out()
// TODO: Remember to call self.control_data.set_experimental_total
// for calculating the expected values.
todo!()
*/
}
}

Expand Down
30 changes: 23 additions & 7 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::adapters::{batch_observations, EngineController};
use crate::adapters::{batch_observations, Action, EngineController};
use crate::{
adapters::{repeat_query, DecisionEngine, Ingress, Monitor},
metrics::ResponseStatusCode,
Expand All @@ -8,7 +8,9 @@ use bon::bon;
use miette::Result;

pub(crate) use percent::WholePercent;
pub (crate) use stages::{StageDetails, StageConfig, TimeoutBehavior};
pub(crate) use stages::{StageConfig, StageDetails, TimeoutBehavior};
use tokio::pin;
use tokio_stream::StreamExt as _;

/// An alias for the Response Code-based monitor.
pub type ResponseMonitor = Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>;
Expand Down Expand Up @@ -42,7 +44,7 @@ impl Pipeline {

/// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run
/// independently.
pub async fn run(self) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
// TODO: Handle graceful shutdown.
// • First, create a shutdown channel which we
// pass to each thread.
Expand All @@ -58,11 +60,25 @@ impl Pipeline {
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
let actions = EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
pin!(actions);
// • Pipe actions to the Ingress.

todo!();
while let Some(action) = actions.next().await {
match action {
Action::Promote => {
self.ingress.promote_canary().await?;
return Ok(());
}
Action::Rollback => {
self.ingress.rollback_canary().await?;
return Ok(());
}
Action::RampTo(percent) => {
self.ingress.set_canary_traffic(percent).await?;
}
}
}
unreachable!("Connection to Decision Engine terminated early.");
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/pipeline/percent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ impl DecimalPercent {
// Using a newtype allows us to ensure they're correct by construction.
#[nutype(
validate(less_or_equal = 100),
derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash)
derive(
Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash
)
)]
pub(crate) struct WholePercent(u8);

Expand Down
10 changes: 6 additions & 4 deletions src/pipeline/stages/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ impl Default for StageConfig {

#[cfg(test)]
mod tests {
use crate::{pipeline::stages::config::{
DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC,
}, WholePercent};
use crate::{
pipeline::stages::config::{DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC},
WholePercent,
};

use super::StageConfig;

Expand All @@ -80,7 +81,8 @@ mod tests {
let mut conf = StageConfig::default();
let mut stage = conf.current();
for i in 0..4 {
let expected_traffic = Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap());
let expected_traffic =
Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap());
let observed_traffic = stage.map(|st| st.canary_traffic());
let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]);
let observed_confidence = stage.map(|st| st.badness_confidence_limit());
Expand Down
8 changes: 3 additions & 5 deletions src/pipeline/stages/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl StageDetails {

/// Returns true if the current time is beyond the timeout limit.
pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool {
tokio::time::Instant::now().duration_since(start_time) > self.timeout
tokio::time::Instant::now().duration_since(start_time) > self.timeout
}
}

Expand Down Expand Up @@ -66,23 +66,21 @@ mod tests {

use super::StageDetails;


/// This test checks whether `has_timed_out` works by checking it against
/// start times in the distant past and the far future.
#[test]
fn is_timed_out() {
let one_year_in_seconds = 60*60*24*365;
let one_year_in_seconds = 60 * 60 * 24 * 365;
let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds);
let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds);
let example = StageDetails {
canary_traffic: WholePercent::try_new(100).unwrap(),
badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(),
timeout: Duration::from_secs(5*60), // five minutes
timeout: Duration::from_secs(5 * 60), // five minutes
timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance,
};

assert_eq!(true, example.has_timed_out(distant_past));
assert_eq!(false, example.has_timed_out(far_future));

}
}
4 changes: 2 additions & 2 deletions src/pipeline/stages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub(crate) use config::StageConfig;
pub (crate) use details::StageDetails;
pub(crate) use details::StageDetails;

mod config;
mod details;
Expand All @@ -13,7 +13,7 @@ mod details;
/// of confidence, which only tells us how confident we are that
/// the deployment *isn't* safe).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub (crate) enum TimeoutBehavior {
pub(crate) enum TimeoutBehavior {
/// If we don't have confidence that the deployment is bad,
/// we advance to the next stage.
Advance,
Expand Down
2 changes: 1 addition & 1 deletion src/stats/contingency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<const N: usize, C: Categorical<N>> ContingencyTable<N, C> {
expected_in_category * total_observed / expected_total
}

/// calculate the expected count for the category with index `i`.
/// Returns the observed count for the category with index `i`.
pub fn observed_by_index(&self, i: usize) -> u32 {
self.observed.get_count_by_index(i)
}
Expand Down
2 changes: 1 addition & 1 deletion src/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub use categorical::Categorical;
pub use chi::chi_square_test;
pub use contingency::ContingencyTable;
pub use group::Group;
pub use observation::{CategoricalObservation, Observation};
pub use chi::chi_square_test;

/// For modeling categorical data.
mod categorical;
Expand Down

0 comments on commit bb1c850

Please sign in to comment.