Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MetricSummary::bounds cleanup #390

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 45 additions & 32 deletions crates/counter-agg/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@

use time_series::TSPoint;
use stats_agg::{XYPair, stats2d::StatsSummary2D};
use serde::{Deserialize, Serialize};
use std::fmt;


pub mod range;
pub mod stable;

#[cfg(test)]
mod tests;
Expand All @@ -23,7 +22,7 @@ pub enum CounterError{
// nonsensical results rather than unsound behavior, garbage in garbage out.
// But much better if we can validate at deserialization. We can do that in
// the builder if we want.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetricSummary {
// TODO invariants?
pub first: TSPoint,
Expand All @@ -43,7 +42,7 @@ pub struct MetricSummary {
// out upon garbage in.
pub stats: StatsSummary2D,
// TODO See TODOs in I64Range about protecting from deserialization.
pub bounds: Option<range::I64Range>,
pub bounds: range::I64Range,
}

// Note that this can lose fidelity with the timestamp, but it would only lose it in the microseconds,
Expand All @@ -66,8 +65,8 @@ fn to_seconds(t: f64)-> f64{
/// it is treated as a reset of the counter and the previous value is added to the "true value" of the
/// counter at that timestamp.
impl MetricSummary {
pub fn new(pt: &TSPoint, bounds:Option<range::I64Range>) -> MetricSummary {
let mut n = MetricSummary{
pub fn new(pt: &TSPoint, bounds: range::I64Range) -> MetricSummary {
let mut n = MetricSummary {
first: *pt,
second: *pt,
penultimate: *pt,
Expand Down Expand Up @@ -150,7 +149,7 @@ impl MetricSummary {
self.num_changes += incoming.num_changes;

self.stats = self.stats.combine(stats).unwrap();
self.bounds_extend(incoming.bounds);
self.bounds_extend(incoming.bounds.clone());
Ok(())
}

Expand Down Expand Up @@ -204,42 +203,56 @@ impl MetricSummary {
}

pub fn bounds_valid(&self) -> bool {
match self.bounds{
None => true, // unbounded contains everything
Some(b) => b.contains(self.last.ts) && b.contains(self.first.ts)
}
self.bounds.contains(self.last.ts) && self.bounds.contains(self.first.ts)
}

fn bounds_extend(&mut self, in_bounds:Option<range::I64Range>){
match (self.bounds, in_bounds) {
(None, _) => {self.bounds = in_bounds},
(_, None) => {},
(Some(mut a), Some(b)) => {
a.extend(&b);
self.bounds = Some(a);
}
};
/// Merge `in_bounds` into this summary's bounds.
///
/// TODO(epg): This preserves the pre-existing behavior, which seems odd:
///   * if our bounds are infinite on both sides we *narrow* to `in_bounds`
///     (that's not "extend");
///   * else if `in_bounds` is infinite on both sides, it is ignored;
///   * else both ends are widened or narrowed as necessary (makes sense,
///     but is inconsistent with the two cases above).
fn bounds_extend(&mut self, in_bounds: range::I64Range) {
    if self.bounds.is_infinite() {
        // Formerly the `(None, _)` arm: adopt the incoming bounds wholesale.
        self.bounds = in_bounds;
    } else if !in_bounds.is_infinite() {
        // Formerly the `(Some(a), Some(b))` arm. The `(_, None)` arm was a
        // no-op, so an infinite `in_bounds` simply falls through here.
        self.bounds.extend(&in_bounds);
    }
}

// based on: https://github.com/timescale/promscale_extension/blob/d51a0958442f66cb78d38b584a10100f0d278298/src/lib.rs#L208,
// which is based on: https://github.com/prometheus/prometheus/blob/e5ffa8c9a08a5ee4185271c8c26051ddc1388b7a/promql/functions.go#L59
pub fn prometheus_delta(&self) -> Result<Option<f64>, CounterError>{
if self.bounds.is_none() || !self.bounds_valid() || self.bounds.unwrap().has_infinite() {
if !self.bounds_valid() {
return Err(CounterError::BoundsInvalid);
}
let (left, right);
match self.bounds.both() {
None => return Err(CounterError::BoundsInvalid),
Some((l, r)) => {
left = l;
right = r;
}
}
Comment on lines +234 to +241
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there really should be a cleaner way to do this unwrap_or_return behavior, but if there is I don't know it.

//must have at least 2 values
if self.single_value() || self.bounds.unwrap().is_singleton() { //technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point)
if self.single_value() || self.bounds.is_singleton() {
//technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point)
return Ok(None);
}

let mut result_val = self.delta();

// all calculated durations in seconds in Prom implementation, so we'll do that here.
// we can unwrap all of the bounds accesses as they are guaranteed to be there from the checks above
let mut duration_to_start = to_seconds((self.first.ts - self.bounds.unwrap().left.unwrap()) as f64);
let mut duration_to_start = to_seconds((self.first.ts - left) as f64);

/* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. Subtract an extra ms, ours is in microseconds. */
let duration_to_end = to_seconds((self.bounds.unwrap().right.unwrap() - self.last.ts - 1_000) as f64);
let duration_to_end = to_seconds((right - self.last.ts - 1_000) as f64);
let sampled_interval = self.time_delta();
let avg_duration_between_samples = sampled_interval / (self.stats.n - 1) as f64; // don't have to worry about divide by zero because we know we have at least 2 values from the above.

Expand Down Expand Up @@ -282,9 +295,9 @@ impl MetricSummary {
return Ok(None);
}
let delta = delta.unwrap();
let bounds = self.bounds.unwrap() ; // if we got through delta without error then we have bounds
let bounds = &self.bounds; // if we got through delta without error then we have bounds
/* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. So subtract an extra ms from the duration*/
let duration = bounds.duration().unwrap() - 1_000;
let duration = bounds.duration() - 1_000;
if duration <= 0 {
return Ok(None); // if we have a total duration under a ms, it's less than prom could deal with so we return none.
}
Expand All @@ -304,11 +317,11 @@ impl fmt::Display for CounterError {
}
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq)]
pub struct GaugeSummaryBuilder(MetricSummary);

impl GaugeSummaryBuilder {
pub fn new(pt: &TSPoint, bounds: Option<range::I64Range>) -> Self {
pub fn new(pt: &TSPoint, bounds: range::I64Range) -> Self {
Self(MetricSummary::new(pt, bounds))
}

Expand All @@ -322,7 +335,7 @@ impl GaugeSummaryBuilder {
self.0.combine(incoming)
}

pub fn set_bounds(&mut self, bounds: Option<range::I64Range>) {
pub fn set_bounds(&mut self, bounds: range::I64Range) {
self.0.bounds = bounds;
}

Expand All @@ -346,11 +359,11 @@ impl From<MetricSummary> for GaugeSummaryBuilder {
}
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq)]
pub struct CounterSummaryBuilder(MetricSummary);

impl CounterSummaryBuilder {
pub fn new(pt: &TSPoint, bounds: Option<range::I64Range>) -> Self {
pub fn new(pt: &TSPoint, bounds: range::I64Range) -> Self {
Self(MetricSummary::new(pt, bounds))
}

Expand All @@ -366,7 +379,7 @@ impl CounterSummaryBuilder {
self.0.combine(incoming)
}

pub fn set_bounds(&mut self, bounds: Option<range::I64Range>) {
pub fn set_bounds(&mut self, bounds: range::I64Range) {
self.0.bounds = bounds;
}

Expand Down
146 changes: 106 additions & 40 deletions crates/counter-agg/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,77 @@ use serde::{Deserialize, Serialize};
// we are a discrete type so translating is simple [), this enforces equality
// between ranges like [0, 10) and [0, 9]
// None values denote infinite bounds on that side
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[repr(C)]
pub struct I64Range {
pub left: Option<i64>,
pub right: Option<i64>
left: Option<i64>,
right: Option<i64>
}


impl I64Range {
pub fn has_infinite(&self)-> bool{
self.left.is_none() || self.right.is_none()
/// Build a range from optional `left` and `right` bounds; a `None` value
/// denotes an infinite bound on that side.
///
/// # Panics
///
/// Panics if `left` > `right`.
pub fn new(left: Option<i64>, right: Option<i64>) -> Self {
let range = Self { left, right };
// Validity is checked on the constructed value via `is_valid`.
assert!(range.is_valid());
range
}

// TODO See TODO below about range validity. Right now we don't care
// much. If we start to care, move the caring to `new` and `extend`
// methods. That will allow this crate to protect the integrity of
// MetricSummary and I64Range in the face of the extension needing to be
// able to construct them from raw (and therefore potentially
// corrupt) inputs.
fn is_valid(&self) -> bool {
match (self.left, self.right) {
(Some(a), Some(b)) => a <= b,
_ => true,
pub fn infinite() -> Self {
Self {
left: None,
right: None,
}
}

pub fn is_singleton(&self) -> bool{
/// Return `Some(left)` when the left bound is finite, else `None`.
#[inline]
pub fn left(&self) -> Option<i64> {
self.left
}

/// Return `Some(right)` when the right bound is finite, else `None`.
#[inline]
pub fn right(&self) -> Option<i64> {
self.right
}

/// Return `Some(([left], [right]))` when both are finite, else `None`.
pub fn both(&self) -> Option<(i64, i64)> {
match (self.left, self.right) {
(Some(a), Some(b)) => a == b,
_ => false,
(Some(left), Some(right)) => Some((left, right)),
_ => None,
}
}

/// Return `true` when at least one side of the range is infinite, i.e.
/// `left` or `right` is `None`.
pub fn is_infinite_either(&self) -> bool {
    self.left.is_none() || self.right.is_none()
}

/// Return `true` only when both sides of the range are infinite, i.e.
/// `left` and `right` are both `None`.
pub fn is_infinite(&self) -> bool {
    self.left.is_none() && self.right.is_none()
}

/// Return `true` when the left bound is infinite (stored as `None`).
pub fn is_infinite_left(&self) -> bool {
    self.left().is_none()
}

/// Return `true` when the right bound is infinite (stored as `None`).
pub fn is_infinite_right(&self) -> bool {
    self.right().is_none()
}

// TODO See TODO below about range validity.
/// A range with two finite bounds is valid when `left <= right`; a range
/// with an infinite bound on either side is always considered valid.
fn is_valid(&self) -> bool {
    match self.both() {
        Some((lo, hi)) => lo <= hi,
        None => true,
    }
}

/// Return `true` when both bounds are finite and equal to each other.
/// A range with an infinite bound on either side is never a singleton.
pub fn is_singleton(&self) -> bool {
    match self.both() {
        Some((lo, hi)) => lo == hi,
        None => false,
    }
}

pub fn extend(&mut self, other: &Self) {
// TODO: What should extend do with invalid ranges on either side? right now it treats them as if they are real...
self.left = match (self.left, other.left) {
Expand All @@ -61,15 +99,11 @@ impl I64Range {
(None, None) => true,
}
}

// pub fn contains(&self, other: I64Range) -> bool{
// unimplemented!()
// }
pub fn duration(&self) -> Option<i64> {
if self.has_infinite() || !self.is_valid() {
return None
}
Some(self.right.unwrap() - self.left.unwrap())

/// Return the distance between the bounds, computed as `right - left`.
///
/// # Panics
///
/// Panics if either `left` or `right` is infinite.
pub fn duration(&self) -> i64 {
    self.both()
        .map(|(start, end)| end - start)
        .expect("infinite duration")
}
}

Expand Down Expand Up @@ -131,7 +165,6 @@ mod tests {
let normal = I64Range{left:Some(2), right:Some(9)};
weird.extend(&normal);
assert_eq!(weird, I64Range{left:Some(-6), right:Some(9)});

}

#[test]
Expand Down Expand Up @@ -163,21 +196,33 @@ mod tests {
#[test]
fn test_duration(){
let a = I64Range{left:Some(3), right:Some(7)};
assert_eq!(a.duration().unwrap(), 4);
assert_eq!(a.duration(), 4);
let a = I64Range{left:Some(-3), right:Some(7)};
assert_eq!(a.duration().unwrap(), 10);
let a = I64Range{left:None, right:Some(7)};
assert_eq!(a.duration(), None);
let a = I64Range{left:Some(3), right:None};
assert_eq!(a.duration(), None);
//invalid ranges return None durations as well
let a = I64Range{left:Some(3), right:Some(0)};
assert_eq!(a.duration(), None);
assert_eq!(a.duration(), 10);
}

#[test]
#[should_panic(expected = "infinite duration")]
fn duration_infinite_left() {
    // No finite left bound, so asking for the duration must panic.
    let open_left = I64Range { left: None, right: Some(7) };
    open_left.duration();
}

#[test]
#[should_panic(expected = "infinite duration")]
fn duration_infinite_right() {
    // No finite right bound, so asking for the duration must panic.
    let open_right = I64Range { left: Some(-1), right: None };
    open_right.duration();
}

#[test]
fn test_checks(){
#[should_panic(expected = "infinite duration")]
fn duration_infinite_both() {
I64Range::infinite().duration();
}

#[test]
fn test_checks() {
let a = I64Range{left:Some(2), right:Some(5)};
assert!(a.is_valid());
assert!(!a.is_singleton());
Expand All @@ -190,9 +235,30 @@ mod tests {
let a = I64Range{left:Some(2), right:Some(2)};
assert!(a.is_valid());
assert!(a.is_singleton());
assert_eq!(a.duration().unwrap(), 0);
assert_eq!(a.duration(), 0);
let a = I64Range{left:Some(0), right:Some(-10)};
assert!(!a.is_valid());
assert!(!a.is_singleton());
}

#[test]
fn infinite() {
    // A range that is infinite on both sides must contain the extreme i64
    // values as well as their immediate neighbours.
    let unbounded = I64Range { left: None, right: None };
    for &ts in &[i64::MIN, i64::MIN + 1, i64::MAX, i64::MAX - 1] {
        assert!(unbounded.contains(ts));
    }
}

#[test]
fn exclude_i64_max() {
    let widest_finite = I64Range { left: Some(i64::MIN), right: Some(i64::MAX) };
    assert!(widest_finite.contains(i64::MIN));
    // TODO If we don't need to exclude i64::MAX, we can simplify even
    // further and make right non-Option (left already doesn't need to be
    // Option, as None and Some(i64::MIN) are handled the same way).
    // How important is it that we draw the line at
    // 9,223,372,036,854,775,807 rather than 9,223,372,036,854,775,806?
    assert!(!widest_finite.contains(i64::MAX));
}
}
Loading