Skip to content

Commit

Permalink
migrate extensions to configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
f1shl3gs committed Feb 10, 2023
1 parent 636207a commit 559285f
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 102 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ extensions = [
"extensions-heartbeat",
# Musl does not provide "pthread_getname_np" in current release.
# https://github.com/tikv/pprof-rs/issues/41
# "extensions-pprof"
"extensions-pprof"
]

extensions-heartbeat = ["uuid", "sysinfo", "parking_lot/serde"]
Expand Down
6 changes: 5 additions & 1 deletion lib/configurable-derive/src/configurable_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn configurable_component_impl(

// inventory staff
let desc_type: syn::Type = match component_type {
ComponentType::Extension => parse_quote!(::configurable::component::ExtensionDescription),
ComponentType::Provider => parse_quote! { ::configurable::component::ProviderDescription },
ComponentType::Source => parse_quote! { ::configurable::component::SourceDescription },
ComponentType::Transform => {
Expand Down Expand Up @@ -105,6 +106,7 @@ pub fn configurable_component_impl(

#[derive(Clone, Debug, Default)]
pub enum ComponentType {
Extension,
Provider,
#[default]
Source,
Expand Down Expand Up @@ -142,6 +144,8 @@ impl StructAttrs {
if let Some(m) = errs.expect_meta_name_value(meta) {
parse_attr_litstr(errs, m, &mut this.description);
}
} else if name.is_ident("extension") {
this.component_type = Some(ComponentType::Extension)
} else if name.is_ident("provider") {
this.component_type = Some(ComponentType::Provider);
} else if name.is_ident("source") {
Expand All @@ -157,7 +161,7 @@ impl StructAttrs {
} else {
errs.err(
&name.span(),
"Expect `name`, `description`, `provider`, `source`, `transforms` or `sink`",
"Expect `name`, `description`, `extension`, `provider`, `source`, `transforms` or `sink`",
)
}
}
Expand Down
4 changes: 1 addition & 3 deletions lib/framework/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,9 @@ pub struct ExtensionContext {

#[async_trait]
#[typetag::serde(tag = "type")]
pub trait ExtensionConfig: core::fmt::Debug + Send + Sync {
pub trait ExtensionConfig: NamedComponent + Debug + Send + Sync {
async fn build(&self, cx: ExtensionContext) -> crate::Result<Extension>;

fn extension_type(&self) -> &'static str;

fn resources(&self) -> Vec<Resource> {
Vec::new()
}
Expand Down
2 changes: 1 addition & 1 deletion lib/framework/src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ pub async fn build_pieces(
.iter()
.filter(|(name, _)| diff.extensions.contains_new(name))
{
let typetag = extension.extension_type();
let typetag = extension.component_name();
let (shutdown_signal, force_shutdown_tripwire) =
shutdown_coordinator.register_extension(key);
let cx = ExtensionContext {
Expand Down
37 changes: 18 additions & 19 deletions src/extensions/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};

use framework::config::{ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig};
use configurable::configurable_component;
use framework::config::{ExtensionConfig, ExtensionContext};
use framework::Extension;
use futures_util::FutureExt;
use http::{Request, Response, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Server};
use serde::{Deserialize, Serialize};

static READINESS: AtomicBool = AtomicBool::new(false);

Expand All @@ -20,26 +20,19 @@ fn default_endpoint() -> SocketAddr {
"0.0.0.0:13133".parse().unwrap()
}

#[derive(Debug, Deserialize, Serialize)]
#[configurable_component(extension, name = "healthcheck")]
#[derive(Debug)]
#[serde(deny_unknown_fields)]
pub struct HealthcheckConfig {
pub struct Config {
/// Which address to listen for.
#[serde(default = "default_endpoint")]
#[configurable(required)]
pub endpoint: SocketAddr,
}

impl GenerateConfig for HealthcheckConfig {
fn generate_config() -> String {
"endpoint: 0.0.0.0:13133".into()
}
}

inventory::submit! {
ExtensionDescription::new::<HealthcheckConfig>("healthcheck")
}

#[async_trait::async_trait]
#[typetag::serde(name = "healthcheck")]
impl ExtensionConfig for HealthcheckConfig {
impl ExtensionConfig for Config {
async fn build(&self, cx: ExtensionContext) -> crate::Result<Extension> {
info!(
message = "start healthcheck server",
Expand All @@ -61,10 +54,6 @@ impl ExtensionConfig for HealthcheckConfig {
Ok(())
}))
}

fn extension_type(&self) -> &'static str {
"healthcheck"
}
}

async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
Expand Down Expand Up @@ -95,3 +84,13 @@ async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
.body(Body::from(body))
.unwrap())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::testing::test_generate_config::<Config>()
}
}
51 changes: 22 additions & 29 deletions src/extensions/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use std::time::Duration;

use async_trait::async_trait;
use chrono::SecondsFormat;
use framework::config::{
ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig, ProxyConfig, UriSerde,
};
use configurable::configurable_component;
use framework::config::{ExtensionConfig, ExtensionContext, ProxyConfig, UriSerde};
use framework::http::HttpClient;
use framework::tls::{TlsConfig, TlsSettings};
use framework::{Extension, ShutdownSignal};
Expand All @@ -17,44 +16,32 @@ use hyper::Body;
use nix::net::if_::InterfaceFlags;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use tokio::select;

const fn default_interval() -> Duration {
Duration::from_secs(60)
}

#[derive(Debug, Deserialize, Serialize)]
#[configurable_component(extension, name = "heartbeat")]
#[derive(Debug)]
#[serde(deny_unknown_fields)]
pub struct Config {
/// POST some state of vertex to remote endpoint.
/// then we can do much more, e.g. service discovery.
#[configurable(required)]
endpoint: UriSerde,

tls: Option<TlsConfig>,

/// Duration of each heartbeat sending.
#[serde(default = "default_interval", with = "humanize::duration::serde")]
interval: Duration,

#[serde(default)]
tags: BTreeMap<String, String>,
}

impl GenerateConfig for Config {
fn generate_config() -> String {
r#"# POST some state of vertex to remote endpoint.
# then we can do much more, e.g. service discovery
endpoint: https://example.com/heartbeat
# duration of each heartbeat sending
#
# Optional, default 60s
# interval: 1m
"#
.to_string()
}
}

inventory::submit! {
ExtensionDescription::new::<Config>("heartbeat")
}

#[async_trait]
#[typetag::serde(name = "heartbeat")]
impl ExtensionConfig for Config {
Expand All @@ -81,10 +68,6 @@ impl ExtensionConfig for Config {
cx.shutdown,
)))
}

fn extension_type(&self) -> &'static str {
"heartbeat"
}
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -122,7 +105,7 @@ pub fn report_config(config: &framework::config::Config) {
resources.extend(ext.resources().into_iter().filter_map(|r| match r {
framework::config::Resource::Port(addr, _) => Some(Resource {
name: key.to_string(),
component_type: ext.extension_type().to_string(),
component_type: ext.component_name().to_string(),
address: format!("{}", addr.ip()),
port: addr.port(),
}),
Expand Down Expand Up @@ -242,3 +225,13 @@ fn get_advertise_addr() -> std::io::Result<String> {
addrs[0].to_string()
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::testing::test_generate_config::<Config>()
}
}
54 changes: 26 additions & 28 deletions src/extensions/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,55 @@ use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;

use framework::config::{ExtensionConfig, ExtensionContext, ExtensionDescription, GenerateConfig};
use configurable::configurable_component;
use framework::config::{ExtensionConfig, ExtensionContext};
use framework::shutdown::ShutdownSignal;
use framework::Extension;
use futures::FutureExt;
use http::Request;
use humanize::{duration, parse_duration};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tikv_jemalloc_ctl::{stats, Access, AsName};

const OUTPUT: &str = "profile.out";
const PROF_ACTIVE: &'static [u8] = b"prof.active\0";
const PROF_DUMP: &'static [u8] = b"prof.dump\0";
const PROFILE_OUTPUT: &'static [u8] = b"profile.out\0";

inventory::submit! {
ExtensionDescription::new::<JemallocConfig>("jemalloc")
}
const PROF_ACTIVE: &[u8] = b"prof.active\0";
const PROF_DUMP: &[u8] = b"prof.dump\0";
const PROFILE_OUTPUT: &[u8] = b"profile.out\0";

fn default_listen() -> SocketAddr {
"0.0.0.0:10911".parse().unwrap()
}

#[derive(Debug, Deserialize, Serialize)]
#[configurable_component(extension, name = "jemalloc")]
#[derive(Debug)]
#[serde(deny_unknown_fields)]
struct JemallocConfig {
struct Config {
#[serde(default = "default_listen")]
#[configurable(required)]
pub listen: SocketAddr,
}

impl Default for JemallocConfig {
impl Default for Config {
fn default() -> Self {
Self {
listen: default_listen(),
}
}
}

impl GenerateConfig for JemallocConfig {
fn generate_config() -> String {
format!("listen: {}", default_listen().to_string())
}
}

#[derive(Debug, Error)]
pub enum BuildError {
#[snafu(display("MALLOC_CONF is not set, {}", source))]
#[error("MALLOC_CONF is not set, {source}")]
EnvNotSet { source: std::env::VarError },
#[snafu(display("MALLOC_CONF is set but prof is not enabled"))]

#[error("MALLOC_CONF is set but prof is not enabled")]
ProfileNotEnabled,
}

#[async_trait::async_trait]
#[typetag::serde(name = "jemalloc")]
impl ExtensionConfig for JemallocConfig {
impl ExtensionConfig for Config {
async fn build(&self, ctx: ExtensionContext) -> crate::Result<Extension> {
match std::env::var("MALLOC_CONF") {
Ok(value) => {
Expand All @@ -72,10 +64,6 @@ impl ExtensionConfig for JemallocConfig {

Ok(Box::pin(run(self.listen, ctx.shutdown)))
}

fn extension_type(&self) -> &'static str {
"jemalloc"
}
}

async fn run(addr: SocketAddr, shutdown: ShutdownSignal) -> Result<(), ()> {
Expand Down Expand Up @@ -134,13 +122,13 @@ async fn profiling(req: Request<Body>) -> Result<Response<Body>, Infallible> {

let default = Duration::from_secs(30);
let wait = match params.get("seconds") {
Some(value) => parse_duration(value).unwrap_or(default),
Some(value) => humanize::duration::parse_duration(value).unwrap_or(default),
_ => default,
};

info!(
message = "Starting jemalloc profile",
wait = duration(&wait).as_str()
wait = humanize::duration::duration(&wait).as_str()
);
set_prof_active(true);
tokio::time::sleep(wait).await;
Expand All @@ -161,3 +149,13 @@ fn dump_profile() {
name.write(PROFILE_OUTPUT)
.expect("Should succeed to dump profile")
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::testing::test_generate_config::<Config>()
}
}
Loading

0 comments on commit 559285f

Please sign in to comment.