diff --git a/src/db.rs b/src/db.rs
index db5f89f..d031439 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -14,26 +14,11 @@ use byteorder::{BigEndian, ReadBytesExt};
 use fjall::{Partition, PartitionCreateOptions, TxKeyspace};
 use std::io::Cursor;
 use std::marker::PhantomData;
+use std::ops::Bound;
 use std::sync::Arc;
-use std::{collections::BTreeMap, ops::Bound, sync::RwLock};
 
 pub const MINUTE_IN_NS: u128 = 60_000_000_000;
 
-#[derive(Clone)]
-pub struct Series {
-    pub(crate) id: SeriesId,
-    pub(crate) inner: Partition,
-}
-
-impl Series {
-    pub fn insert(&self, ts: u128, value: Value) -> crate::Result<()> {
-        // NOTE: Invert timestamp to store in reverse order
-        self.inner
-            .insert((!ts).to_be_bytes(), value.to_be_bytes())
-            .map_err(Into::into)
-    }
-}
-
 #[derive(Debug)]
 pub struct StreamItem {
     pub series_id: SeriesId,
@@ -42,17 +27,26 @@ pub struct StreamItem {
 }
 
 pub struct SeriesStream {
-    // pub(crate) series_id: SeriesId,
     pub(crate) tags: OwnedTagSets,
     pub(crate) reader: Box<dyn Iterator<Item = crate::Result<StreamItem>>>,
 }
 
 pub struct DatabaseInner {
     pub(crate) keyspace: TxKeyspace,
-    series: RwLock<BTreeMap<SeriesId, Series>>,
+
+    /// Actual time series data
+    data: Partition,
+
+    /// Series mapping, series key -> series ID
     smap: SeriesMapping,
+
+    /// Inverted index of tag permutations
     tag_index: TagIndex,
+
+    /// Maps series ID to its tags
    pub(crate) tag_sets: TagSets,
+
+    #[allow(unused)]
     hyper_mode: bool,
 }
 
@@ -60,9 +54,6 @@ pub struct DatabaseInner {
 #[derive(Clone)]
 pub struct Database(Arc<DatabaseInner>);
 
-// TODO: series should be stored in FIFO... but FIFO should not cause write stalls
-// if disjoint...
-
 impl Database {
     /// Creates a new database builder.
     pub fn builder() -> DatabaseBuilder {
@@ -74,36 +65,20 @@ impl Database {
         let tag_sets = TagSets::new(&keyspace)?;
         let series_mapping = SeriesMapping::new(&keyspace)?;
 
-        let mut series_map = BTreeMap::new();
-
-        // NOTE: Recover in-memory series map
-        for kv in series_mapping.partition.inner().iter() {
-            let (_, bytes) = kv?;
-
-            let series_id = {
-                let mut reader = &bytes[..];
-                reader.read_u64::<BigEndian>()?
-            };
-
-            let series = Series {
-                id: series_id,
-                inner: keyspace
-                    .open_partition(
-                        &Self::get_series_name(series_id),
-                        PartitionCreateOptions::default()
-                            .block_size(64_000)
-                            .compression(fjall::CompressionType::Lz4),
-                    )?
-                    .inner()
-                    .clone(),
-            };
-
-            series_map.insert(series_id, series);
-        }
+        let data = keyspace
+            .open_partition(
+                "_talna#data",
+                PartitionCreateOptions::default()
+                    .manual_journal_persist(hyper_mode)
+                    .block_size(64_000)
+                    .compression(fjall::CompressionType::Lz4),
+            )?
+            .inner()
+            .clone();
 
         Ok(Self(Arc::new(DatabaseInner {
             keyspace,
-            series: RwLock::new(series_map),
+            data,
             smap: series_mapping,
             tag_index,
             tag_sets,
@@ -111,8 +86,11 @@ impl Database {
         })))
     }
 
-    fn get_series_name(series_id: SeriesId) -> String {
-        format!("_talna#s#{series_id}")
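+    /// Builds the fixed-size key of a data point: 8 bytes of big-endian
+    /// series ID followed by the big-endian bitwise-inverted timestamp, so
+    /// keys group by series and newer points sort first within a series.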
+    fn format_data_point_key(series_id: SeriesId, ts: u128) -> [u8; 24] {
+        let mut data_point_key = [0; std::mem::size_of::<SeriesId>() + std::mem::size_of::<u128>()];
+        data_point_key[0..8].copy_from_slice(&series_id.to_be_bytes());
+        data_point_key[8..24].copy_from_slice(&(!ts).to_be_bytes());
+        data_point_key
     }
 
     fn prepare_query(
@@ -120,27 +98,54 @@ impl Database {
         &self,
         series_ids: &[SeriesId],
         (min, max): (Bound<u128>, Bound<u128>),
     ) -> crate::Result<Vec<SeriesStream>> {
-        // NOTE: Invert timestamps because stored in reverse order
-        let range = (
-            max.map(|x| u128::to_be_bytes(!x)),
-            min.map(|x| u128::to_be_bytes(!x)),
-        );
-
-        let lock = self.0.series.read().expect("lock is poisoned");
+        use fjall::Slice;
+        use Bound::{Excluded, Included, Unbounded};
 
         series_ids
             .iter()
-            .map(|id| lock.get(id).cloned().expect("series should exist"))
-            .map(|series| {
+            .map(|&series_id| {
                 // TODO: maybe cache tagsets in QuickCache...
-                let tags = self.0.tag_sets.get(series.id)?;
+                let tags = self.0.tag_sets.get(series_id)?;
+
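+                // NOTE: Because timestamps are stored inverted, the *max* time
+                // bound maps to the start of the key range and the *min* time
+                // bound to its end; iteration runs from newest to oldest.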
+                let kv_stream: Box<dyn Iterator<Item = fjall::Result<(Slice, Slice)>>> =
+                    match (min, max) {
+                        (Unbounded, Unbounded) => {
+                            Box::new(self.0.data.prefix(series_id.to_be_bytes()))
+                        }
+                        (min @ (Included(_) | Excluded(_)), Unbounded) => {
+                            // Start at the series' smallest possible key (ts = u128::MAX
+                            // inverts to all zeroes) so the scan cannot leak into a
+                            // neighboring series.
+                            let start = Self::format_data_point_key(series_id, u128::MAX);
+                            let min = min.map(|ts| Self::format_data_point_key(series_id, ts));
+
+                            Box::new(self.0.data.range((Included(start), min)))
+                        }
+                        (Unbounded, max @ (Included(_) | Excluded(_))) => {
+                            let min = Self::format_data_point_key(series_id, 0);
+                            let max = max.map(|ts| Self::format_data_point_key(series_id, ts));
+                            Box::new(self.0.data.range((max, Included(min))))
+                        }
+                        (min @ (Included(_) | Excluded(_)), max @ (Included(_) | Excluded(_))) => {
+                            let min = min.map(|ts| Self::format_data_point_key(series_id, ts));
+                            let max = max.map(|ts| Self::format_data_point_key(series_id, ts));
+                            Box::new(self.0.data.range((max, min)))
+                        }
+                    };
 
                 Ok(SeriesStream {
-                    // series_id: series.id,
                     tags,
-                    reader: Box::new(series.inner.range(range).map(move |x| match x {
+                    reader: Box::new(kv_stream.map(move |x| match x {
                         Ok((k, v)) => {
+                            use std::io::Seek;
+
+                            let mut k = Cursor::new(k);
+
+                            // Skip series ID
+                            k.seek_relative(std::mem::size_of::<SeriesId>() as i64)?;
+
                             let ts = k.read_u128::<BigEndian>()?;
                             // NOTE: Invert timestamp back to original value
                             let ts = !ts;
@@ -154,7 +159,7 @@ impl Database {
                             let value = v.read_f32::<BigEndian>()?;
 
                             Ok(StreamItem {
-                                series_id: series.id,
+                                series_id,
                                 value,
                                 ts,
                             })
@@ -315,23 +320,16 @@ impl Database {
         let series_key = SeriesKey::format(metric, tags);
 
         let series_id: Option<SeriesId> = self.0.smap.get(&series_key)?;
 
-        let series = if let Some(series_id) = series_id {
+        let series_id = if let Some(series_id) = series_id {
             // NOTE: Series already exists (happy path)
-            self.0
-                .series
-                .read()
-                .expect("lock is poisoned")
-                .get(&series_id)
-                .cloned()
-                .expect("series should exist")
+            series_id
         } else {
             // NOTE: Create series
             self.initialize_new_series(&series_key, metric, tags)?
         };
 
-        // NOTE: Invert timestamp to store in reverse order
-        // because forward iteration is faster
-        series.insert(ts, value)?;
+        let data_point_key = Self::format_data_point_key(series_id, ts);
+        self.0.data.insert(data_point_key, value.to_be_bytes())?;
 
         Ok(())
     }
@@ -341,7 +339,7 @@ impl Database {
         series_key: &str,
         metric: MetricName,
         tags: &TagSet,
-    ) -> crate::Result<Series> {
+    ) -> crate::Result<SeriesId> {
         // NOTE: We need to run in a transaction (for serializability)
         //
         // Because we cannot rely on the series not being created since the
@@ -354,42 +352,17 @@ impl Database {
             reader.read_u64::<BigEndian>().expect("should deserialize")
         });
 
-        let series = if let Some(series_id) = series_id {
+        let series_id = if let Some(series_id) = series_id {
             // NOTE: Series was created since the start of the function
-
-            self.0
-                .series
-                .read()
-                .expect("lock is poisoned")
-                .get(&series_id)
-                .cloned()
-                .expect("series should exist")
+            series_id
         } else {
             // NOTE: Actually create series
-            let mut series_lock = self.0.series.write().expect("lock is poisoned");
-            let next_series_id = series_lock.keys().max().map(|x| x + 1).unwrap_or_default();
+            // TODO: atomic, persistent counter
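+            // (Safe only while series are never deleted: the map then only ever
+            // grows, so its length has never been handed out as an ID before.)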
+            let next_series_id = self.0.smap.partition.inner().len()? as SeriesId;
 
             log::trace!("Creating series {next_series_id} for permutation {series_key:?}");
 
-            let partition = self.0.keyspace.open_partition(
-                &Self::get_series_name(next_series_id),
-                PartitionCreateOptions::default()
-                    .manual_journal_persist(self.0.hyper_mode)
-                    .block_size(64_000)
-                    .compression(fjall::CompressionType::Lz4),
-            )?;
-
-            series_lock.insert(
-                next_series_id,
-                Series {
-                    id: next_series_id,
-                    inner: partition.inner().clone(),
-                },
-            );
-
-            drop(series_lock);
-
             self.0.smap.insert(&mut tx, series_key, next_series_id);
 
             self.0
@@ -405,14 +378,10 @@ impl Database {
 
             tx.commit()?;
 
-            // NOTE: Get inner because we don't want to insert and read series data in a transactional context
-            Series {
-                id: next_series_id,
-                inner: partition.inner().clone(),
-            }
+            next_series_id
         };
 
-        Ok(series)
+        Ok(series_id)
     }
 
     /// Flushes writes.
@@ -441,6 +410,122 @@ mod tests {
     use crate::tagset;
     use test_log::test;
 
+    #[test]
+    fn test_range_cnt() -> crate::Result<()> {
+        let folder = tempfile::tempdir()?;
+        let db = Database::builder().open(&folder)?;
+        let metric_name = MetricName::try_from("hello").unwrap();
+
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "service" => "talna",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            1,
+            10.0,
+            tagset!(
+                "service" => "talna",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            2,
+            6.0,
+            tagset!(
+                "service" => "talna",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            3,
+            10.0,
+            tagset!(
+                "service" => "talna",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            4,
+            20.0,
+            tagset!(
+                "service" => "talna",
+            ),
+        )?;
+
+        {
+            let aggregator = db.count(metric_name, "service").start(2).build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("talna"));
+
+            for (group, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+
+                match group.as_ref() {
+                    "talna" => {
+                        assert_eq!(3.0, bucket.value);
+                        assert_eq!(2, bucket.start);
+                        assert_eq!(4, bucket.end);
+                        assert_eq!(3, bucket.len);
+                    }
+                    _ => {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        {
+            let aggregator = db.count(metric_name, "service").end(3).build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("talna"));
+
+            for (group, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+
+                match group.as_ref() {
+                    "talna" => {
+                        assert_eq!(4.0, bucket.value);
+                        assert_eq!(0, bucket.start);
+                        assert_eq!(3, bucket.end);
+                        assert_eq!(4, bucket.len);
+                    }
+                    _ => {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        {
+            let aggregator = db.count(metric_name, "service").start(1).end(3).build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("talna"));
+
+            for (group, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+
+                match group.as_ref() {
+                    "talna" => {
+                        assert_eq!(3.0, bucket.value);
+                        assert_eq!(1, bucket.start);
+                        assert_eq!(3, bucket.end);
+                        assert_eq!(3, bucket.len);
+                    }
+                    _ => {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_agg_cnt() -> crate::Result<()> {
         let folder = tempfile::tempdir()?;
diff --git a/src/duration.rs b/src/duration.rs
index 7d457e9..0c518d7 100644
--- a/src/duration.rs
+++ b/src/duration.rs
@@ -88,6 +88,24 @@ impl Duration {
     /// Formats N seconds as nanosecond time frame.
     #[must_use]
     pub const fn seconds(n: f64) -> u128 {
-        (n as u128) * 1_000_000_000
+        Self::millis(n * 1_000.0)
+    }
+
+    /// Formats N milliseconds as nanosecond time frame.
+    #[must_use]
+    pub const fn millis(n: f64) -> u128 {
+        Self::micros(n * 1_000.0)
+    }
+
+    /// Formats N microseconds as nanosecond time frame.
+    #[must_use]
+    pub const fn micros(n: f64) -> u128 {
+        Self::nanos(n * 1_000.0)
+    }
+
+    /// Formats N nanoseconds as nanosecond time frame.
+    #[must_use]
+    pub const fn nanos(n: f64) -> u128 {
+        n as u128
     }
 }
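
Aside: the ordering properties of the new key layout are easy to sanity-check in
isolation. The snippet below is a standalone sketch, not part of the patch; the
`key` helper simply mirrors `format_data_point_key` as introduced above:

    fn key(series_id: u64, ts: u128) -> [u8; 24] {
        let mut k = [0u8; 24];
        k[0..8].copy_from_slice(&series_id.to_be_bytes());
        k[8..24].copy_from_slice(&(!ts).to_be_bytes());
        k
    }

    fn main() {
        // Within a series, newer timestamps sort before older ones.
        assert!(key(7, 2) < key(7, 1));

        // Keys group by series ID first.
        assert!(key(1, u128::MAX) < key(2, 0));

        // The original timestamp survives the round-trip through inversion.
        let k = key(7, 42);
        let ts = !u128::from_be_bytes(k[8..24].try_into().unwrap());
        assert_eq!(ts, 42);
    }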