diff --git a/Cargo.toml b/Cargo.toml index cd3c0a9..1a1c59b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,16 @@ edition = "2021" [dependencies] regex = "1.8.1" +toml = "0.7.4" +anyhow = "1.0.71" +multimap = "0.9.0" async-trait = "0.1.68" hashed_wheel_timer = "0.1.1" petgraph = "0.6.3" dyn-clone = "1.0.11" +clap = { version = "4.3.0", features = ["derive", "string"] } +axum = "0.6.18" +log = "0.4.18" +env_logger = "0.10.0" tokio = { version = "1.28.1", features = ["full"] } -bytes = { version = "1", features = ["serde"] } \ No newline at end of file +bytes = { version = "1", features = ["serde"] } diff --git a/src/common/configuration/configuration.rs b/src/common/configuration/configuration.rs new file mode 100644 index 0000000..ff10ed4 --- /dev/null +++ b/src/common/configuration/configuration.rs @@ -0,0 +1,130 @@ +use std::{ + collections::{HashMap, HashSet}, + todo, +}; + +use hashed_wheel_timer::WheelTimer; + +use super::{ + configuration_entry::ConfigurationEntry, + provider::{ + map_provider::MapProvider, provider_factory::ProviderFactory, + secret_provider::SecretProvider, Provider, + }, +}; + +pub const CONFIG_PROVIDER_KEY: &str = "config.providers"; +pub const CONFIG_PROVIDERS_DEFAULT: &str = + "PropertiesFile,Environment,SystemProperties,CommandLine,RuntimeOverride"; +pub const CONFIG_PROVIDERS_REX: &str = r"^[\w+\d+,\/\.:\|\\\- ]+$"; + +pub const PLUGIN_DIRECTORY_KEY: &str = "config.plugin.path"; +pub const PLUGIN_DIRECTORY_REX: &str = r"^[\w+\d+\/\.\\:\-]+$"; + +pub const CONFIG_RELOAD_INTERVAL_KEY: &str = "config.reload.interval"; + +pub struct Configuration { + merged_config: HashMap, + providers: Vec>, + secret_providers: HashMap>, + provider_config: String, + provider_path: String, + factories: Vec>, + reload_keys: HashSet, + timer: WheelTimer, +} + +impl Clone for Configuration { + fn clone(&self) -> Self { + todo!() + } +} + +impl Configuration { + pub fn new() -> Self { + let timer = WheelTimer::new(0, 0).unwrap(); + + Configuration { + merged_config: HashMap::new(), + providers: Vec::new(), + secret_providers: HashMap::new(), + provider_config: "".to_string(), + provider_path: "".to_string(), + factories: Vec::new(), + reload_keys: HashSet::new(), + timer, + } + } + + pub fn get_port(&self) -> i32 { + todo!() + } + + pub fn get_ssl_port(&self) -> i32 { + todo!() + } + + pub fn get_bind(&self) -> String { + todo!() + } + + pub fn get_root(&self) -> String { + todo!() + } + + pub fn get_load_plugins(&self) -> bool { + todo!() + } + + pub fn has_property(&self, key: String) -> bool { + if key.is_empty() { + // TODO: Exception, key cannot be null or empty + return false; + } + match self.merged_config.get(&key) { + Some(e) => return e.schema().is_null(), + None => return false, + } + } + + pub fn register_by_string_value( + &self, + key: String, + default_value: String, + is_dynamic: bool, + description: String, + ) { + todo!() + } + + pub fn register_by_bool_value( + &self, + key: String, + default_value: bool, + is_dynamic: bool, + description: String, + ) { + todo!() + } + + pub fn register_by_int_value( + &self, + key: String, + default_value: i32, + is_dynamic: bool, + description: String, + ) { + todo!() + } + + pub fn get_int(&self, key: String) -> i32 { + todo!() + } + + // pub async fn new_configuration_with_map_provider( + // &self, + // cli_args: Vec, + // map_provider: MapProvider, + // ) -> Self { todo!() + // } +} diff --git a/src/common/configuration/configuration_entry.rs b/src/common/configuration/configuration_entry.rs index b03cf06..4d1aeba 100644 --- a/src/common/configuration/configuration_entry.rs +++ b/src/common/configuration/configuration_entry.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; use super::{ - configuration_callback::ConfigurationCallback, + configuration::Configuration, configuration_callback::ConfigurationCallback, configuration_entry_schema::ConfigurationEntrySchema, - configuration_override::ConfigurationOverride, Configuration, + configuration_override::ConfigurationOverride, }; pub(crate) struct ConfigurationEntry { @@ -12,3 +12,9 @@ pub(crate) struct ConfigurationEntry { settings: Vec, callbacks: HashSet>, } + +impl ConfigurationEntry { + pub fn schema(&self) -> &ConfigurationEntrySchema { + &self.schema + } +} diff --git a/src/common/configuration/configuration_entry_schema.rs b/src/common/configuration/configuration_entry_schema.rs index ee47d38..8b3254d 100644 --- a/src/common/configuration/configuration_entry_schema.rs +++ b/src/common/configuration/configuration_entry_schema.rs @@ -2,7 +2,7 @@ use super::configuration_value_validator::ConfigurationValueValidator; pub(crate) struct ConfigurationEntrySchema { key: String, - //typ: T, + // typ: T, source: String, description: String, validator: Box, @@ -13,3 +13,9 @@ pub(crate) struct ConfigurationEntrySchema { help_level: String, meta: String, } + +impl ConfigurationEntrySchema { + pub fn is_null(&self) -> bool { + todo!() + } +} diff --git a/src/common/configuration/mod.rs b/src/common/configuration/mod.rs index ba52069..359f707 100644 --- a/src/common/configuration/mod.rs +++ b/src/common/configuration/mod.rs @@ -1,18 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - todo, -}; - -use hashed_wheel_timer::WheelTimer; - -use self::{ - configuration_entry::ConfigurationEntry, - provider::{ - map_provider::MapProvider, provider_factory::ProviderFactory, - secret_provider::SecretProvider, Provider, - }, -}; - +pub mod configuration; pub mod configuration_callback; pub mod configuration_entry; pub mod configuration_entry_schema; @@ -20,44 +6,3 @@ pub mod configuration_override; pub mod configuration_value_validator; pub mod error; pub mod provider; - -pub const CONFIG_PROVIDER_KEY: &str = "config.providers"; -pub const CONFIG_PROVIDERS_DEFAULT: &str = - "PropertiesFile,Environment,SystemProperties,CommandLine,RuntimeOverride"; -pub const CONFIG_PROVIDERS_REX: &str = r"^[\w+\d+,\/\.:\|\\\- ]+$"; - -pub const PLUGIN_DIRECTORY_KEY: &str = "config.plugin.path"; -pub const PLUGIN_DIRECTORY_REX: &str = r"^[\w+\d+\/\.\\:\-]+$"; - -pub const CONFIG_RELOAD_INTERVAL_KEY: &str = "config.reload.interval"; - -pub(crate) struct Configuration { - merged_config: HashMap, - providers: Vec>, - secret_providers: HashMap>, - provider_config: String, - provider_path: String, - factories: Vec>, - reload_keys: HashSet, - timer: WheelTimer, -} - -impl Clone for Configuration { - fn clone(&self) -> Self { - todo!() - } -} - -impl Configuration { - pub async fn new_configuration(&self, properties: HashMap) -> Self { - todo!() - } - - pub async fn new_configuration_with_map_provider( - &self, - cli_args: Vec, - map_provider: MapProvider, - ) -> Self { - todo!() - } -} diff --git a/src/common/configuration/provider/provider_factory.rs b/src/common/configuration/provider/provider_factory.rs index 381ff24..c584ec2 100644 --- a/src/common/configuration/provider/provider_factory.rs +++ b/src/common/configuration/provider/provider_factory.rs @@ -1,8 +1,7 @@ use hashed_wheel_timer::WheelTimer; -use crate::common::configuration::Configuration; - use super::Provider; +use crate::common::configuration::configuration::Configuration; #[async_trait::async_trait] pub(crate) trait ProviderFactory { diff --git a/src/common/configuration/provider/secret_provider.rs b/src/common/configuration/provider/secret_provider.rs index 4917986..a924c13 100644 --- a/src/common/configuration/provider/secret_provider.rs +++ b/src/common/configuration/provider/secret_provider.rs @@ -1,9 +1,8 @@ use bytes::Bytes; use hashed_wheel_timer::WheelTimer; -use crate::common::configuration::Configuration; - use super::provider_factory::ProviderFactory; +use crate::common::configuration::configuration::Configuration; #[async_trait::async_trait] pub(crate) trait SecretProvider { diff --git a/src/common/core/base_tsdb_plugin.rs b/src/common/core/base_tsdb_plugin.rs index d6a1b6f..d816fd2 100644 --- a/src/common/core/base_tsdb_plugin.rs +++ b/src/common/core/base_tsdb_plugin.rs @@ -15,7 +15,7 @@ impl TSDBPlugin for BaseTSDBPlugin { self.id.clone() } - fn typ(&self) -> String { + fn get_type(&self) -> String { todo!() } diff --git a/src/common/core/registry.rs b/src/common/core/registry.rs index 10039da..0006450 100644 --- a/src/common/core/registry.rs +++ b/src/common/core/registry.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use dyn_clone::{clone_trait_object, DynClone}; use crate::{ @@ -14,7 +16,6 @@ use crate::{ }, core::pool::shared_obj::SharedObject, }; -use std::collections::HashMap; #[async_trait::async_trait] pub(crate) trait Registry: DynClone { @@ -22,7 +23,7 @@ pub(crate) trait Registry: DynClone { async fn cleanup_pool(&self) -> ExecutorService; - async fn register_plugin(&self, id: String, plugin: Box); + async fn register_plugin(&self, id: String, plugin: Box); fn get_default_plugin(&self) -> Box; @@ -57,14 +58,10 @@ pub(crate) trait Registry: DynClone { clone_trait_object!(Registry); -pub(crate) trait RegistryGetQueryOpt -where - B: Builder, - C: QueryNodeConfig, -{ - fn get_query_node_factory(&self, id: String) -> Box>; +pub(crate) trait RegistryGetQueryOpt { + fn get_query_node_factory(&self, id: String) -> Box; - fn get_query_iter_factory(&self, id: String) -> Box>; + fn get_query_iter_factory(&self, id: String) -> Box; fn get_query_iter_interpolator_factory(&self, id: String) -> Box; } diff --git a/src/common/core/tsdb.rs b/src/common/core/tsdb.rs index ae36654..4e1d90e 100644 --- a/src/common/core/tsdb.rs +++ b/src/common/core/tsdb.rs @@ -1,13 +1,17 @@ use dyn_clone::{clone_trait_object, DynClone}; use hashed_wheel_timer::WheelTimer; -use crate::common::configuration::Configuration; -use crate::common::core::registry::Registry; -use crate::common::pool::executor::ExecutorService; -use crate::common::query::query_context::QueryContext; -use crate::common::query::query_node_config::{Builder, QueryNodeConfig}; -use crate::common::stats::stats_collector::StatsCollector; -use crate::common::threadpools::tsdb_thread_pool_executor::TSDBThreadPoolExecutor; +use crate::common::{ + configuration::configuration::Configuration, + core::registry::Registry, + pool::executor::ExecutorService, + query::{ + query_context::QueryContext, + query_node_config::{Builder, QueryNodeConfig}, + }, + stats::stats_collector::StatsCollector, + threadpools::tsdb_thread_pool_executor::TSDBThreadPoolExecutor, +}; #[async_trait::async_trait] #[allow(clippy::upper_case_acronyms)] pub(crate) trait TSDB: DynClone + Send + Sync { diff --git a/src/common/core/tsdb_plugin.rs b/src/common/core/tsdb_plugin.rs index 8a09f90..e689ec2 100644 --- a/src/common/core/tsdb_plugin.rs +++ b/src/common/core/tsdb_plugin.rs @@ -1,8 +1,8 @@ use crate::common::core::tsdb::TSDB; #[async_trait::async_trait] -pub(crate) trait TSDBPlugin: Send + Sync { - fn typ(&self) -> String; +pub(crate) trait TSDBPlugin { + fn get_type(&self) -> String; fn id(&self) -> String; diff --git a/src/common/data/low_level_time_series_data.rs b/src/common/data/low_level_time_series_data.rs new file mode 100644 index 0000000..d4958eb --- /dev/null +++ b/src/common/data/low_level_time_series_data.rs @@ -0,0 +1,87 @@ +use bytes::Bytes; + +use super::timestamp::TimeStamp; + +pub(crate) enum StringFormat { + ASCIISTRING, + UTF8STRING, + ENCODED, +} + +pub(crate) trait LowLevelTimeSeriesData { + fn advance(&self) -> bool; + + fn has_parsing_error(&self) -> bool; + + fn parsing_error(&self) -> String; + + fn timestamp(&self) -> Box; + + fn tags_buffer(&self) -> Vec; + + fn tag_buffer_start(&self) -> i32; + + fn tag_buffer_length(&self) -> i32; + + fn tags_format(&self) -> StringFormat; + + fn tag_delimiter(&self) -> Bytes; + + fn tag_set_count(&self) -> i32; + + fn advance_tag_pair(&self) -> bool; + + fn tag_key_start(&self) -> i32; + + fn tag_key_length(&self) -> i32; + + fn tag_value_start(&self) -> i32; + + fn tag_value_length(&self) -> i32; + + fn common_tags(&self) -> bool; + + fn common_timestamp(&self) -> bool; +} + +pub(crate) trait HashedLowLevelTimeSeriesData: LowLevelTimeSeriesData { + fn time_series_hash(&self) -> u64; + + fn tags_set_hash(&self) -> u64; + + fn tag_pair_hash(&self) -> u64; + + fn tag_key_hash(&self) -> u64; + + fn tag_value_hash(&self) -> u64; +} + +pub(crate) trait NamespacedLowLevelTimeSeriesData: LowLevelTimeSeriesData { + fn namespace_buffer(&self) -> Vec; + + fn namespace_start(&self) -> i32; + + fn namespace_length(&self) -> i32; + + fn namespace_format(&self) -> StringFormat; +} + +pub(crate) trait HashedNamespacedLowLevelTimeSeriesData: + NamespacedLowLevelTimeSeriesData + HashedLowLevelTimeSeriesData +{ + fn namespace_hash(&self) -> u64; +} + +pub(crate) trait RetiredLowLevelTimeSeriesData: LowLevelTimeSeriesData { + fn initial_epoch(&self) -> i32; +} + +pub(crate) trait RetiredHashedLowLevelTimeSeriesData: HashedLowLevelTimeSeriesData { + fn initial_epoch(&self) -> i32; +} + +pub(crate) trait NamespacedRetiredHashedLowLevelTimeSeriesData: + NamespacedLowLevelTimeSeriesData + HashedLowLevelTimeSeriesData +{ + fn initial_epoch(&self) -> i32; +} diff --git a/src/common/data/mod.rs b/src/common/data/mod.rs index 353b951..b324bf4 100644 --- a/src/common/data/mod.rs +++ b/src/common/data/mod.rs @@ -1,13 +1,19 @@ +pub mod low_level_time_series_data; pub mod partial_time_series; pub mod partial_time_series_set; pub mod time_series; pub mod time_series_byte_id; +pub mod time_series_data_consumer_factory; pub mod time_series_data_source; pub mod time_series_data_source_config; +pub mod time_series_data_source_factory; pub mod time_series_datatype; +pub mod time_series_datum; +pub mod time_series_datum_id; +pub mod time_series_id; +pub mod time_series_shared_tags_and_time_data; +pub mod time_series_string_id; pub mod time_series_value; pub mod time_specification; pub mod timestamp; pub mod typed_time_series_iter; - -pub(crate) trait Comparable: PartialEq + PartialOrd + Eq + Ord {} diff --git a/src/common/data/partial_time_series.rs b/src/common/data/partial_time_series.rs index fff8bfb..a2f431c 100644 --- a/src/common/data/partial_time_series.rs +++ b/src/common/data/partial_time_series.rs @@ -1,6 +1,6 @@ use super::{ - partial_time_series_set::PartialTimeSeriesSet, time_series_byte_id::TimeSeriesID, - time_series_datatype::TimeSeriesDataType, + partial_time_series_set::PartialTimeSeriesSet, time_series_datatype::TimeSeriesDataType, + time_series_id::TimeSeriesID, }; pub(crate) trait PartialTimeSeries: TimeSeriesDataType { diff --git a/src/common/data/time_series.rs b/src/common/data/time_series.rs index 908531e..54d7870 100644 --- a/src/common/data/time_series.rs +++ b/src/common/data/time_series.rs @@ -1,5 +1,5 @@ use super::{ - time_series_byte_id::TimeSeriesID, time_series_datatype::TimeSeriesDataType, + time_series_id::TimeSeriesID, time_series_datatype::TimeSeriesDataType, typed_time_series_iter::TypedTimeSeriesIterator, }; diff --git a/src/common/data/time_series_byte_id.rs b/src/common/data/time_series_byte_id.rs index 1831ae0..243b266 100644 --- a/src/common/data/time_series_byte_id.rs +++ b/src/common/data/time_series_byte_id.rs @@ -1,11 +1,31 @@ -pub(crate) trait TimeSeriesID { - // True if the fields are encoded using a format specified by a storage engine. - fn encode(&self) -> bool; +use std::collections::{HashMap, HashSet}; - // A signed 64 bit hash code for collision reduction. - // return a hash as u64 - fn hashcode(&self) -> u64; +use bytes::Bytes; - // The type series dealt with. Either a `TimeSeriesByteID` or `TimeSeriesStringID`. - // fn get_type(&self, _: T) -> &str; +use super::{ + time_series_data_source_factory::TimeSeriesDataSourceFactory, time_series_id::TimeSeriesID, + time_series_string_id::TimeSeriesStringID, +}; +use crate::common::stats::span::Span; + +pub(crate) trait TimeSeriesByteID: TimeSeriesID { + fn data_store(&self) -> Box; + + fn alias(&self) -> Vec; + + fn namespace(&self) -> Vec; + + fn metric(&self) -> Vec; + + fn tags(&self) -> HashMap, Vec>; + + fn aggregated_tags(&self) -> Vec>; + + fn disjoint_tags(&self) -> Vec>; + + fn unique_ids(&self) -> HashSet; + + fn skip_metric(&self) -> bool; + + fn decode(&self, cache: bool, span: Box) -> Box; } diff --git a/src/common/data/time_series_data_consumer_factory.rs b/src/common/data/time_series_data_consumer_factory.rs new file mode 100644 index 0000000..278d761 --- /dev/null +++ b/src/common/data/time_series_data_consumer_factory.rs @@ -0,0 +1,7 @@ +use crate::common::{ + core::tsdb_plugin::TSDBPlugin, storage::time_series_data_consumer::TimeSeriesDataConsumer, +}; + +pub(crate) trait TimeSeriesDataConsumerFactory: TSDBPlugin { + fn consumer(&self) -> Box; +} diff --git a/src/common/data/time_series_data_source.rs b/src/common/data/time_series_data_source.rs index a784545..8dbe312 100644 --- a/src/common/data/time_series_data_source.rs +++ b/src/common/data/time_series_data_source.rs @@ -1,12 +1,3 @@ -use crate::common::query::{query_node::QueryNode, query_node_config::Builder}; +use crate::common::query::query_node::QueryNode; -use super::time_series_data_source_config::TimeSeriesDataSourceConfig; - -// TODO: C is different from T -pub(crate) trait TimeSeriesDataSource -where - Self: QueryNode, - B: Builder, - C: TimeSeriesDataSourceConfig, -{ -} +pub(crate) trait TimeSeriesDataSource: QueryNode {} diff --git a/src/common/data/time_series_data_source_config.rs b/src/common/data/time_series_data_source_config.rs index 1b9162a..8fbd210 100644 --- a/src/common/data/time_series_data_source_config.rs +++ b/src/common/data/time_series_data_source_config.rs @@ -1,9 +1,3 @@ -use crate::common::query::query_node_config::{Builder, QueryNodeConfig}; +use crate::common::query::query_node_config::QueryNodeConfig; -pub(crate) trait TimeSeriesDataSourceConfig -where - Self: QueryNodeConfig, - B: Builder, - C: TimeSeriesDataSourceConfig, -{ -} +pub(crate) trait TimeSeriesDataSourceConfig: QueryNodeConfig {} diff --git a/src/common/data/time_series_data_source_factory.rs b/src/common/data/time_series_data_source_factory.rs new file mode 100644 index 0000000..18309c8 --- /dev/null +++ b/src/common/data/time_series_data_source_factory.rs @@ -0,0 +1,45 @@ +use bytes::Bytes; + +use super::{ + time_series_byte_id::TimeSeriesByteID, + time_series_data_source_config::TimeSeriesDataSourceConfig, time_series_id::TimeSeriesID, + time_series_string_id::TimeSeriesStringID, +}; +use crate::common::{ + query::{query_node_config::QueryNodeConfig, query_pipeline_context::QueryPipelineContext}, + rollup::rollup_config::RollupConfig, + stats::span::Span, +}; + +#[async_trait::async_trait] +pub(crate) trait TimeSeriesDataSourceFactory { + fn id_type(&self) -> Box; + + fn supports_query( + &self, + context: Box, + config: Box, + ) -> bool; + + fn supports_push_down(&self, operation: Box) -> bool; + + async fn resolve_byte_id( + &self, + id: Box, + span: Box, + ) -> Box; + + async fn encode_join_keys( + &self, + join_keys: Vec, + span: Box, + ) -> Vec>; + + async fn encode_join_metrics( + &self, + join_metrics: Vec, + span: Box, + ) -> Vec>; + + fn rollup_config(&self) -> Box; +} diff --git a/src/common/data/time_series_datum.rs b/src/common/data/time_series_datum.rs new file mode 100644 index 0000000..9e317de --- /dev/null +++ b/src/common/data/time_series_datum.rs @@ -0,0 +1,17 @@ +use super::{ + time_series_datatype::TimeSeriesDataType, time_series_datum_id::TimeSeriesDatumID, + time_series_value::TimeSeriesValue, +}; + +pub(crate) trait TimeSeriesDatum { + fn id(&self) -> Box; + + fn value(&self) -> Box; +} + +pub(crate) fn wrap( + id: Box, + value: Box, +) -> Box { + todo!() +} diff --git a/src/common/data/time_series_datum_id.rs b/src/common/data/time_series_datum_id.rs new file mode 100644 index 0000000..ebe062b --- /dev/null +++ b/src/common/data/time_series_datum_id.rs @@ -0,0 +1,21 @@ +use std::hash::Hash; + +use super::time_series_id::TimeSeriesID; + +pub(crate) trait TimeSeriesDatumID { + fn get_type(&self) -> Box; +} + +impl PartialEq for Box { + fn eq(&self, other: &Self) -> bool { + todo!() + } +} + +impl Eq for Box {} + +impl Hash for Box { + fn hash(&self, state: &mut H) { + todo!() + } +} diff --git a/src/common/data/time_series_id.rs b/src/common/data/time_series_id.rs new file mode 100644 index 0000000..3a81496 --- /dev/null +++ b/src/common/data/time_series_id.rs @@ -0,0 +1,11 @@ +pub(crate) trait TimeSeriesID { + // True if the fields are encoded using a format specified by a storage engine. + fn encode(&self) -> bool; + + // A signed 64 bit hash code for collision reduction. + // return a hash as u64 + fn hashcode(&self) -> u64; + + // The type series dealt with. Either a `TimeSeriesByteID` or + // `TimeSeriesStringID`. fn get_type(&self, _: T) -> &str; +} diff --git a/src/common/data/time_series_shared_tags_and_time_data.rs b/src/common/data/time_series_shared_tags_and_time_data.rs new file mode 100644 index 0000000..2ec7cfd --- /dev/null +++ b/src/common/data/time_series_shared_tags_and_time_data.rs @@ -0,0 +1,15 @@ +use std::collections::HashMap; + +use multimap::MultiMap; + +use super::{time_series_datatype::TimeSeriesDataType, timestamp::TimeStamp}; + +pub(crate) trait TimeSeriesShardTagsAndTimeData { + fn timestamp(&self) -> Box; + + fn tags(&self) -> HashMap; + + fn data(&self) -> MultiMap>; + + fn size(&self) -> i32; +} diff --git a/src/common/data/time_series_string_id.rs b/src/common/data/time_series_string_id.rs new file mode 100644 index 0000000..f6bba9c --- /dev/null +++ b/src/common/data/time_series_string_id.rs @@ -0,0 +1,23 @@ +use std::collections::{HashMap, HashSet}; + +use super::time_series_id::TimeSeriesID; + +pub(crate) trait TimeSeriesStringID: TimeSeriesID { + fn alias(&self) -> String; + + fn namespace(&self) -> String; + + fn metric(&self) -> String; + + fn tags(&self) -> HashMap; + + fn get_tag_value(&self, key: String) -> String; + + fn aggregated_tags(&self) -> Vec; + + fn disjoint_tags(&self) -> Vec; + + fn unique_ids(&self) -> HashSet; + + fn hits(&self) -> u64; +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 4c1fc4d..2aad326 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -5,4 +5,5 @@ pub mod pool; pub mod query; pub mod rollup; pub mod stats; +pub mod storage; pub mod threadpools; diff --git a/src/common/query/mod.rs b/src/common/query/mod.rs index b034cf8..abb8d52 100644 --- a/src/common/query/mod.rs +++ b/src/common/query/mod.rs @@ -2,6 +2,7 @@ pub mod filter; pub mod interpolation; pub mod plan; pub mod query_context; +pub mod query_context_builder; pub mod query_fill_policy; pub mod query_iter_factory; pub mod query_mode; diff --git a/src/common/query/plan/query_plan.rs b/src/common/query/plan/query_plan.rs index 58f4777..6c5a69b 100644 --- a/src/common/query/plan/query_plan.rs +++ b/src/common/query/plan/query_plan.rs @@ -31,67 +31,55 @@ pub(crate) struct TimeAdjustments { // } #[async_trait::async_trait] -pub(crate) trait QueryPlanner -where - B: Builder, - C: QueryNodeConfig, -{ - async fn plan(&self, span: Box); +pub(crate) trait QueryPlanner { + async fn plan(&self, span: Box); async fn replace( &self, - old_config: Box>, - new_config: Box>, + old_config: Box, + new_config: Box, ); async fn add_edge( &self, - from: Box>, - to: Box>, + from: Box, + to: Box, ) -> bool; async fn remove_edge( &self, - from: Box>, - to: Box>, + from: Box, + to: Box, ) -> bool; - async fn remove_node(&self, config: Box>) -> bool; + async fn remove_node(&self, config: Box) -> bool; - async fn graph(&self) -> Graph>, Box>>; + fn graph(&self) -> Graph, Box>; - async fn config_graph( - &self, - ) -> Graph>, Box>>; + fn config_graph(&self) -> Graph, Box>; - async fn context(&self) -> Box>; + fn context(&self) -> Box; - async fn node_for_id(&self, id: String) -> Box>; + fn node_for_id(&self, id: String) -> Box; - async fn get_factory( - &self, - node: Box>, - ) -> Box>; + fn get_factory(&self, node: Box) -> Box; async fn terminal_source_node( &self, - config: Box>, - ) -> Vec>>; + config: Box, + ) -> Vec>; - async fn get_metric_for_data_source( + fn get_metric_for_data_source( &self, - node: Box>, + node: Box, data_source_id: String, ) -> String; - async fn get_adjustments( - &self, - config: Box>, - ) -> TimeAdjustments; + fn get_adjustments(&self, config: Box) -> TimeAdjustments; async fn base_setup_graph( &self, - context: Box>, - config: Box>, + context: Box, + config: Box, ); } diff --git a/src/common/query/query_context.rs b/src/common/query/query_context.rs index 7e3c960..f27dfd4 100644 --- a/src/common/query/query_context.rs +++ b/src/common/query/query_context.rs @@ -2,39 +2,42 @@ use std::collections::HashMap; use dyn_clone::{clone_trait_object, DynClone}; -use super::query_sink_config::QuerySinkConfig; -use super::time_series_query::TimeSeriesQuery; -use super::{query_mode::QueryMode, query_sink::QuerySink}; -use crate::common::stats::query_stats::QueryStats; -use crate::common::{core::tsdb::TSDB, data::time_series_byte_id::TimeSeriesID, stats::span::Span}; +use super::{ + query_mode::QueryMode, query_sink::QuerySink, query_sink_config::QuerySinkConfig, + time_series_query::TimeSeriesQuery, +}; +use crate::common::{ + core::tsdb::TSDB, + data::time_series_id::TimeSeriesID, + stats::{query_stats::QueryStats, span::Span}, +}; -#[async_trait::async_trait] pub(crate) trait QueryContext: DynClone { - async fn sinks(&self) -> Vec>; + fn sinks(&self) -> Vec>; - async fn mode(&self) -> QueryMode; + fn mode(&self) -> QueryMode; - async fn fetch_next(&self, span: Box); + fn fetch_next(&self, span: Box); - async fn close(&self); + fn close(&self); fn is_closed(&self) -> bool; - async fn stats(&self) -> Box; + fn stats(&self) -> Box; - async fn sink_configs(&self) -> Vec>; + fn sink_configs(&self) -> Vec>; - async fn query(&self) -> Box; + fn query(&self) -> Box; - async fn tsdb(&self) -> Box; + fn tsdb(&self) -> Box; // async fn auth_state(&self) -> AuthState; - async fn headers(&self) -> HashMap; + fn headers(&self) -> HashMap; fn cacheable(&self) -> bool; - async fn initialize(&self, span: Box); + fn initialize(&self, span: Box); fn get_id(&self, hash: u64) -> Box; diff --git a/src/common/query/query_context_builder.rs b/src/common/query/query_context_builder.rs new file mode 100644 index 0000000..63b087b --- /dev/null +++ b/src/common/query/query_context_builder.rs @@ -0,0 +1,27 @@ +use std::collections::HashMap; + +use super::{ + query_context::QueryContext, query_mode::QueryMode, query_sink::QuerySink, + query_sink_config::QuerySinkConfig, time_series_query::TimeSeriesQuery, +}; +use crate::common::stats::query_stats::QueryStats; + +pub(crate) trait QueryContextBuilder { + fn set_query(&mut self, query: Box); + + fn set_mode(&mut self, mode: QueryMode); + + fn set_stats(&mut self, stats: Box); + + fn set_sinks(&mut self, configs: Vec>); + + fn add_sink(&mut self, sink: Box); + + fn set_local_sinks(&mut self, sinks: Vec>); + + // fn set_auth_state(&mut self, auth_state: AuthState); + + fn set_header(&mut self, headers: HashMap); + + fn build(&self) -> Box; +} diff --git a/src/common/query/query_iter_factory.rs b/src/common/query/query_iter_factory.rs index 42e93c9..dd64124 100644 --- a/src/common/query/query_iter_factory.rs +++ b/src/common/query/query_iter_factory.rs @@ -1,26 +1,21 @@ use std::collections::HashMap; -use crate::common::data::{ - time_series::TimeSeries, time_series_datatype::TimeSeriesDataType, - typed_time_series_iter::TypedTimeSeriesIterator, -}; - use super::{ query_node::QueryNode, query_node_config::{Builder, QueryNodeConfig}, query_result::QueryResult, }; +use crate::common::data::{ + time_series::TimeSeries, time_series_datatype::TimeSeriesDataType, + typed_time_series_iter::TypedTimeSeriesIterator, +}; -pub(crate) trait QueryIteratorFactory -where - B: Builder, - C: QueryNodeConfig, -{ +pub(crate) trait QueryIteratorFactory { fn types(&self) -> Vec>; fn new_iterator( &self, - node: Box>, + node: Box, result: Box, sources: Vec>, typ: Box, @@ -28,7 +23,7 @@ where fn new_iterator_with_map( &self, - node: Box>, + node: Box, result: Box, sources: HashMap>, typ: Box, diff --git a/src/common/query/query_node.rs b/src/common/query/query_node.rs index 15f4310..8725112 100644 --- a/src/common/query/query_node.rs +++ b/src/common/query/query_node.rs @@ -1,38 +1,31 @@ -use crate::common::{data::partial_time_series::PartialTimeSeries, stats::span::Span}; - use super::{ - query_node_config::{Builder, QueryNodeConfig}, - query_node_factory::QueryNodeFactory, - query_pipeline_context::QueryPipelineContext, - query_result::QueryResult, + query_node_config::QueryNodeConfig, query_node_factory::QueryNodeFactory, + query_pipeline_context::QueryPipelineContext, query_result::QueryResult, }; +use crate::common::{data::partial_time_series::PartialTimeSeries, stats::span::Span}; #[async_trait::async_trait] -pub(crate) trait QueryNode -where - C: QueryNodeConfig, - B: Builder, -{ - async fn factory(&self) -> Box>; +pub(crate) trait QueryNode { + async fn factory(&self) -> Box; - async fn pipeline_context(&self) -> Box>; + async fn pipeline_context(&self) -> Box; - async fn initialize(&self, span: Box); + async fn initialize(&self, span: Box); - fn config(&self) -> Box>; + fn config(&self) -> Box; async fn close(&self); async fn on_complete( &self, - downstream: Box>, + downstream: Box, final_seq: u64, total_seq: u64, ); - async fn on_next(&self, next: Box); + async fn on_next(&self, next: Box); - async fn on_next_by_partial(&self, next: Box); + async fn on_next_by_partial(&self, next: Box); async fn on_error(&self); } diff --git a/src/common/query/query_node_config.rs b/src/common/query/query_node_config.rs index 7ec469e..620edc2 100644 --- a/src/common/query/query_node_config.rs +++ b/src/common/query/query_node_config.rs @@ -1,35 +1,29 @@ use std::collections::HashMap; -use std::hash::Hash; - -use crate::common::configuration::Configuration; use super::{query_node_config_options::QueryNodeConfigOptions, query_result_id::QueryResultID}; +use crate::common::configuration::configuration::Configuration; #[async_trait::async_trait] -pub(crate) trait Builder -where - B: Builder, - C: QueryNodeConfig, -{ - fn set_id(&self, id: String) -> B; +pub(crate) trait Builder { + fn set_id(&self, id: String) -> Box; - fn set_type(&self, typ: String) -> B; + fn set_type(&self, typ: String) -> Box; - fn set_sources(&self, sources: Vec) -> B; + fn set_sources(&self, sources: Vec) -> Box; - fn add_source(&self, source: String) -> B; + fn add_source(&self, source: String) -> Box; - fn set_overrides(&self, overrides: HashMap) -> B; + fn set_overrides(&self, overrides: HashMap) -> Box; - fn add_overrides(&self, key: String, value: String) -> B; + fn add_overrides(&self, key: String, value: String) -> Box; - fn set_result_ids(&self, result_ids: Vec>) -> B; + fn set_result_ids(&self, result_ids: Vec>) -> Box; - fn add_result_id(&self, result_id: Box) -> B; + fn add_result_id(&self, result_id: Box) -> Box; - async fn build(&self) -> B; + async fn build(&self) -> Box; - fn return_self(&self) -> B; + fn return_self(&self) -> Box; } #[async_trait::async_trait] @@ -37,12 +31,7 @@ pub(crate) trait GetQueryNodeOption { async fn node_options(option: QueryNodeConfigOptions) -> T; } -#[async_trait::async_trait] -pub(crate) trait QueryNodeConfig -where - B: Builder, - C: QueryNodeConfig, -{ +pub(crate) trait QueryNodeConfig { fn get_id(&self) -> String; fn get_type(&self) -> String; @@ -57,7 +46,7 @@ where fn is_read_cache(&self) -> bool; - async fn get_overrides(&self) -> HashMap; + fn get_overrides(&self) -> HashMap; fn get_string(&self, config: Configuration, key: String) -> String; @@ -71,25 +60,25 @@ where fn has_key(&self, key: String) -> String; - async fn to_builder(&self) -> B; + fn to_builder(&self) -> Box; fn result_ids(&self) -> Vec>; fn is_marked_cacheable(&self) -> bool; - async fn mark_cacheable(&self); + fn mark_cacheable(&self); } -impl Hash for dyn QueryNodeConfig { - fn hash(&self, _state: &mut H) { - todo!() - } -} +// impl Hash for dyn QueryNodeConfig { +// fn hash(&self, _state: &mut H) { +// todo!() +// } +// } -impl PartialEq for dyn QueryNodeConfig { - fn eq(&self, _other: &Self) -> bool { - todo!() - } -} +// impl PartialEq for dyn QueryNodeConfig { +// fn eq(&self, _other: &Self) -> bool { +// todo!() +// } +// } -impl Eq for dyn QueryNodeConfig {} +// impl Eq for dyn QueryNodeConfig {} diff --git a/src/common/query/query_node_factory.rs b/src/common/query/query_node_factory.rs index 02716a1..2395ab1 100644 --- a/src/common/query/query_node_factory.rs +++ b/src/common/query/query_node_factory.rs @@ -1,7 +1,5 @@ use dyn_clone::{clone_trait_object, DynClone}; -use crate::common::core::{tsdb::TSDB, tsdb_plugin::TSDBPlugin}; - use super::{ plan::query_plan::QueryPlanner, query_node::QueryNode, @@ -9,42 +7,39 @@ use super::{ query_pipeline_context::QueryPipelineContext, util::{JsonNode, ObjectMapper}, }; +use crate::common::core::{tsdb::TSDB, tsdb_plugin::TSDBPlugin}; #[async_trait::async_trait] -pub(crate) trait QueryNodeFactory +pub(crate) trait QueryNodeFactory where Self: TSDBPlugin + DynClone, - B: Builder, - C: QueryNodeConfig, { fn id(&self) -> String; - async fn parse_config(&self, mapper: ObjectMapper, tsdb: Box, node: JsonNode) -> C; + async fn parse_config( + &self, + mapper: ObjectMapper, + tsdb: Box, + node: JsonNode, + ) -> Box; async fn setup_graph( &self, - context: Box>, - config: C, - planner: Box>, + context: Box, + config: dyn QueryNodeConfig, + planner: Box, ); - async fn new_node( - &self, - context: Box>, - ) -> Box>; + async fn new_node(&self, context: Box) -> Box; async fn new_node_with_config( &self, - context: Box>, - config: C, - ) -> Box>; + context: Box, + config: dyn QueryNodeConfig, + ) -> Box; } -impl Clone for Box> -where - B: Builder + Clone, - C: QueryNodeConfig + Clone, -{ +impl Clone for Box { fn clone(&self) -> Self { todo!() } diff --git a/src/common/query/query_pipeline_context.rs b/src/common/query/query_pipeline_context.rs index 9ea7b86..11769c5 100644 --- a/src/common/query/query_pipeline_context.rs +++ b/src/common/query/query_pipeline_context.rs @@ -1,12 +1,3 @@ -use crate::common::{ - core::tsdb::TSDB, - data::{ - time_series_byte_id::TimeSeriesID, time_series_data_source::TimeSeriesDataSource, - time_series_data_source_config::TimeSeriesDataSourceConfig, - }, - stats::span::Span, -}; - use super::{ query_context::QueryContext, query_node::QueryNode, @@ -16,64 +7,55 @@ use super::{ query_sink::QuerySink, time_series_query::TimeSeriesQuery, }; +use crate::common::{ + core::tsdb::TSDB, + data::{ + time_series_id::TimeSeriesID, time_series_data_source::TimeSeriesDataSource, + time_series_data_source_config::TimeSeriesDataSourceConfig, + }, + stats::span::Span, +}; -#[async_trait::async_trait] -pub(crate) trait QueryPipelineContext -where - Self: QueryNode, - B: Builder, - C: QueryNodeConfig, -{ - async fn factory(&self) -> Box>; +pub(crate) trait QueryPipelineContext: QueryNode { + fn factory(&self) -> Box; fn tsdb(&self) -> Box; - async fn query(&self) -> Box; + fn query(&self) -> Box; - async fn query_context(&self) -> Box; + fn query_context(&self) -> Box; - async fn initialize(&self, span: Box); + fn initialize(&self, span: Box); - async fn fetch_next(&self, span: Box); + fn fetch_next(&self, span: Box); - async fn upstream(&self, node: Box>) -> Vec>>; + fn upstream(&self, node: Box) -> Vec>; - async fn downstream(&self, node: Box>) -> Vec>>; + fn downstream(&self, node: Box) -> Vec>; - async fn downstream_sources( - &self, - node: Box>, - ) -> Vec>>; + fn downstream_sources(&self, node: Box) -> Vec>; - async fn common_source_config( - &self, - node: Box>, - ) -> Box>; + fn common_source_config(&self, node: Box) + -> Box; - async fn downstream_sources_ids(&self, node: Box>) -> Vec; + fn downstream_sources_ids(&self, node: Box) -> Vec; - async fn downstream_sources_result_ids( + fn downstream_sources_result_ids( &self, - node: Box>, + node: Box, ) -> Vec>; - async fn upstream_of_type( - &self, - node: Box>, - ) -> Vec>>; + fn upstream_of_type(&self, node: Box) -> Vec>; - async fn downstream_of_type( - &self, - node: Box>, - ) -> Vec>>; + fn downstream_of_type(&self, node: Box) -> Vec>; - async fn sinks(&self) -> Vec>; + fn sinks(&self) -> Vec>; - async fn add_id(&self, hash: u64, id: Box); + fn add_id(&self, hash: u64, id: Box); - async fn get_id(&self, hash: u64) -> Box; + fn get_id(&self, hash: u64) -> Box; - async fn has_id(&self, hash: u64) -> bool; + fn has_id(&self, hash: u64) -> bool; - async fn close(&self); + fn close(&self); } diff --git a/src/common/query/query_result.rs b/src/common/query/query_result.rs index 283e096..c43f101 100644 --- a/src/common/query/query_result.rs +++ b/src/common/query/query_result.rs @@ -1,47 +1,34 @@ +use super::{query_node::QueryNode, query_result_id::QueryResultID}; use crate::common::{ data::{ - time_series::TimeSeries, time_series_byte_id::TimeSeriesID, + time_series::TimeSeries, time_series_id::TimeSeriesID, time_specification::TimeSpecification, }, rollup::rollup_config::RollupConfig, }; -use super::{ - query_node::QueryNode, - query_node_config::{Builder, QueryNodeConfig}, - query_result_id::QueryResultID, -}; - -#[async_trait::async_trait] -pub(crate) trait GetSource -where - B: Builder, - C: QueryNodeConfig, -{ - async fn source(&self) -> Box>; -} - -#[async_trait::async_trait] pub(crate) trait QueryResult { - async fn time_specification(&self) -> Box; + fn time_specification(&self) -> Box; - async fn time_series(&self) -> Vec>; + fn time_series(&self) -> Vec>; - async fn error(&self) -> String; + fn error(&self) -> String; - async fn exception(&self); + fn exception(&self); fn sequence_id(&self) -> u64; - async fn data_source(&self) -> Box; + fn data_source(&self) -> Box; - async fn id_type(&self) -> Box; + fn id_type(&self) -> Box; - async fn resolution(&self) -> u64; + fn resolution(&self) -> u64; fn rollup_config(&self) -> Box; - async fn close(&self); + fn close(&self); fn process_in_parallel(&self) -> bool; + + fn source(&self) -> Box; } diff --git a/src/common/query/serdes/time_series_serdes.rs b/src/common/query/serdes/time_series_serdes.rs index f0fedab..6695223 100644 --- a/src/common/query/serdes/time_series_serdes.rs +++ b/src/common/query/serdes/time_series_serdes.rs @@ -1,24 +1,14 @@ -use dyn_clone::DynClone; +use dyn_clone::{clone_trait_object, DynClone}; +use super::serde_callback::SerdesCallback; use crate::common::{ data::partial_time_series::PartialTimeSeries, - query::{ - query_node::QueryNode, - query_node_config::{Builder, QueryNodeConfig}, - query_result::QueryResult, - }, + query::{query_node::QueryNode, query_result::QueryResult}, stats::span::Span, }; -use super::serde_callback::SerdesCallback; - #[async_trait::async_trait] -pub(crate) trait TimeSeriesSerdes -where - Self: DynClone, - B: Builder, - C: QueryNodeConfig, -{ +pub(crate) trait TimeSeriesSerdes: DynClone { async fn serialize(&self, result: Box, span: Box); async fn serialize_with_partial( @@ -30,15 +20,7 @@ where async fn serialize_complete(&self, span: Box); - async fn deserialize(&self, node: Box>, span: Box); + async fn deserialize(&self, node: Box, span: Box); } -impl Clone for Box> -where - B: Builder + Clone, - C: QueryNodeConfig + Clone, -{ - fn clone(&self) -> Self { - todo!() - } -} +clone_trait_object!(TimeSeriesSerdes); diff --git a/src/common/query/time_series_query.rs b/src/common/query/time_series_query.rs index 8d85c58..fb2e082 100644 --- a/src/common/query/time_series_query.rs +++ b/src/common/query/time_series_query.rs @@ -1,13 +1,12 @@ use std::hash::Hash; -use crate::common::data::timestamp::TimeStamp; - use super::{ filter::{named_filter::NamedFilter, query_filter::QueryFilter}, query_mode::QueryMode, query_node_config::{Builder, QueryNodeConfig}, serdes::serdes_options::SerdesOptions, }; +use crate::common::data::timestamp::TimeStamp; pub(crate) enum LogLevel { OFF, @@ -26,15 +25,6 @@ pub(crate) enum CacheMode { CLEAR, } -#[async_trait::async_trait] -trait AsyncGetExeGraph -where - B: Builder, - C: QueryNodeConfig, -{ - async fn get_execution_graph(&self) -> Vec>>; -} - pub(crate) trait TimeSeriesQuery { fn get_start(&self) -> String; @@ -54,6 +44,8 @@ pub(crate) trait TimeSeriesQuery { fn end_time(&self) -> Box; + fn get_execution_graph(&self) -> Vec>; + fn get_serdes_configs(&self) -> Vec>; fn get_log_level(&self) -> LogLevel; diff --git a/src/common/rollup/mod.rs b/src/common/rollup/mod.rs index d596487..d4d4f20 100644 --- a/src/common/rollup/mod.rs +++ b/src/common/rollup/mod.rs @@ -1 +1,2 @@ pub mod rollup_config; +pub mod rollup_interval; diff --git a/src/common/rollup/rollup_config.rs b/src/common/rollup/rollup_config.rs index e51b91d..861df4b 100644 --- a/src/common/rollup/rollup_config.rs +++ b/src/common/rollup/rollup_config.rs @@ -1 +1,30 @@ -pub(crate) trait RollupConfig {} +use std::collections::HashMap; + +use bytes::Bytes; + +use super::rollup_interval::RollupInterval; + +pub(crate) trait RollupConfig { + fn get_aggregation_ids(&self) -> HashMap; + + fn get_aggregation_for_id(&self, id: i32) -> String; + + fn get_id_for_aggregator(&self, aggregator: String) -> i32; + + fn get_intervals(&self) -> Vec; + + fn get_possible_intervals(&self, interval: String) -> Vec; + + fn get_rollup_intervals( + &self, + interval: u64, + str_interval: String, + skip_default: bool, + ) -> Vec>; + + fn get_default_intervals(&self) -> Box; + + fn get_id_for_aggregator_with_qualifier(&self, qualifier: Vec) -> i32; + + fn get_offset_start_from_qualifier(&self, qualifier: Vec) -> i32; +} diff --git a/src/common/rollup/rollup_interval.rs b/src/common/rollup/rollup_interval.rs new file mode 100644 index 0000000..80d78e8 --- /dev/null +++ b/src/common/rollup/rollup_interval.rs @@ -0,0 +1,31 @@ +use bytes::Bytes; + +pub(crate) trait RollupInterval { + fn get_table(&self) -> String; + + fn get_temporal_table(&self) -> Vec; + + fn get_pre_aggregation_table(&self) -> String; + + fn get_group_by_table(&self) -> Vec; + + fn get_interval(&self) -> String; + + fn get_units(&self) -> String; + + fn get_unit_multiplier(&self) -> i32; + + fn get_interval_units(&self) -> String; + + fn get_interval_seconds(&self) -> i32; + + fn get_interval_count(&self) -> i32; + + fn is_default_interval(&self) -> bool; + + fn get_row_span(&self) -> String; + + fn get_rollup_config(&self) -> Box; + + fn set_rollup_config(&self, config: Box); +} diff --git a/src/common/stats/span.rs b/src/common/stats/span.rs index 280bd0d..753b039 100644 --- a/src/common/stats/span.rs +++ b/src/common/stats/span.rs @@ -1,6 +1,6 @@ #[async_trait::async_trait] pub(crate) trait Span { - //type Object; + // type Object; async fn finish(&self); diff --git a/src/common/storage/mod.rs b/src/common/storage/mod.rs new file mode 100644 index 0000000..0c657c3 --- /dev/null +++ b/src/common/storage/mod.rs @@ -0,0 +1,3 @@ +pub mod time_series_data_consumer; +pub mod write_callback; +pub mod write_status; diff --git a/src/common/storage/time_series_data_consumer.rs b/src/common/storage/time_series_data_consumer.rs new file mode 100644 index 0000000..26137e8 --- /dev/null +++ b/src/common/storage/time_series_data_consumer.rs @@ -0,0 +1,26 @@ +use super::write_callback::WriteCallback; +use crate::common::data::{ + low_level_time_series_data::LowLevelTimeSeriesData, time_series_datum::TimeSeriesDatum, + time_series_shared_tags_and_time_data::TimeSeriesShardTagsAndTimeData, +}; + +#[async_trait::async_trait] +pub(crate) trait TimeSeriesDataConsumer { + async fn write( + &self, + datum: Box, + callback: Box, + ); + + async fn write_with_shard_tags( + &self, + data: Box, + callback: Box, + ); + + async fn write_with_low_level( + &self, + data: Box, + callback: Box, + ); +} diff --git a/src/common/storage/write_callback.rs b/src/common/storage/write_callback.rs new file mode 100644 index 0000000..92c3f30 --- /dev/null +++ b/src/common/storage/write_callback.rs @@ -0,0 +1,12 @@ +use super::write_status::WriteStatus; + +#[async_trait::async_trait] +pub(crate) trait WriteCallback { + async fn success(&self); + + async fn partial_success(&self, status: Vec>, length: i32); + + async fn retry_all(&self); + + async fn fail_all(&self, status: Box); +} diff --git a/src/common/storage/write_status.rs b/src/common/storage/write_status.rs new file mode 100644 index 0000000..ec5c295 --- /dev/null +++ b/src/common/storage/write_status.rs @@ -0,0 +1,31 @@ +pub(crate) enum WriteState { + OK, + RETRY, + REJECTED, + ERROR, +} + +#[async_trait::async_trait] +pub(crate) trait WriteStatus { + async fn state(&self) -> WriteState; + + async fn message(&self) -> String; + + fn ok(&self) -> WriteState { + WriteState::OK + } + + fn retry(&self) -> WriteState { + WriteState::RETRY + } + + fn reject(&self) -> WriteState { + WriteState::REJECTED + } + + // fn retry_with_msg(&self, msg: String) -> WriteState { + // if msg.is_empty() { + // return WriteState::RETRY; + // } + // } +} diff --git a/src/core/coredb/default_registry.rs b/src/core/coredb/default_registry.rs index c50968a..557dddd 100644 --- a/src/core/coredb/default_registry.rs +++ b/src/core/coredb/default_registry.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use super::plugin_config::PluginsConfig; use crate::{ common::{ core::{ @@ -11,9 +12,7 @@ use crate::{ pool::{executor::ExecutorService, object_pool::ObjectPool}, query::{ interpolation::query_interpolator_factory::QueryInterpolatorFactory, - query_iter_factory::QueryIteratorFactory, - query_node_config::{Builder, QueryNodeConfig}, - query_node_factory::QueryNodeFactory, + query_iter_factory::QueryIteratorFactory, query_node_factory::QueryNodeFactory, serdes::time_series_serdes::TimeSeriesSerdes, }, }, @@ -26,8 +25,6 @@ use crate::{ }, }; -use super::plugin_config::PluginsConfig; - pub const PLUGIN_CONFIG_KEY: &str = "tsd.plugin.config"; pub const V2_LOAD_FILTERS_KEY: &str = "tsd.plugin.v2.load_filters"; pub const DEFAULT_CLUSTERS_KEY: &str = "tsd.query.default_clusters"; @@ -35,46 +32,26 @@ pub const DEFAULT_GRAPHS_KEY: &str = "tsd.query.default_execution_graphs"; #[allow(dead_code)] #[derive(Clone)] -pub(crate) struct DefaultRegistry -where - Box>: Clone, - Box>: Clone, -{ +pub(crate) struct DefaultRegistry { tsdb: Box, type_map: HashMap>, default_type_name_map: HashMap, String>, factories: HashMap>, clusters: HashMap, - serdes: HashMap>>, - node_factories: HashMap>>, + serdes: HashMap>, + node_factories: HashMap>, shared_objects: HashMap, pools: HashMap>, cleanup_pool: ExecutorService, plugins: PluginsConfig, } -unsafe impl Send for DefaultRegistry -where - Box>: Clone, - Box>: Clone, -{ -} +unsafe impl Send for DefaultRegistry {} -unsafe impl Sync for DefaultRegistry -where - Box>: Clone, - Box>: Clone, -{ -} +unsafe impl Sync for DefaultRegistry {} #[async_trait::async_trait] -impl Registry for DefaultRegistry -where - Box>: Clone, - Box>: Clone, - B: Clone, - C: Clone, -{ +impl Registry for DefaultRegistry { async fn initialize(&self, _load_plugins: bool) { todo!() } @@ -83,7 +60,7 @@ where todo!() } - async fn register_plugin(&self, _id: String, _plugin: Box) { + async fn register_plugin(&self, _id: String, _plugin: Box) { todo!() } @@ -153,14 +130,8 @@ where } } -impl RegistryGetQueryOpt for DefaultRegistry -where - B: Builder, - C: QueryNodeConfig, - Box>: Clone, - Box>: Clone, -{ - fn get_query_iter_factory(&self, _id: String) -> Box> { +impl RegistryGetQueryOpt for DefaultRegistry { + fn get_query_iter_factory(&self, _id: String) -> Box { todo!() } @@ -171,7 +142,7 @@ where todo!() } - fn get_query_node_factory(&self, _id: String) -> Box> { + fn get_query_node_factory(&self, _id: String) -> Box { todo!() } } diff --git a/src/core/coredb/default_tsdb.rs b/src/core/coredb/default_tsdb.rs index fdbe6c6..95e8f29 100644 --- a/src/core/coredb/default_tsdb.rs +++ b/src/core/coredb/default_tsdb.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use anyhow::Result; use hashed_wheel_timer::WheelTimer; use crate::common::{ - configuration::Configuration, + configuration::configuration::Configuration, core::{registry::Registry, tsdb::TSDB}, pool::executor::ExecutorService, query::query_context::QueryContext, @@ -85,11 +86,11 @@ impl TSDB for DefaultTSDB { #[allow(dead_code)] impl DefaultTSDB { - fn new_default_tsdb(&self, _config: Configuration) -> Self { + pub fn new(_config: Configuration) -> Self { todo!() } - async fn initialize_registry(&self, _load_plugin: bool) { + pub async fn initialize_registry(&self, _load_plugin: bool) -> Result<()> { todo!() } } diff --git a/src/core/meta/batch_meta_query.rs b/src/core/meta/batch_meta_query.rs new file mode 100644 index 0000000..4c7b167 --- /dev/null +++ b/src/core/meta/batch_meta_query.rs @@ -0,0 +1,101 @@ +use super::meta_query::MetaQuery; +use crate::common::data::timestamp::TimeStamp; + +pub(crate) enum QueryType { + NAMESPACES, + METRICS, + TAGKEYS, + TAGVALUES, + TAGKEYSANDVALUES, + TIMESERIES, + BASIC, +} + +pub(crate) enum QueryOrder { + ASCENDING, + DESCENDING, +} + +pub(crate) trait BatchMetaQuery { + fn get_from(&self) -> i32; + + fn get_to(&self) -> i32; + + fn get_aggregation_field(&self) -> String; + + fn get_aggregation_size(&self) -> i32; + + fn get_query_type(&self) -> QueryType; + + fn get_query_order(&self) -> QueryOrder; + + fn get_start(&self) -> Box; + + fn get_end(&self) -> Box; + + fn get_queries(&self) -> Vec>; + + fn meta_query(&self) -> Vec>; + + fn source(&self) -> String; +} + +pub(crate) struct BatchMetaQueryBuilder { + from: i32, + to: i32, + aggregation_field: String, + agg_size: i32, + query_type: QueryType, + source: String, + query_order: QueryOrder, + start: String, + end: String, + time_zone: String, + meta_query: Vec>, +} + +impl BatchMetaQueryBuilder { + pub fn set_from(&mut self, from: i32) { + self.from = from + } + + pub fn set_aggregation_field(&mut self, agg_field: String) { + self.aggregation_field = agg_field + } + + pub fn set_aggregation_size(&mut self, size: i32) { + self.agg_size = size + } + + pub fn set_query_type(&mut self, query_type: QueryType) { + self.query_type = query_type + } + + pub fn set_source(&mut self, source: String) { + self.source = source + } + + pub fn set_query_order(&mut self, order: QueryOrder) { + self.query_order = order + } + + pub fn set_start(&mut self, start: String) { + self.start = start + } + + pub fn set_end(&mut self, end: String) { + self.end = end + } + + pub fn set_time_zone(&mut self, time_zone: String) { + self.time_zone = time_zone + } + + pub fn set_meta_query(&mut self, meta_query: Vec>) { + self.meta_query = meta_query + } + + pub fn build(&self) { + todo!() + } +} diff --git a/src/core/meta/meta_data_storage_schema.rs b/src/core/meta/meta_data_storage_schema.rs new file mode 100644 index 0000000..575866e --- /dev/null +++ b/src/core/meta/meta_data_storage_schema.rs @@ -0,0 +1,33 @@ +use super::{ + batch_meta_query::{BatchMetaQuery, QueryType}, + meta_query::MetaQuery, +}; +use crate::common::{ + core::tsdb::TSDB, + data::time_series_data_source_config::TimeSeriesDataSourceConfig, + query::{ + query_pipeline_context::QueryPipelineContext, + util::{JsonNode, ObjectMapper}, + }, + stats::span::Span, +}; + +#[async_trait::async_trait] +pub(crate) trait MetaDataStorageSchema { + async fn run_query(&self, query: Box, span: Box); + + async fn run_query_with_context( + &self, + context: Box, + config: Box, + span: Box, + ); + + async fn parse( + &self, + tsdb: Box, + mapper: ObjectMapper, + jsonnode: JsonNode, + query_type: QueryType, + ) -> Box; +} diff --git a/src/core/meta/meta_query.rs b/src/core/meta/meta_query.rs new file mode 100644 index 0000000..548956e --- /dev/null +++ b/src/core/meta/meta_query.rs @@ -0,0 +1,33 @@ +use crate::common::query::filter::query_filter::QueryFilter; + +pub(crate) trait MetaQuery { + fn get_namespace(&self) -> String; + + fn get_filter(&self) -> Box; + + fn get_id(&self) -> String; +} + +pub(crate) struct MetaQueryBuilder { + namespace: String, + filter: Box, + id: String, +} + +impl MetaQueryBuilder { + pub fn set_namespace(&mut self, namespace: String) { + self.namespace = namespace + } + + pub fn set_filter(&mut self, filter: Box) { + self.filter = filter + } + + pub fn set_id(&mut self, id: String) { + self.id = id + } + + pub fn build(&self) { + todo!() + } +} diff --git a/src/core/meta/mod.rs b/src/core/meta/mod.rs new file mode 100644 index 0000000..78d9a56 --- /dev/null +++ b/src/core/meta/mod.rs @@ -0,0 +1,3 @@ +pub mod batch_meta_query; +pub mod meta_data_storage_schema; +pub mod meta_query; diff --git a/src/core/mod.rs b/src/core/mod.rs index 783faab..920ad43 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,3 +1,7 @@ pub mod coredb; +pub mod meta; pub mod pool; pub mod query; +pub mod rollup_config; +pub mod storage; +pub mod uid; diff --git a/src/core/query/abstract_query_node.rs b/src/core/query/abstract_query_node.rs new file mode 100644 index 0000000..ee72d51 --- /dev/null +++ b/src/core/query/abstract_query_node.rs @@ -0,0 +1,97 @@ +use crate::common::{ + data::{partial_time_series::PartialTimeSeries, time_series_data_source::TimeSeriesDataSource}, + query::{ + query_node::QueryNode, + query_node_config::{Builder, QueryNodeConfig}, + query_node_factory::QueryNodeFactory, + query_pipeline_context::QueryPipelineContext, + query_result::QueryResult, + }, + stats::span::Span, +}; + +pub(crate) struct AbstractQueryNode { + factory: Box, + context: Box, + upstream: Vec>, + downstream: Vec>, + downstream_sources: Vec>, +} + +unsafe impl Send for AbstractQueryNode {} + +unsafe impl Sync for AbstractQueryNode {} + +#[allow(unused_variables)] +#[allow(dead_code)] +impl AbstractQueryNode { + pub fn new(factory: Box, context: Box) -> Self { + todo!() + } + + pub fn fetch_down_stream(&self, span: Box) { + todo!() + } + + pub fn send_up_stream_with_ressult(&self, result: Box) { + todo!() + } + + pub fn send_up_stream_with_partial(&self, series: Box) { + todo!() + } + + // TODO: This is used to caught log or warning + pub fn send_up_stream(&self) { + todo!() + } + + pub fn complete_up_stream(&self, final_seq: u64, total_seq: u64) { + todo!() + } +} + +#[allow(unused_variables)] +#[async_trait::async_trait] +impl QueryNode for AbstractQueryNode { + async fn initialize(&self, span: Box) { + todo!() + } + + async fn factory(&self) -> Box { + todo!() + } + + async fn pipeline_context(&self) -> Box { + todo!() + } + + fn config(&self) -> Box { + todo!() + } + + async fn close(&self) { + todo!() + } + + async fn on_complete( + &self, + down_stream: Box, + final_seq: u64, + total_seq: u64, + ) { + todo!() + } + + async fn on_next(&self, next: Box) { + todo!() + } + + async fn on_next_by_partial(&self, next: Box) { + todo!() + } + + async fn on_error(&self) { + todo!() + } +} diff --git a/src/core/query/abstract_query_pipeline_context.rs b/src/core/query/abstract_query_pipeline_context.rs new file mode 100644 index 0000000..c5201b6 --- /dev/null +++ b/src/core/query/abstract_query_pipeline_context.rs @@ -0,0 +1,185 @@ +use std::{ + collections::HashMap, + sync::atomic::{AtomicBool, AtomicI32}, +}; + +use super::plan::default_query_planner::DefaultQueryPlanner; +use crate::common::{ + data::{partial_time_series::PartialTimeSeries, time_series_id::TimeSeriesID}, + query::{ + query_context::QueryContext, + query_node::QueryNode, + query_node_config::{Builder, QueryNodeConfig}, + query_node_factory::QueryNodeFactory, + query_pipeline_context::QueryPipelineContext, + query_result::QueryResult, + query_result_id::QueryResultID, + query_sink::QuerySink, + query_sink_callback::QuerySinkCallback, + }, + stats::span::Span, +}; + +struct PartialTimeSetWrapper { + count: AtomicI32, + max: AtomicI32, +} + +pub(crate) struct AbstractQueryPipelineContext { + sinks: Vec>, + count_downs: HashMap, AtomicI32>, + context: Box, + plan: DefaultQueryPlanner, + complete: AtomicBool, + source_idx: i32, + pts: HashMap>, + finished_sources: HashMap, + total_finished: AtomicI32, + ids: HashMap, HashMap>>, +} + +unsafe impl Send for AbstractQueryPipelineContext {} + +unsafe impl Sync for AbstractQueryPipelineContext {} + +#[async_trait::async_trait] +impl QueryNode for AbstractQueryPipelineContext { + async fn initialize(&self, span: Box) { + todo!() + } + + async fn factory(&self) -> Box { + todo!() + } + + async fn pipeline_context(&self) -> Box { + todo!() + } + + fn config(&self) -> Box { + todo!() + } + + async fn close(&self) { + todo!() + } + + async fn on_complete( + &self, + down_stream: Box, + final_seq: u64, + total_seq: u64, + ) { + todo!() + } + + async fn on_next(&self, next: Box) { + todo!() + } + + async fn on_next_by_partial(&self, next: Box) { + todo!() + } + + async fn on_error(&self) { + todo!() + } +} + +impl QueryPipelineContext for AbstractQueryPipelineContext { + fn factory(&self) -> Box { + todo!() + } + + fn tsdb(&self) -> Box { + todo!() + } + + fn query(&self) -> Box { + todo!() + } + + fn query_context(&self) -> Box { + todo!() + } + + fn initialize(&self, span: Box) { + todo!() + } + + fn fetch_next(&self, span: Box) { + todo!() + } + + fn upstream(&self, node: Box) -> Vec> { + todo!() + } + + fn downstream(&self, node: Box) -> Vec> { + todo!() + } + + fn downstream_sources( + &self, + node: Box, + ) -> Vec> { + todo!() + } + + fn common_source_config( + &self, + node: Box, + ) -> Box + { + todo!() + } + + fn downstream_sources_ids(&self, node: Box) -> Vec { + todo!() + } + + fn downstream_sources_result_ids( + &self, + node: Box, + ) -> Vec> { + todo!() + } + + fn upstream_of_type(&self, node: Box) -> Vec> { + todo!() + } + + fn downstream_of_type(&self, node: Box) -> Vec> { + todo!() + } + + fn sinks(&self) -> Vec> { + todo!() + } + + fn add_id(&self, hash: u64, id: Box) { + todo!() + } + + fn get_id(&self, hash: u64) -> Box { + todo!() + } + + fn has_id(&self, hash: u64) -> bool { + todo!() + } + + fn close(&self) { + todo!() + } +} + +impl QuerySinkCallback for AbstractQueryPipelineContext { + fn on_complete(&self, pts: Box) { + todo!() + } + + fn on_error(&self, pts: Box) { + todo!() + } +} diff --git a/src/core/query/bad_query_result.rs b/src/core/query/bad_query_result.rs new file mode 100644 index 0000000..49054f8 --- /dev/null +++ b/src/core/query/bad_query_result.rs @@ -0,0 +1,68 @@ +use crate::common::{ + data::{time_series::TimeSeries, time_series_id::TimeSeriesID}, + query::{ + query_node::QueryNode, + query_node_config::{Builder, QueryNodeConfig}, + query_result::QueryResult, + query_result_id::QueryResultID, + }, + rollup::rollup_config::RollupConfig, +}; + +pub(crate) struct BadQueryResult { + node: Box, + data_source: Box, + error: String, +} + +impl QueryResult for BadQueryResult { + fn time_specification( + &self, + ) -> Box { + todo!() + } + + fn time_series(&self) -> Vec> { + todo!() + } + + fn error(&self) -> String { + todo!() + } + + fn exception(&self) { + todo!() + } + + fn sequence_id(&self) -> u64 { + todo!() + } + + fn data_source(&self) -> Box { + todo!() + } + + fn id_type(&self) -> Box { + todo!() + } + + fn resolution(&self) -> u64 { + todo!() + } + + fn rollup_config(&self) -> Box { + todo!() + } + + fn close(&self) { + todo!() + } + + fn process_in_parallel(&self) -> bool { + todo!() + } + + fn source(&self) -> Box { + todo!() + } +} diff --git a/src/core/query/base_query_context.rs b/src/core/query/base_query_context.rs new file mode 100644 index 0000000..76ca0a2 --- /dev/null +++ b/src/core/query/base_query_context.rs @@ -0,0 +1,145 @@ +use std::{collections::HashMap, sync::atomic::AtomicBool}; + +use super::semantic_query::SemanticQuery; +use crate::common::{ + core::tsdb::TSDB, + data::time_series_id::TimeSeriesID, + query::{ + query_context::QueryContext, + query_context_builder::QueryContextBuilder, + query_mode::QueryMode, + query_node_config::{Builder, QueryNodeConfig}, + query_pipeline_context::QueryPipelineContext, + query_sink::QuerySink, + query_sink_config::QuerySinkConfig, + time_series_query::TimeSeriesQuery, + }, + stats::{query_stats::QueryStats, span::Span}, +}; + +pub(crate) struct BaseQueryContext { + tsdb: Box, + query: SemanticQuery, + stats: Box, + sink_config: Vec>, + pipeline: Box, + // auth_state: AuthState, + headers: HashMap, + logs: Vec, + builder_sinks: Vec>, + local_span: Box, + is_closed: AtomicBool, + cacheable: AtomicBool, +} + +impl Clone for BaseQueryContext { + fn clone(&self) -> Self { + todo!() + } +} + +impl QueryContext for BaseQueryContext { + fn sinks(&self) -> Vec> { + todo!() + } + + fn mode(&self) -> QueryMode { + todo!() + } + + fn fetch_next(&self, span: Box) { + todo!() + } + + fn close(&self) { + todo!() + } + + fn is_closed(&self) -> bool { + todo!() + } + + fn stats(&self) -> Box { + todo!() + } + + fn sink_configs(&self) -> Vec> { + todo!() + } + + fn query(&self) -> Box { + todo!() + } + + fn tsdb(&self) -> Box { + todo!() + } + + fn headers(&self) -> HashMap { + todo!() + } + + fn cacheable(&self) -> bool { + todo!() + } + + fn initialize(&self, span: Box) { + todo!() + } + + fn get_id(&self, hash: u64) -> Box { + todo!() + } +} + +pub(crate) struct LocalPipeline {} + +impl LocalPipeline { + pub fn new(context: Box, direct_sinks: Vec>) -> Self { + todo!() + } +} + +pub(crate) struct BaseBuilder { + tsdb: Box, + query: SemanticQuery, + stats: Box, + sink_configs: Vec>, + sinks: Vec>, + // auth_state: AuthState, + headers: HashMap, +} + +impl QueryContextBuilder for BaseBuilder { + fn set_query(&mut self, query: Box) { + todo!() + } + + fn set_mode(&mut self, mode: QueryMode) { + todo!() + } + + fn set_stats(&mut self, stats: Box) { + todo!() + } + + fn set_sinks(&mut self, configs: Vec>) { + todo!() + } + + fn add_sink(&mut self, sink: Box) { + todo!() + } + + fn set_local_sinks(&mut self, sinks: Vec>) { + todo!() + } + + fn set_header(&mut self, headers: HashMap) { + todo!() + } + + fn build(&self) -> Box { + todo!() + } +} diff --git a/src/core/query/base_query_node_config.rs b/src/core/query/base_query_node_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/base_query_node_config_with_interpolators.rs b/src/core/query/base_query_node_config_with_interpolators.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/base_timeseries_data_source_config.rs b/src/core/query/base_timeseries_data_source_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/base_wrapped_query_result.rs b/src/core/query/base_wrapped_query_result.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/converted_query_result.rs b/src/core/query/converted_query_result.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/default_query_context_filter.rs b/src/core/query/default_query_context_filter.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/default_time_series_data_source_config.rs b/src/core/query/default_time_series_data_source_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/mod.rs b/src/core/query/mod.rs index 60afac5..b805c88 100644 --- a/src/core/query/mod.rs +++ b/src/core/query/mod.rs @@ -1,3 +1,23 @@ +pub mod abstract_query_node; +pub mod abstract_query_pipeline_context; +pub mod bad_query_result; +pub mod base_query_context; +pub mod base_query_node_config; +pub mod base_query_node_config_with_interpolators; +pub mod base_timeseries_data_source_config; +pub mod base_wrapped_query_result; +pub mod converted_query_result; +pub mod default_query_context_filter; +pub mod default_time_series_data_source_config; pub mod execution; pub mod hacluster; +pub mod plan; pub mod pojo; +pub mod preagg_config; +pub mod read_cache_query_pipeline_context; +pub mod semantic_query; +pub mod semantic_query_context; +pub mod slice_config; +pub mod ts_query; +pub mod ts_sub_query; +pub mod wrapped_time_series_data_source_config; diff --git a/src/core/query/plan/context_node_config.rs b/src/core/query/plan/context_node_config.rs new file mode 100644 index 0000000..83833ed --- /dev/null +++ b/src/core/query/plan/context_node_config.rs @@ -0,0 +1,86 @@ +use std::collections::HashMap; + +use crate::common::{ + configuration::configuration::Configuration, + query::{ + query_node_config::{Builder, QueryNodeConfig}, + query_node_config_options::QueryNodeConfigOptions, + query_result_id::QueryResultID, + }, +}; + +pub(crate) struct ContextNodeConfig {} + +impl QueryNodeConfig for ContextNodeConfig { + fn get_id(&self) -> String { + todo!() + } + + fn get_type(&self) -> String { + todo!() + } + + fn get_sources(&self) -> Vec { + todo!() + } + + fn is_push_down(&self) -> bool { + todo!() + } + + fn is_joins(&self) -> bool { + todo!() + } + + fn is_node_flag(&self, option: QueryNodeConfigOptions) -> bool { + todo!() + } + + fn is_read_cache(&self) -> bool { + todo!() + } + + fn get_overrides(&self) -> HashMap { + todo!() + } + + fn get_string(&self, config: Configuration, key: String) -> String { + todo!() + } + + fn get_int(&self, config: Configuration, key: String) -> i32 { + todo!() + } + + fn get_long(&self, config: Configuration, key: String) -> u64 { + todo!() + } + + fn get_bool(&self, config: Configuration, key: String) -> bool { + todo!() + } + + fn get_double(&self, config: Configuration, key: String) -> u32 { + todo!() + } + + fn has_key(&self, key: String) -> String { + todo!() + } + + fn to_builder(&self) -> Box { + todo!() + } + + fn result_ids(&self) -> Vec> { + todo!() + } + + fn is_marked_cacheable(&self) -> bool { + todo!() + } + + fn mark_cacheable(&self) { + todo!() + } +} diff --git a/src/core/query/plan/default_query_planner.rs b/src/core/query/plan/default_query_planner.rs new file mode 100644 index 0000000..5d26f4b --- /dev/null +++ b/src/core/query/plan/default_query_planner.rs @@ -0,0 +1,124 @@ +use std::collections::{HashMap, HashSet}; + +use petgraph::Graph; + +use super::context_node_config::ContextNodeConfig; +use crate::common::{ + data::{ + time_series_data_source::TimeSeriesDataSource, + time_series_data_source_config::TimeSeriesDataSourceConfig, + }, + query::{ + plan::query_plan::{QueryPlanner, TimeAdjustments}, + query_node::QueryNode, + query_node_config::{Builder, QueryNodeConfig}, + query_node_factory::QueryNodeFactory, + query_pipeline_context::QueryPipelineContext, + query_result_id::QueryResultID, + }, + stats::span::Span, +}; + +pub(crate) struct DefaultQueryPlanner { + context: Box, + context_sink: Box, + context_sink_config: ContextNodeConfig, + sink_filter: HashMap, + roots: Vec>, + graph: Graph, Box>, + data_sources: Vec>, + source_node: HashSet>, + config_graph: Graph, Box>, + nodes_map: HashMap>, + factory_cache: HashMap>, + context_node: Box, + serialization_sources: Vec>, + modified_during_setup: bool, + satisfied_filter: HashSet, +} + +unsafe impl Send for DefaultQueryPlanner {} + +unsafe impl Sync for DefaultQueryPlanner {} + +#[async_trait::async_trait] +impl QueryPlanner for DefaultQueryPlanner { + async fn plan(&self, span: Box) { + todo!() + } + + async fn replace( + &self, + old_config: Box, + new_config: Box, + ) { + todo!() + } + + async fn add_edge( + &self, + from: Box, + to: Box, + ) -> bool { + todo!() + } + + async fn remove_edge( + &self, + from: Box, + to: Box, + ) -> bool { + todo!() + } + + async fn remove_node(&self, config: Box) -> bool { + todo!() + } + + fn graph(&self) -> Graph, Box> { + todo!() + } + + fn config_graph(&self) -> Graph, Box> { + todo!() + } + + fn context(&self) -> Box { + todo!() + } + + async fn terminal_source_node( + &self, + config: Box, + ) -> Vec> { + todo!() + } + + async fn base_setup_graph( + &self, + context: Box, + config: Box, + ) { + todo!() + } + + fn node_for_id(&self, id: String) -> Box { + todo!() + } + + fn get_factory(&self, node: Box) -> Box { + todo!() + } + + fn get_metric_for_data_source( + &self, + node: Box, + data_source_id: String, + ) -> String { + todo!() + } + + fn get_adjustments(&self, config: Box) -> TimeAdjustments { + todo!() + } +} diff --git a/src/core/query/plan/mod.rs b/src/core/query/plan/mod.rs new file mode 100644 index 0000000..aa07d71 --- /dev/null +++ b/src/core/query/plan/mod.rs @@ -0,0 +1,2 @@ +pub mod context_node_config; +pub mod default_query_planner; diff --git a/src/core/query/preagg_config.rs b/src/core/query/preagg_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/read_cache_query_pipeline_context.rs b/src/core/query/read_cache_query_pipeline_context.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/semantic_query.rs b/src/core/query/semantic_query.rs new file mode 100644 index 0000000..ac8f5ee --- /dev/null +++ b/src/core/query/semantic_query.rs @@ -0,0 +1,88 @@ +use std::collections::HashMap; + +use crate::common::{ + data::timestamp::TimeStamp, + query::{ + filter::{named_filter::NamedFilter, query_filter::QueryFilter}, + query_mode::QueryMode, + query_node_config::QueryNodeConfig, + serdes::serdes_options::SerdesOptions, + time_series_query::{CacheMode, LogLevel, TimeSeriesQuery}, + }, +}; + +pub(crate) struct SemanticQuery { + start: String, + start_ts: Box, + end: String, + end_ts: Box, + time_zone: String, + execution_graph: Vec>, + filters: HashMap>, + query_mode: QueryMode, + cache_mode: CacheMode, + log_level: LogLevel, + cached_hash: u64, +} + +impl TimeSeriesQuery for SemanticQuery { + fn get_start(&self) -> String { + todo!() + } + + fn get_end(&self) -> String { + todo!() + } + + fn get_timezone(&self) -> String { + todo!() + } + + fn get_mode(&self) -> crate::common::query::query_mode::QueryMode { + todo!() + } + + fn get_cache_mode(&self) -> CacheMode { + todo!() + } + + fn get_filters(&self) -> Vec> { + todo!() + } + + fn get_filter(&self, filter_id: String) -> Box { + todo!() + } + + fn start_time(&self) -> Box { + todo!() + } + + fn end_time(&self) -> Box { + todo!() + } + + fn get_serdes_configs(&self) -> Vec> { + todo!() + } + + fn get_log_level(&self) -> LogLevel { + todo!() + } + + fn is_trace_enable(&self) -> bool { + todo!() + } + + fn is_debug_enable(&self) -> bool { + todo!() + } + + fn is_warn_enable(&self) -> bool { + todo!() + } + + fn get_execution_graph(&self) -> Vec> { + todo!() + } +} diff --git a/src/core/query/semantic_query_context.rs b/src/core/query/semantic_query_context.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/slice_config.rs b/src/core/query/slice_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/ts_query.rs b/src/core/query/ts_query.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/ts_sub_query.rs b/src/core/query/ts_sub_query.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/query/wrapped_time_series_data_source_config.rs b/src/core/query/wrapped_time_series_data_source_config.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/core/rollup_config/default_rollup_config.rs b/src/core/rollup_config/default_rollup_config.rs new file mode 100644 index 0000000..3c15f8a --- /dev/null +++ b/src/core/rollup_config/default_rollup_config.rs @@ -0,0 +1,48 @@ +use crate::common::rollup::rollup_config::RollupConfig; + +pub(crate) struct DefaultRollupConfig {} + +impl RollupConfig for DefaultRollupConfig { + fn get_aggregation_ids(&self) -> std::collections::HashMap { + todo!() + } + + fn get_aggregation_for_id(&self, id: i32) -> String { + todo!() + } + + fn get_id_for_aggregator(&self, aggregator: String) -> i32 { + todo!() + } + + fn get_intervals(&self) -> Vec { + todo!() + } + + fn get_possible_intervals(&self, interval: String) -> Vec { + todo!() + } + + fn get_rollup_intervals( + &self, + interval: u64, + str_interval: String, + skip_default: bool, + ) -> Vec> { + todo!() + } + + fn get_default_intervals( + &self, + ) -> Box { + todo!() + } + + fn get_id_for_aggregator_with_qualifier(&self, qualifier: Vec) -> i32 { + todo!() + } + + fn get_offset_start_from_qualifier(&self, qualifier: Vec) -> i32 { + todo!() + } +} diff --git a/src/core/rollup_config/mod.rs b/src/core/rollup_config/mod.rs new file mode 100644 index 0000000..db0fd90 --- /dev/null +++ b/src/core/rollup_config/mod.rs @@ -0,0 +1 @@ +pub mod default_rollup_config; diff --git a/src/core/storage/mod.rs b/src/core/storage/mod.rs new file mode 100644 index 0000000..4207be3 --- /dev/null +++ b/src/core/storage/mod.rs @@ -0,0 +1 @@ +pub mod schemas; diff --git a/src/core/storage/schemas/codec.rs b/src/core/storage/schemas/codec.rs new file mode 100644 index 0000000..52f2700 --- /dev/null +++ b/src/core/storage/schemas/codec.rs @@ -0,0 +1,36 @@ +use bytes::Bytes; + +use super::{rowseq::RowSeq, seqspan::SeqSpan}; +use crate::common::{ + data::{time_series_datatype::TimeSeriesDataType, time_series_value::TimeSeriesValue}, + rollup::rollup_interval::RollupInterval, + storage::write_status::WriteStatus, +}; + +pub(crate) trait Codec { + fn get_type(&self) -> Box; + + fn new_sequence(&self, reversed: bool) -> Box>>; + + fn new_rowseq(&self, base_time: u64) -> Box; + + fn encode( + &self, + value: Box, + append_format: bool, + base_time: i32, + rollup_interval: Box, + ) -> Box; + + fn reset(&self); + + fn encoded_values(&self) -> i32; + + fn qualifiers(&self) -> Vec>; + + fn values(&self) -> Vec>; + + fn qualifier_lengths(&self) -> Vec; + + fn value_lengths(&self) -> Vec; +} diff --git a/src/core/storage/schemas/datum_validator.rs b/src/core/storage/schemas/datum_validator.rs new file mode 100644 index 0000000..f2fb9ae --- /dev/null +++ b/src/core/storage/schemas/datum_validator.rs @@ -0,0 +1,5 @@ +use crate::common::data::time_series_datum_id::TimeSeriesDatumID; + +pub(crate) trait DatumIDValidator { + fn validate(&self, id: Box) -> String; +} diff --git a/src/core/storage/schemas/mod.rs b/src/core/storage/schemas/mod.rs new file mode 100644 index 0000000..c409101 --- /dev/null +++ b/src/core/storage/schemas/mod.rs @@ -0,0 +1,9 @@ +pub mod codec; +pub mod datum_validator; +pub mod rowseq; +pub mod schema; +pub mod schema_factory; +pub mod seqspan; +pub mod source_node; +pub mod tsdb_data_store; +pub mod tsdb_query_node; diff --git a/src/core/storage/schemas/rowseq.rs b/src/core/storage/schemas/rowseq.rs new file mode 100644 index 0000000..212af22 --- /dev/null +++ b/src/core/storage/schemas/rowseq.rs @@ -0,0 +1,15 @@ +use bytes::Bytes; + +use crate::common::{core::tsdb::TSDB, data::time_series_datatype::TimeSeriesDataType}; + +pub(crate) trait RowSeq { + fn get_type(&self) -> Box; + + fn add_column(&self, prefix: Bytes, qualifier: Vec, value: Vec); + + fn dedupe(&self, tsdb: Vec>, keep_earliest: bool, reverse: bool) -> u64; + + fn size(&self) -> i32; + + fn data_point(&self) -> i32; +} diff --git a/src/core/storage/schemas/schema.rs b/src/core/storage/schemas/schema.rs new file mode 100644 index 0000000..4a4c067 --- /dev/null +++ b/src/core/storage/schemas/schema.rs @@ -0,0 +1,321 @@ +use std::{any::Any, collections::HashMap}; + +use super::{ + codec::Codec, datum_validator::DatumIDValidator, schema_factory::SchemaFactory, + tsdb_data_store::TSDBDataStore, +}; +use crate::{ + common::{ + core::tsdb::TSDB, + data::{ + low_level_time_series_data::LowLevelTimeSeriesData, + partial_time_series::PartialTimeSeries, partial_time_series_set::PartialTimeSeriesSet, + time_series_byte_id::TimeSeriesByteID, time_series_datatype::TimeSeriesDataType, + time_series_datum::TimeSeriesDatum, time_series_id::TimeSeriesID, + time_series_shared_tags_and_time_data::TimeSeriesShardTagsAndTimeData, + time_series_string_id::TimeSeriesStringID, timestamp::TimeStamp, + }, + pool::object_pool::ObjectPool, + rollup::rollup_interval::RollupInterval, + stats::span::Span, + storage::{ + time_series_data_consumer::TimeSeriesDataConsumer, write_callback::WriteCallback, + }, + }, + core::{ + meta::meta_data_storage_schema::MetaDataStorageSchema, + uid::{uniqueid::UniqueID, uniqueid_store::UniqueIDStore}, + }, +}; + +pub const APPENDS_PREFIX: i32 = 5; +pub const QUERY_BYTE_LIMIT_KEY: &str = "tsd.query.limits.bytes"; +pub const QUERY_BYTE_LIMIT_DEFAULT: u64 = 0; +pub const QUERY_DP_LIMIT_KEY: &str = "tsd.query.limits.data_points"; +pub const QUERY_DP_LIMIT_DEFAULT: u64 = 0; +pub const QUERY_REVERSE_KEY: &str = "tsd.query.time.descending"; +pub const QUERY_KEEP_FIRST_KEY: &str = "tsd.query.duplicates.keep_earliest"; +pub const TIMELESS_SALTING_KEY: &str = "tsd.storage.salt.timeless"; +pub const OLD_SALTING_KEY: &str = "tsd.storage.salt.old"; +pub const MAX_RAW_TIMESPAN: i16 = 3600; +pub const TIMESTAMP_BYTES: i16 = 4; +pub const METRIC_TYPE: &str = "metric"; +pub const TAGK_TYPE: &str = "tagk"; +pub const TAGV_TYPE: &str = "tagv"; + +pub(crate) struct Schema { + tsdb: Box, + id: String, + data_store: Box, + uid_store: Box, + metrics: Box, + tag_names: Box, + tag_values: Box, + metric_width: i32, + tagk_width: i32, + tagv_width: i32, + salt_buckets: i32, + salt_width: i32, + timeless_salting: bool, + old_salting: bool, + pool_cache: HashMap, Box>, + // TODO: Any represent TypeToken, maybe better idea? + codecs: HashMap, Box>, + encoders: HashMap, Box>, + meta_schema: Box, + id_validator: Box, + factory: SchemaFactory, +} + +unsafe impl Send for Schema {} +unsafe impl Sync for Schema {} + +#[async_trait::async_trait] +impl TimeSeriesDataConsumer for Schema { + async fn write( + &self, + datum: Box, + callback: Box, + ) { + todo!() + } + + async fn write_with_shard_tags( + &self, + data: Box, + callback: Box, + ) { + todo!() + } + + async fn write_with_low_level( + &self, + data: Box, + callback: Box, + ) { + todo!() + } +} + +impl Schema { + pub fn config_key(&self, suffix: &str) -> String { + "tsd.storage.".to_owned() + &self.id.to_string() + "." + &suffix + } + + pub fn set_config(&mut self) { + let key = self.config_key("uid.width.metric"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_int_value( + key.clone(), + 3, + false, + "The width, in bytes, of UIDs for metrics.".to_string(), + ) + } + self.metric_width = self.tsdb.get_config().get_int(key); + + let key = self.config_key("uid.width.tagk"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_int_value( + key.clone(), + 3, + false, + "The width, in bytes, of UIDs for tag keys.".to_string(), + ) + } + self.tagk_width = self.tsdb.get_config().get_int(key); + + let key = self.config_key("uid.width.tagv"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_int_value( + key.clone(), + 3, + false, + "The width, in bytes, of UIDs for tag values.".to_string(), + ) + } + self.tagv_width = self.tsdb.get_config().get_int(key); + + let key = self.config_key("uid.cache.type.metric"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_string_value( + key.clone(), + "LRU".to_string(), + false, + "The name of the UniqueId factory used for caching metric UIDs.".to_string(), + ) + } + + let key = self.config_key("uid.cache.type.tagk"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_string_value( + key.clone(), + "LRU".to_string(), + false, + "The name of the UniqueId factory used for caching tagk UIDs.".to_string(), + ) + } + + let key = self.config_key("uid.cache.type.tagv"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_string_value( + key.clone(), + "LRU".to_string(), + false, + "The name of the UniqueId factory used for caching tagv UIDs.".to_string(), + ) + } + + let key = self.config_key("salt.buckets"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_int_value( + key.clone(), + 20, + false, + "The number of salt buckets to spread data into.".to_string(), + ) + } + self.salt_buckets = self.tsdb.get_config().get_int(key); + + let key = self.config_key("salt.width"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_int_value( + key.clone(), + 0, + false, + "The width, in bytes, of the salt prefix in row keys.".to_string(), + ) + } + self.salt_width = self.tsdb.get_config().get_int(key); + + let key = self.config_key("data.store"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_string_value( + key.clone(), + "".to_string(), + false, + "The name of the data store factory to load and associate with this schema." + .to_string(), + ) + } + + let key = self.config_key("rollups.enable"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_bool_value( + key.clone(), + false, + false, + "Whether or not rollups are enabled for this schema.".to_string(), + ) + } + + let key = self.config_key("rollups.config"); + if !self.tsdb.get_config().has_property(key.clone()) { + self.tsdb.get_config().register_by_string_value( + key.clone(), + "".to_string(), + false, + "The path to a JSON file containing the rollup configuration.".to_string(), + ) + } + } + + pub fn get_data_store(&self) -> Box { + self.data_store.clone() + } + + pub fn get_uid_store(&self) -> Box { + self.uid_store.clone() + } + + pub fn get_metrics(&self) -> Box { + self.metrics.clone() + } + + pub fn get_tag_names(&self) -> Box { + self.tag_names.clone() + } + + pub fn get_tag_values(&self) -> Box { + self.tag_values.clone() + } + + pub fn get_tsdb(&self) -> Box { + self.tsdb.clone() + } + + async fn resolve_byte_id( + &self, + id: Box, + span: Box, + ) -> Box { + todo!() + } + + pub fn new_series( + &self, + dtype: Box, + base_timestamp: Box, + id_hash: u64, + set: Box, + interval: Box, + ) -> Box { + todo!() + } + + pub fn new_schema(&self, factory: SchemaFactory, tsdb: Box, id: String) { + todo!() + } +} + +struct MetricAndTagsDatumID { + metric: String, + tags: HashMap, +} +impl TimeSeriesID for MetricAndTagsDatumID { + fn encode(&self) -> bool { + todo!() + } + + fn hashcode(&self) -> u64 { + todo!() + } +} + +impl TimeSeriesStringID for MetricAndTagsDatumID { + fn alias(&self) -> String { + todo!() + } + + fn namespace(&self) -> String { + todo!() + } + + fn metric(&self) -> String { + todo!() + } + + fn tags(&self) -> HashMap { + todo!() + } + + fn get_tag_value(&self, key: String) -> String { + todo!() + } + + fn aggregated_tags(&self) -> Vec { + todo!() + } + + fn disjoint_tags(&self) -> Vec { + todo!() + } + + fn unique_ids(&self) -> std::collections::HashSet { + todo!() + } + + fn hits(&self) -> u64 { + todo!() + } +} diff --git a/src/core/storage/schemas/schema_factory.rs b/src/core/storage/schemas/schema_factory.rs new file mode 100644 index 0000000..f9e5819 --- /dev/null +++ b/src/core/storage/schemas/schema_factory.rs @@ -0,0 +1,109 @@ +use bytes::Bytes; + +use super::schema::Schema; +use crate::{ + common::{ + core::{base_tsdb_plugin::BaseTSDBPlugin, tsdb::TSDB, tsdb_plugin::TSDBPlugin}, + data::{ + time_series_byte_id::TimeSeriesByteID, + time_series_data_consumer_factory::TimeSeriesDataConsumerFactory, + time_series_data_source_config::TimeSeriesDataSourceConfig, + time_series_data_source_factory::TimeSeriesDataSourceFactory, + time_series_id::TimeSeriesID, time_series_string_id::TimeSeriesStringID, + }, + query::{query_node_config::QueryNodeConfig, query_pipeline_context::QueryPipelineContext}, + rollup::rollup_config::RollupConfig, + stats::span::Span, + storage::time_series_data_consumer::TimeSeriesDataConsumer, + }, + core::rollup_config::default_rollup_config::DefaultRollupConfig, +}; + +pub const SchemaFactoryType: &str = "Tsdb1xSchemaFactory"; +pub const SchemaFactoryKeyPrefix: &str = "tsd.storage."; +pub const RollupEnableKey: &str = "rollups.enable"; +pub const RollupKey: &str = "rollups.config"; + +pub(crate) struct SchemaFactory { + schema: Box, + rollup_config: DefaultRollupConfig, +} + +unsafe impl Send for SchemaFactory {} +unsafe impl Sync for SchemaFactory {} + +#[async_trait::async_trait] +impl TSDBPlugin for SchemaFactory { + fn id(&self) -> String { + todo!() + } + + fn get_type(&self) -> String { + todo!() + } + + fn version(&self) -> String { + todo!() + } + + async fn initialize(&mut self, tsdb: Box, id: String) { + todo!() + } + + async fn shutdown(&self) { + todo!() + } +} + +#[async_trait::async_trait] +impl TimeSeriesDataSourceFactory for SchemaFactory { + fn id_type(&self) -> Box { + todo!() + } + + fn supports_query( + &self, + context: Box, + config: Box, + ) -> bool { + todo!() + } + + fn supports_push_down(&self, operation: Box) -> bool { + todo!() + } + + async fn resolve_byte_id( + &self, + id: Box, + span: Box, + ) -> Box { + todo!() + } + + async fn encode_join_keys( + &self, + join_keys: Vec, + span: Box, + ) -> Vec> { + todo!() + } + + async fn encode_join_metrics( + &self, + join_metrics: Vec, + span: Box, + ) -> Vec> { + todo!() + } + + fn rollup_config(&self) -> Box { + todo!() + } +} + +impl TimeSeriesDataConsumerFactory for SchemaFactory { + fn consumer(&self) -> Box { + todo!() + } +} diff --git a/src/core/storage/schemas/seqspan.rs b/src/core/storage/schemas/seqspan.rs new file mode 100644 index 0000000..df77cea --- /dev/null +++ b/src/core/storage/schemas/seqspan.rs @@ -0,0 +1,6 @@ +use super::rowseq::RowSeq; +use crate::common::{core::tsdb::TSDB, data::time_series_datatype::TimeSeriesDataType}; + +pub(crate) trait SeqSpan { + fn add_sequence(&self, tsdb: Box, sequence: Box, keep_earliest: bool); +} diff --git a/src/core/storage/schemas/source_node.rs b/src/core/storage/schemas/source_node.rs new file mode 100644 index 0000000..433d582 --- /dev/null +++ b/src/core/storage/schemas/source_node.rs @@ -0,0 +1,8 @@ +use super::schema::Schema; +use crate::common::data::timestamp::TimeStamp; + +pub(crate) trait SourceNode { + fn get_seq_end(&self) -> Box; + + fn get_schema(&self) -> Schema; +} diff --git a/src/core/storage/schemas/tsdb_data_store.rs b/src/core/storage/schemas/tsdb_data_store.rs new file mode 100644 index 0000000..6ffc74c --- /dev/null +++ b/src/core/storage/schemas/tsdb_data_store.rs @@ -0,0 +1,9 @@ +use dyn_clone::{clone_trait_object, DynClone}; + +use crate::common::storage::time_series_data_consumer::TimeSeriesDataConsumer; + +pub(crate) trait TSDBDataStore: TimeSeriesDataConsumer + DynClone { + fn id(&self) -> String; +} + +clone_trait_object!(TSDBDataStore); diff --git a/src/core/storage/schemas/tsdb_query_node.rs b/src/core/storage/schemas/tsdb_query_node.rs new file mode 100644 index 0000000..b44ce14 --- /dev/null +++ b/src/core/storage/schemas/tsdb_query_node.rs @@ -0,0 +1,8 @@ +use super::source_node::SourceNode; +use crate::common::data::time_series_data_source::TimeSeriesDataSource; + +pub(crate) trait TSDBQueryNode: TimeSeriesDataSource + SourceNode { + fn set_sent_data(&self); + + fn sent_data(&self) -> bool; +} diff --git a/src/core/uid/mod.rs b/src/core/uid/mod.rs new file mode 100644 index 0000000..ea847fe --- /dev/null +++ b/src/core/uid/mod.rs @@ -0,0 +1,3 @@ +pub mod uniqueid; +pub mod uniqueid_store; +pub mod uniqueid_type; diff --git a/src/core/uid/uniqueid.rs b/src/core/uid/uniqueid.rs new file mode 100644 index 0000000..cd1830c --- /dev/null +++ b/src/core/uid/uniqueid.rs @@ -0,0 +1,70 @@ +use bytes::Bytes; +use dyn_clone::{clone_trait_object, DynClone}; + +use super::uniqueid_type::UniqueIDType; +use crate::common::{data::time_series_datum_id::TimeSeriesDatumID, stats::span::Span}; + +pub(crate) enum UniqueIDCacheMode { + WRITEONLY(String), + READONLY(String), + READWRITE(String), +} + +impl Default for UniqueIDCacheMode { + fn default() -> Self { + todo!() + } +} + +impl UniqueIDCacheMode { + pub fn get_name(&self) -> String { + todo!() + } + + pub fn from_string(&self, name: String) -> Self { + todo!() + } + + pub fn set_mode(&mut self, name: String) { + todo!() + } +} + +#[async_trait::async_trait] +pub(crate) trait UniqueID: DynClone { + fn get_type(&self) -> UniqueIDType; + + async fn drop_caches(&self, span: Box); + + fn get_name(&self, uid: u64, span: Box) -> String; + + fn get_names(&self, uid: u64, span: Box) -> Vec; + + fn get_id(&self, name: String, span: Box) -> Bytes; + + fn get_ids(&self, names: Vec, span: Box) -> Vec; + + // TODO: lack return idorerror + fn get_or_create_id(&self, name: String, id: Box, span: Box); + + // TODO: lack return vec of idorerror + fn get_or_create_ids( + &self, + name: Vec, + id: Box, + span: Box, + ); + + fn suggest(&self, search: String, max_results: i32, span: Box) -> Vec; + + fn rename(&self, oldname: String, newname: String, span: Box); + + fn add_to_cache(&self, name: String, id: Bytes); + + // TODO: need a result + async fn delete(&self, name: String, span: Box); + + // TODO: lack convert funcs!!! +} + +clone_trait_object!(UniqueID); diff --git a/src/core/uid/uniqueid_store.rs b/src/core/uid/uniqueid_store.rs new file mode 100644 index 0000000..7e33238 --- /dev/null +++ b/src/core/uid/uniqueid_store.rs @@ -0,0 +1,49 @@ +use bytes::Bytes; +use dyn_clone::{clone_trait_object, DynClone}; + +use super::uniqueid_type::UniqueIDType; +use crate::common::{data::time_series_datum_id::TimeSeriesDatumID, stats::span::Span}; + +#[async_trait::async_trait] +pub(crate) trait UniqueIDStore: DynClone { + async fn get_id(&self, utype: UniqueIDType, name: String, span: Box) -> Bytes; + + async fn get_ids( + &self, + utype: UniqueIDType, + names: Vec, + span: Box, + ) -> Vec; + + // TODO: lack idorerror + async fn get_or_create_id( + &self, + utype: UniqueIDType, + name: String, + id: Box, + span: Box, + ); + + async fn get_or_create_ids( + &self, + utype: UniqueIDType, + names: Vec, + id: Box, + span: Box, + ); + + async fn get_name(&self, utype: UniqueIDType, id: Bytes, span: Box) -> String; + + async fn get_names( + &self, + utype: UniqueIDType, + ids: Vec, + span: Box, + ) -> Vec; + + async fn suggest(&self, utype: UniqueIDType, query: String, max: i32) -> Vec; + + fn character_set(&self, utype: UniqueIDType) -> String; +} + +clone_trait_object!(UniqueIDStore); diff --git a/src/core/uid/uniqueid_type.rs b/src/core/uid/uniqueid_type.rs new file mode 100644 index 0000000..ee5283f --- /dev/null +++ b/src/core/uid/uniqueid_type.rs @@ -0,0 +1,7 @@ +pub(crate) enum UniqueIDType { + METRIC, + TAGK, + TAGV, + NAMESPACE, + FULLSERIES, +} diff --git a/src/implementation/mod.rs b/src/implementation/mod.rs new file mode 100644 index 0000000..475d868 --- /dev/null +++ b/src/implementation/mod.rs @@ -0,0 +1 @@ +pub mod server_undertow; diff --git a/src/implementation/server_undertow/command.rs b/src/implementation/server_undertow/command.rs new file mode 100644 index 0000000..4cd8230 --- /dev/null +++ b/src/implementation/server_undertow/command.rs @@ -0,0 +1,59 @@ +use clap::{Command, Parser}; + +use crate::common::configuration::configuration::Configuration; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +pub(crate) struct Args { + #[arg(long, default_value_t = 6667)] + pub http_port_key: i32, + #[arg(long, default_value_t = 7776)] + tls_port_key: i32, + #[arg(long, default_value = "0.0.0.0", required = true)] + bind_key: String, + #[arg(long, default_value = "/", required = true)] + root_key: String, + #[arg(long, default_value = "true", required = true)] + load_plugins_key: bool, + #[arg(long)] + keystore_key: String, + #[arg(long)] + tls_cert_key: String, + #[arg(long)] + tls_key_key: String, + #[arg(long)] + keystore_pass_key: String, + #[arg(long)] + tls_secret_cert_key: String, + #[arg(long)] + tls_secret_key_key: String, + #[arg(long)] + tls_ca_key: String, + #[arg(long)] + tls_verify_client_key: String, + #[arg(long)] + tls_protocols_key: String, + #[arg(long)] + tls_cipher_key: String, + #[arg(long)] + directory_key: String, + #[arg(long, default_value_t = 5 * 60 * 1000)] + read_to_key: i32, + #[arg(long, default_value_t = 5 * 60 * 1000)] + write_to_key: i32, + #[arg(long, default_value_t = 15 * 60 * 1000)] + no_request_key: i32, + #[arg(long, default_value = "combined")] + access_log_format_key: String, + #[arg(long)] + record_request_start_time: String, + #[arg(long)] + rewrite_key: String, +} + +pub fn build_command() -> Configuration { + let arg = Args::parse(); + let mut config = Configuration::new(); + + config +} diff --git a/src/implementation/server_undertow/mod.rs b/src/implementation/server_undertow/mod.rs new file mode 100644 index 0000000..20aecaa --- /dev/null +++ b/src/implementation/server_undertow/mod.rs @@ -0,0 +1,2 @@ +pub mod command; +pub mod tsd_main; diff --git a/src/implementation/server_undertow/tsd_main.rs b/src/implementation/server_undertow/tsd_main.rs new file mode 100644 index 0000000..6d26094 --- /dev/null +++ b/src/implementation/server_undertow/tsd_main.rs @@ -0,0 +1,60 @@ +use log; + +use super::command::build_command; +use crate::core::coredb::default_tsdb::DefaultTSDB; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let config = build_command(); + let port = config.get_port(); + let ssl_port = config.get_ssl_port(); + if port < 1 || ssl_port < 1 { + log::error!("Must provide a HTTP or SSL port."); + return Err(anyhow::anyhow!("Crash because port or ssl port wrong.")); + } + + let bind = config.get_bind(); + let root = config.get_root(); + let load_plugins = config.get_load_plugins(); + + let tsdb = DefaultTSDB::new(config); + + if load_plugins { + let res = tsdb.initialize_registry(true).await; + + if res.is_err() { + return Err(anyhow::anyhow!("Failed to initialize TSDB registry")); + } + } + + // Make sure shutdown gracefully. + // register_shutdown_hook(server, tsdb); + + // build http server with tsdb + + // check if config has directory key + + // load auth + + // deploy tsdb + + // start http handler + + // start query handler + + // start encoding handler + + // start stats collector handler + + // start rewrite handler + + // start access log handler + + // real start http listener + + // setup ssl + + // start + + Err(anyhow::anyhow!("Nothing to do")) +} diff --git a/src/lib.rs b/src/lib.rs index 1439820..ecf7778 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,12 +2,13 @@ #![feature(unboxed_closures)] #![feature(associated_type_defaults)] #![feature(return_position_impl_trait_in_trait)] -#[allow(unused)] -#[allow(clippy::upper_case_acronyms)] -#[allow(unused_variables)] -#[allow(unused_must_use)] -#[allow(unused_imports)] -#[allow(clippy::module_inception)] +// #[allow(unused)] +// #[allow(clippy::upper_case_acronyms)] +// #[allow(unused_variables)] +// #[allow(unused_must_use)] +// #[allow(unused_imports)] +// #[allow(clippy::module_inception)] pub mod common; pub mod core; +pub mod implementation; pub mod storage;