From 559285f1f0dd632995ce1fdcabd5657ad3918a83 Mon Sep 17 00:00:00 2001 From: f1shl3gs Date: Fri, 10 Feb 2023 20:27:56 +0800 Subject: [PATCH] migrate extensions to configurable --- Cargo.toml | 2 +- .../src/configurable_component.rs | 6 ++- lib/framework/src/config/mod.rs | 4 +- lib/framework/src/topology/builder.rs | 2 +- src/extensions/healthcheck.rs | 37 +++++++------ src/extensions/heartbeat.rs | 51 ++++++++---------- src/extensions/jemalloc.rs | 54 +++++++++---------- src/extensions/pprof.rs | 35 ++++++------ 8 files changed, 89 insertions(+), 102 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb01c8aa7..193b545c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,7 +142,7 @@ extensions = [ "extensions-heartbeat", # Musl does not provide "pthread_getname_np" in current release. # https://github.com/tikv/pprof-rs/issues/41 - # "extensions-pprof" + "extensions-pprof" ] extensions-heartbeat = ["uuid", "sysinfo", "parking_lot/serde"] diff --git a/lib/configurable-derive/src/configurable_component.rs b/lib/configurable-derive/src/configurable_component.rs index e3f5c7bfe..aaa92631b 100644 --- a/lib/configurable-derive/src/configurable_component.rs +++ b/lib/configurable-derive/src/configurable_component.rs @@ -44,6 +44,7 @@ pub fn configurable_component_impl( // inventory staff let desc_type: syn::Type = match component_type { + ComponentType::Extension => parse_quote!(::configurable::component::ExtensionDescription), ComponentType::Provider => parse_quote! { ::configurable::component::ProviderDescription }, ComponentType::Source => parse_quote! { ::configurable::component::SourceDescription }, ComponentType::Transform => { @@ -105,6 +106,7 @@ pub fn configurable_component_impl( #[derive(Clone, Debug, Default)] pub enum ComponentType { + Extension, Provider, #[default] Source, @@ -142,6 +144,8 @@ impl StructAttrs { if let Some(m) = errs.expect_meta_name_value(meta) { parse_attr_litstr(errs, m, &mut this.description); } + } else if name.is_ident("extension") { + this.component_type = Some(ComponentType::Extension) } else if name.is_ident("provider") { this.component_type = Some(ComponentType::Provider); } else if name.is_ident("source") { @@ -157,7 +161,7 @@ impl StructAttrs { } else { errs.err( &name.span(), - "Expect `name`, `description`, `provider`, `source`, `transforms` or `sink`", + "Expect `name`, `description`, `extension`, `provider`, `source`, `transforms` or `sink`", ) } } diff --git a/lib/framework/src/config/mod.rs b/lib/framework/src/config/mod.rs index f6ca17bdc..6a58a051f 100644 --- a/lib/framework/src/config/mod.rs +++ b/lib/framework/src/config/mod.rs @@ -552,11 +552,9 @@ pub struct ExtensionContext { #[async_trait] #[typetag::serde(tag = "type")] -pub trait ExtensionConfig: core::fmt::Debug + Send + Sync { +pub trait ExtensionConfig: NamedComponent + Debug + Send + Sync { async fn build(&self, cx: ExtensionContext) -> crate::Result; - fn extension_type(&self) -> &'static str; - fn resources(&self) -> Vec { Vec::new() } diff --git a/lib/framework/src/topology/builder.rs b/lib/framework/src/topology/builder.rs index 83c48dd98..a239c4e2d 100644 --- a/lib/framework/src/topology/builder.rs +++ b/lib/framework/src/topology/builder.rs @@ -322,7 +322,7 @@ pub async fn build_pieces( .iter() .filter(|(name, _)| diff.extensions.contains_new(name)) { - let typetag = extension.extension_type(); + let typetag = extension.component_name(); let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_extension(key); let cx = ExtensionContext { diff --git a/src/extensions/healthcheck.rs b/src/extensions/healthcheck.rs index 1f4766825..5d8525340 100644 --- a/src/extensions/healthcheck.rs +++ b/src/extensions/healthcheck.rs @@ -2,13 +2,13 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use framework::config::{ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig}; +use configurable::configurable_component; +use framework::config::{ExtensionConfig, ExtensionContext}; use framework::Extension; use futures_util::FutureExt; use http::{Request, Response, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Server}; -use serde::{Deserialize, Serialize}; static READINESS: AtomicBool = AtomicBool::new(false); @@ -20,26 +20,19 @@ fn default_endpoint() -> SocketAddr { "0.0.0.0:13133".parse().unwrap() } -#[derive(Debug, Deserialize, Serialize)] +#[configurable_component(extension, name = "healthcheck")] +#[derive(Debug)] #[serde(deny_unknown_fields)] -pub struct HealthcheckConfig { +pub struct Config { + /// Which address to listen for. #[serde(default = "default_endpoint")] + #[configurable(required)] pub endpoint: SocketAddr, } -impl GenerateConfig for HealthcheckConfig { - fn generate_config() -> String { - "endpoint: 0.0.0.0:13133".into() - } -} - -inventory::submit! { - ExtensionDescription::new::("healthcheck") -} - #[async_trait::async_trait] #[typetag::serde(name = "healthcheck")] -impl ExtensionConfig for HealthcheckConfig { +impl ExtensionConfig for Config { async fn build(&self, cx: ExtensionContext) -> crate::Result { info!( message = "start healthcheck server", @@ -61,10 +54,6 @@ impl ExtensionConfig for HealthcheckConfig { Ok(()) })) } - - fn extension_type(&self) -> &'static str { - "healthcheck" - } } async fn handle(req: Request) -> Result, Infallible> { @@ -95,3 +84,13 @@ async fn handle(req: Request) -> Result, Infallible> { .body(Body::from(body)) .unwrap()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::testing::test_generate_config::() + } +} diff --git a/src/extensions/heartbeat.rs b/src/extensions/heartbeat.rs index 61ec480cc..12130f3ff 100644 --- a/src/extensions/heartbeat.rs +++ b/src/extensions/heartbeat.rs @@ -6,9 +6,8 @@ use std::time::Duration; use async_trait::async_trait; use chrono::SecondsFormat; -use framework::config::{ - ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig, ProxyConfig, UriSerde, -}; +use configurable::configurable_component; +use framework::config::{ExtensionConfig, ExtensionContext, ProxyConfig, UriSerde}; use framework::http::HttpClient; use framework::tls::{TlsConfig, TlsSettings}; use framework::{Extension, ShutdownSignal}; @@ -17,18 +16,25 @@ use hyper::Body; use nix::net::if_::InterfaceFlags; use once_cell::sync::Lazy; use parking_lot::Mutex; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use tokio::select; const fn default_interval() -> Duration { Duration::from_secs(60) } -#[derive(Debug, Deserialize, Serialize)] +#[configurable_component(extension, name = "heartbeat")] +#[derive(Debug)] #[serde(deny_unknown_fields)] pub struct Config { + /// POST some state of vertex to remote endpoint. + /// then we can do much more, e.g. service discovery. + #[configurable(required)] endpoint: UriSerde, + tls: Option, + + /// Duration of each heartbeat sending. #[serde(default = "default_interval", with = "humanize::duration::serde")] interval: Duration, @@ -36,25 +42,6 @@ pub struct Config { tags: BTreeMap, } -impl GenerateConfig for Config { - fn generate_config() -> String { - r#"# POST some state of vertex to remote endpoint. -# then we can do much more, e.g. service discovery -endpoint: https://example.com/heartbeat - -# duration of each heartbeat sending -# -# Optional, default 60s -# interval: 1m -"# - .to_string() - } -} - -inventory::submit! { - ExtensionDescription::new::("heartbeat") -} - #[async_trait] #[typetag::serde(name = "heartbeat")] impl ExtensionConfig for Config { @@ -81,10 +68,6 @@ impl ExtensionConfig for Config { cx.shutdown, ))) } - - fn extension_type(&self) -> &'static str { - "heartbeat" - } } #[derive(Debug, Serialize)] @@ -122,7 +105,7 @@ pub fn report_config(config: &framework::config::Config) { resources.extend(ext.resources().into_iter().filter_map(|r| match r { framework::config::Resource::Port(addr, _) => Some(Resource { name: key.to_string(), - component_type: ext.extension_type().to_string(), + component_type: ext.component_name().to_string(), address: format!("{}", addr.ip()), port: addr.port(), }), @@ -242,3 +225,13 @@ fn get_advertise_addr() -> std::io::Result { addrs[0].to_string() }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::testing::test_generate_config::() + } +} diff --git a/src/extensions/jemalloc.rs b/src/extensions/jemalloc.rs index 6882ffe76..f8e29f847 100644 --- a/src/extensions/jemalloc.rs +++ b/src/extensions/jemalloc.rs @@ -3,39 +3,36 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::time::Duration; -use framework::config::{ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig}; +use configurable::configurable_component; +use framework::config::{ExtensionConfig, ExtensionContext}; use framework::shutdown::ShutdownSignal; use framework::Extension; use futures::FutureExt; use http::Request; -use humanize::{duration, parse_duration}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Response, Server, StatusCode}; -use serde::{Deserialize, Serialize}; use thiserror::Error; use tikv_jemalloc_ctl::{stats, Access, AsName}; const OUTPUT: &str = "profile.out"; -const PROF_ACTIVE: &'static [u8] = b"prof.active\0"; -const PROF_DUMP: &'static [u8] = b"prof.dump\0"; -const PROFILE_OUTPUT: &'static [u8] = b"profile.out\0"; - -inventory::submit! { - ExtensionDescription::new::("jemalloc") -} +const PROF_ACTIVE: &[u8] = b"prof.active\0"; +const PROF_DUMP: &[u8] = b"prof.dump\0"; +const PROFILE_OUTPUT: &[u8] = b"profile.out\0"; fn default_listen() -> SocketAddr { "0.0.0.0:10911".parse().unwrap() } -#[derive(Debug, Deserialize, Serialize)] +#[configurable_component(extension, name = "jemalloc")] +#[derive(Debug)] #[serde(deny_unknown_fields)] -struct JemallocConfig { +struct Config { #[serde(default = "default_listen")] + #[configurable(required)] pub listen: SocketAddr, } -impl Default for JemallocConfig { +impl Default for Config { fn default() -> Self { Self { listen: default_listen(), @@ -43,23 +40,18 @@ impl Default for JemallocConfig { } } -impl GenerateConfig for JemallocConfig { - fn generate_config() -> String { - format!("listen: {}", default_listen().to_string()) - } -} - #[derive(Debug, Error)] pub enum BuildError { - #[snafu(display("MALLOC_CONF is not set, {}", source))] + #[error("MALLOC_CONF is not set, {source}")] EnvNotSet { source: std::env::VarError }, - #[snafu(display("MALLOC_CONF is set but prof is not enabled"))] + + #[error("MALLOC_CONF is set but prof is not enabled")] ProfileNotEnabled, } #[async_trait::async_trait] #[typetag::serde(name = "jemalloc")] -impl ExtensionConfig for JemallocConfig { +impl ExtensionConfig for Config { async fn build(&self, ctx: ExtensionContext) -> crate::Result { match std::env::var("MALLOC_CONF") { Ok(value) => { @@ -72,10 +64,6 @@ impl ExtensionConfig for JemallocConfig { Ok(Box::pin(run(self.listen, ctx.shutdown))) } - - fn extension_type(&self) -> &'static str { - "jemalloc" - } } async fn run(addr: SocketAddr, shutdown: ShutdownSignal) -> Result<(), ()> { @@ -134,13 +122,13 @@ async fn profiling(req: Request) -> Result, Infallible> { let default = Duration::from_secs(30); let wait = match params.get("seconds") { - Some(value) => parse_duration(value).unwrap_or(default), + Some(value) => humanize::duration::parse_duration(value).unwrap_or(default), _ => default, }; info!( message = "Starting jemalloc profile", - wait = duration(&wait).as_str() + wait = humanize::duration::duration(&wait).as_str() ); set_prof_active(true); tokio::time::sleep(wait).await; @@ -161,3 +149,13 @@ fn dump_profile() { name.write(PROFILE_OUTPUT) .expect("Should succeed to dump profile") } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::testing::test_generate_config::() + } +} diff --git a/src/extensions/pprof.rs b/src/extensions/pprof.rs index 785dc8307..eb59fe439 100644 --- a/src/extensions/pprof.rs +++ b/src/extensions/pprof.rs @@ -2,7 +2,8 @@ use std::convert::Infallible; use std::net::SocketAddr; use bytes::{BufMut, BytesMut}; -use framework::config::{ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig}; +use configurable::configurable_component; +use framework::config::{ExtensionConfig, ExtensionContext}; use framework::shutdown::ShutdownSignal; use framework::Extension; use futures::FutureExt; @@ -12,26 +13,24 @@ use hyper::{ Body, Request, Response, Server, }; use pprof::protos::Message; -use serde::{Deserialize, Serialize}; const DEFAULT_PROFILE_SECONDS: u64 = 30; -#[derive(Debug, Deserialize, Serialize)] +#[configurable_component(extension, name = "pprof")] +#[derive(Debug)] #[serde(deny_unknown_fields)] -pub struct PprofConfig { +pub struct Config { + /// Which address the pprof server will listen + #[configurable(required)] pub listen: SocketAddr, } #[async_trait::async_trait] #[typetag::serde(name = "pprof")] -impl ExtensionConfig for PprofConfig { +impl ExtensionConfig for Config { async fn build(&self, cx: ExtensionContext) -> crate::Result { Ok(Box::pin(run(self.listen, cx.shutdown))) } - - fn extension_type(&self) -> &'static str { - "pprof" - } } async fn run(addr: SocketAddr, shutdown: ShutdownSignal) -> Result<(), ()> { @@ -98,16 +97,12 @@ async fn handle(req: Request) -> Result, Infallible> { } } -impl GenerateConfig for PprofConfig { - fn generate_config() -> String { - r#" -# Which address the pprof server will listen -listen: 0.0.0.0:10910 -"# - .into() - } -} +#[cfg(test)] +mod tests { + use super::*; -inventory::submit! { - ExtensionDescription::new::("pprof") + #[test] + fn generate_config() { + crate::testing::test_generate_config::() + } }