diff --git a/Cargo.lock b/Cargo.lock index 13e05ac06c71..3bd96784858f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,7 +339,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "half 2.4.1", "hashbrown 0.14.5", "num", @@ -771,6 +771,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -1388,7 +1389,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.2.1", + "phf", +] + +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build 0.3.0", "phf", ] @@ -1403,6 +1415,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "chunked_transfer" version = "1.5.0" @@ -2141,7 +2164,7 @@ version = "0.8.2" dependencies = [ "arrow", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "common-error", "common-macro", "once_cell", @@ -3544,6 +3567,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -3913,6 +3945,7 @@ dependencies = [ "common-time", "common-version", "datanode", + "datatypes", "futures", "humantime-serde", "lazy_static", @@ -3922,12 +3955,14 @@ dependencies = [ "opentelemetry-proto 0.5.0", "operator", "partition", + "pipeline", "prometheus", "prost 0.12.6", "query", "raft-engine", "script", "serde", + "serde_json", "servers", "session", "snafu 0.8.3", @@ -5941,6 +5976,24 @@ dependencies = [ "rand_core", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multimap" version = "0.8.3" @@ -6756,7 +6809,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "datafusion 37.1.0", "datafusion-expr 37.1.0", "datafusion-physical-expr 37.1.0", @@ -7229,6 +7282,58 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pipeline" +version = "0.8.2" +dependencies = [ + "api", + "arrow", + "async-trait", + "catalog", + "chrono", + "chrono-tz 0.9.0", + "common-catalog", + "common-error", + "common-function", + "common-macro", + "common-meta", + "common-query", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "common-time", + "crossbeam-utils", + "csv", + "datafusion 38.0.0", + 
"datafusion-common 38.0.0", + "datafusion-expr 38.0.0", + "datafusion-functions 38.0.0", + "datafusion-physical-expr 38.0.0", + "datatypes", + "futures", + "greptime-proto", + "itertools 0.10.5", + "lazy_static", + "moka", + "once_cell", + "operator", + "paste", + "prometheus", + "query", + "rayon", + "regex", + "ron", + "serde", + "serde_json", + "session", + "snafu 0.8.3", + "sql", + "table", + "tokio", + "urlencoding", + "yaml-rust", +] + [[package]] name = "pkcs1" version = "0.3.3" @@ -9637,6 +9742,7 @@ dependencies = [ "permutation", "pgwire", "pin-project", + "pipeline", "postgres-types", "pprof", "prometheus", diff --git a/Cargo.toml b/Cargo.toml index 9241fe632ebf..0cb9d9b93999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "src/object-store", "src/operator", "src/partition", + "src/pipeline", "src/plugins", "src/promql", "src/puffin", @@ -224,6 +225,7 @@ mito2 = { path = "src/mito2" } object-store = { path = "src/object-store" } operator = { path = "src/operator" } partition = { path = "src/partition" } +pipeline = { path = "src/pipeline" } plugins = { path = "src/plugins" } promql = { path = "src/promql" } puffin = { path = "src/puffin" } diff --git a/src/auth/src/permission.rs b/src/auth/src/permission.rs index 9a8c2a243de7..57afda471c8b 100644 --- a/src/auth/src/permission.rs +++ b/src/auth/src/permission.rs @@ -30,6 +30,7 @@ pub enum PermissionReq<'a> { PromStoreWrite, PromStoreRead, Otlp, + LogWrite, } #[derive(Debug)] diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 2b8d5c746f60..56f4ab904ae2 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -44,6 +44,7 @@ meta-client.workspace = true opentelemetry-proto.workspace = true operator.workspace = true partition.workspace = true +pipeline.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true @@ -62,11 +63,13 @@ toml.workspace = true tonic.workspace = true [dev-dependencies] -catalog.workspace = true +catalog = { workspace = true, features = ["testing"] } common-test-util.workspace = true datanode.workspace = true +datatypes.workspace = true futures = "0.3" meta-srv = { workspace = true, features = ["mock"] } +serde_json.workspace = true strfmt = "0.2" tower.workspace = true uuid.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 29c832afe595..ecc04af789b1 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -15,6 +15,7 @@ pub mod builder; mod grpc; mod influxdb; +mod log_handler; mod opentsdb; mod otlp; mod prom_store; @@ -48,6 +49,7 @@ use meta_client::MetaClientOptions; use operator::delete::DeleterRef; use operator::insert::InserterRef; use operator::statement::StatementExecutor; +use pipeline::pipeline_operator::PipelineOperator; use prometheus::HistogramTimer; use query::metrics::OnDone; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; @@ -66,7 +68,7 @@ use servers::prometheus_handler::PrometheusHandler; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, + InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; use servers::server::ServerHandlers; @@ -100,6 +102,7 @@ pub trait FrontendInstance: + OpenTelemetryProtocolHandler + ScriptHandler + PrometheusHandler + + LogHandler + Send + Sync + 
'static @@ -108,12 +111,12 @@ pub trait FrontendInstance: } pub type FrontendInstanceRef = Arc; -pub type StatementExecutorRef = Arc; #[derive(Clone)] pub struct Instance { catalog_manager: CatalogManagerRef, script_executor: Arc, + pipeline_operator: Arc, statement_executor: Arc, query_engine: QueryEngineRef, plugins: Plugins, diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index f0993458a6aa..ae8d77dd20b5 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -27,9 +27,10 @@ use operator::delete::Deleter; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; use operator::request::Requester; -use operator::statement::StatementExecutor; +use operator::statement::{StatementExecutor, StatementExecutorRef}; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; +use pipeline::pipeline_operator::PipelineOperator; use query::QueryEngineFactory; use servers::server::ServerHandlers; use snafu::OptionExt; @@ -37,7 +38,7 @@ use snafu::OptionExt; use crate::error::{self, Result}; use crate::heartbeat::HeartbeatTask; use crate::instance::region_query::FrontendRegionQueryHandler; -use crate::instance::{Instance, StatementExecutorRef}; +use crate::instance::Instance; use crate::script::ScriptExecutor; /// The frontend [`Instance`] builder. @@ -172,11 +173,19 @@ impl FrontendBuilder { table_route_cache, )); + let pipeline_operator = Arc::new(PipelineOperator::new( + inserter.clone(), + statement_executor.clone(), + self.catalog_manager.clone(), + query_engine.clone(), + )); + plugins.insert::(statement_executor.clone()); Ok(Instance { catalog_manager: self.catalog_manager, script_executor, + pipeline_operator, statement_executor, query_engine, plugins, diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs new file mode 100644 index 000000000000..6ef48205cc56 --- /dev/null +++ b/src/frontend/src/instance/log_handler.rs @@ -0,0 +1,93 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
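+
+//! `LogHandler` implementation for the frontend `Instance`. Log inserts are
+//! gated by the `LogWrite` permission and forwarded to the shared `Inserter`,
+//! while pipeline lookup and creation are delegated to the `PipelineOperator`.
+//! Deleting a pipeline is not supported yet.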
+ +use std::sync::Arc; + +use api::v1::RowInsertRequests; +use async_trait::async_trait; +use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use client::Output; +use common_error::ext::BoxedError; +use pipeline::table::PipelineVersion; +use pipeline::{GreptimeTransformer, Pipeline}; +use servers::error::{ + AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, + UnsupportedDeletePipelineSnafu, +}; +use servers::query_handler::LogHandler; +use session::context::QueryContextRef; +use snafu::ResultExt; + +use crate::instance::Instance; + +#[async_trait] +impl LogHandler for Instance { + async fn insert_logs( + &self, + log: RowInsertRequests, + ctx: QueryContextRef, + ) -> ServerResult { + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::LogWrite) + .context(AuthSnafu)?; + + self.handle_log_inserts(log, ctx).await + } + + async fn get_pipeline( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> ServerResult>> { + self.pipeline_operator + .get_pipeline(query_ctx, name, version) + .await + .context(PipelineSnafu) + } + + async fn insert_pipeline( + &self, + name: &str, + content_type: &str, + pipeline: &str, + query_ctx: QueryContextRef, + ) -> ServerResult<()> { + self.pipeline_operator + .insert_pipeline(name, content_type, pipeline, query_ctx) + .await + .context(PipelineSnafu) + } + + async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> { + // TODO(qtang): impl delete + Err(UnsupportedDeletePipelineSnafu {}.build()) + } +} + +impl Instance { + pub async fn handle_log_inserts( + &self, + log: RowInsertRequests, + ctx: QueryContextRef, + ) -> ServerResult { + self.inserter + .handle_log_inserts(log, ctx, self.statement_executor.as_ref()) + .await + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu) + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index a72269f863e6..1433a595ce81 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -90,6 +90,8 @@ where Some(self.instance.clone()), ); + builder = builder.with_log_ingest_handler(self.instance.clone()); + if let Some(user_provider) = self.plugins.get::() { builder = builder.with_user_provider(user_provider); } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 7a04ebdee5d9..4bae21f987b9 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -66,6 +66,22 @@ pub struct Inserter { pub type InserterRef = Arc; +enum AutoCreateTableType { + Logical(String), + Physical, + Log, +} + +impl AutoCreateTableType { + fn as_str(&self) -> &'static str { + match self { + AutoCreateTableType::Logical(_) => "logical", + AutoCreateTableType::Physical => "physical", + AutoCreateTableType::Log => "log", + } + } +} + impl Inserter { pub fn new( catalog_manager: CatalogManagerRef, @@ -108,7 +124,42 @@ impl Inserter { validate_column_count_match(&requests)?; let table_name_to_ids = self - .create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor) + .create_or_alter_tables_on_demand( + &requests, + &ctx, + AutoCreateTableType::Physical, + statement_executor, + ) + .await?; + let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) + .convert(requests) + .await?; + + self.do_request(inserts, &ctx).await + } + + pub async fn handle_log_inserts( + &self, + mut requests: RowInsertRequests, + ctx: QueryContextRef, + statement_executor: &StatementExecutor, + ) -> Result { + 
// remove empty requests + requests.inserts.retain(|req| { + req.rows + .as_ref() + .map(|r| !r.rows.is_empty()) + .unwrap_or_default() + }); + validate_column_count_match(&requests)?; + + let table_name_to_ids = self + .create_or_alter_tables_on_demand( + &requests, + &ctx, + AutoCreateTableType::Log, + statement_executor, + ) .await?; let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) .convert(requests) @@ -143,7 +194,7 @@ impl Inserter { .create_or_alter_tables_on_demand( &requests, &ctx, - Some(physical_table.to_string()), + AutoCreateTableType::Logical(physical_table.to_string()), statement_executor, ) .await?; @@ -380,12 +431,15 @@ impl Inserter { &self, requests: &RowInsertRequests, ctx: &QueryContextRef, - on_physical_table: Option, + auto_create_table_type: AutoCreateTableType, statement_executor: &StatementExecutor, ) -> Result> { let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len()); let mut create_tables = vec![]; let mut alter_tables = vec![]; + let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND + .with_label_values(&[auto_create_table_type.as_str()]) + .start_timer(); for req in &requests.inserts { let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); @@ -407,42 +461,56 @@ impl Inserter { } } - if let Some(on_physical_table) = on_physical_table { - if !create_tables.is_empty() { - // Creates logical tables in batch. - let tables = self - .create_logical_tables( - create_tables, - ctx, - &on_physical_table, - statement_executor, - ) - .await?; + match auto_create_table_type { + AutoCreateTableType::Logical(on_physical_table) => { + if !create_tables.is_empty() { + // Creates logical tables in batch. + let tables = self + .create_logical_tables( + create_tables, + ctx, + &on_physical_table, + statement_executor, + ) + .await?; - for table in tables { + for table in tables { + let table_info = table.table_info(); + table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + } + } + if !alter_tables.is_empty() { + // Alter logical tables in batch. + statement_executor + .alter_logical_tables(alter_tables, ctx.clone()) + .await?; + } + } + AutoCreateTableType::Physical => { + for req in create_tables { + let table = self.create_table(req, ctx, statement_executor).await?; let table_info = table.table_info(); table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); } + for alter_expr in alter_tables.into_iter() { + statement_executor + .alter_table_inner(alter_expr, ctx.clone()) + .await?; + } } - if !alter_tables.is_empty() { - // Alter logical tables in batch. 
- statement_executor - .alter_logical_tables(alter_tables, ctx.clone()) - .await?; - } - } else { - for req in create_tables { - let table = self.create_table(req, ctx, statement_executor).await?; - let table_info = table.table_info(); - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); - } - for alter_expr in alter_tables.into_iter() { - statement_executor - .alter_table_inner(alter_expr, ctx.clone()) - .await?; + AutoCreateTableType::Log => { + for req in create_tables { + let table = self.create_log_table(req, ctx, statement_executor).await?; + let table_info = table.table_info(); + table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + } + for alter_expr in alter_tables.into_iter() { + statement_executor + .alter_table_inner(alter_expr, ctx.clone()) + .await?; + } } } - Ok(table_name_to_ids) } @@ -568,17 +636,45 @@ impl Inserter { match res { Ok(table) => { - info!( - "Successfully created table {}.{}.{}", - table_ref.catalog, table_ref.schema, table_ref.table, - ); + info!("Successfully created table {}", table_ref,); Ok(table) } Err(err) => { - error!( - "Failed to create table {}.{}.{}: {}", - table_ref.catalog, table_ref.schema, table_ref.table, err - ); + error!(err; "Failed to create table {}", table_ref); + Err(err) + } + } + } + + async fn create_log_table( + &self, + req: &RowInsertRequest, + ctx: &QueryContextRef, + statement_executor: &StatementExecutor, + ) -> Result { + let table_ref = + TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name); + // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`. + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; + + info!("Table `{table_ref}` does not exist, try creating the log table"); + // Set append_mode to true for log table. + // because log tables should keep rows with the same ts and tags. + create_table_expr + .table_options + .insert("append_mode".to_string(), "true".to_string()); + let res = statement_executor + .create_table_inner(create_table_expr, None, ctx.clone()) + .await; + + match res { + Ok(table) => { + info!("Successfully created a log table {}", table_ref); + Ok(table) + } + Err(err) => { + error!(err; "Failed to create a log table {}", table_ref); Err(err) } } diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 97c5e0015a55..9a77f9844d38 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -51,4 +51,10 @@ lazy_static! 
{ "DDL operator create view" ) .unwrap(); + pub static ref CREATE_ALTER_ON_DEMAND: HistogramVec = register_histogram_vec!( + "greptime_table_operator_create_alter_on_demand", + "table operator duration to create or alter tables on demand", + &["table_type"] + ) + .unwrap(); } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 8522b5db9bb0..6c1c33a0c809 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -73,6 +73,8 @@ pub struct StatementExecutor { inserter: InserterRef, } +pub type StatementExecutorRef = Arc; + impl StatementExecutor { pub fn new( catalog_manager: CatalogManagerRef, diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml new file mode 100644 index 000000000000..03096b47a7a1 --- /dev/null +++ b/src/pipeline/Cargo.toml @@ -0,0 +1,62 @@ +[package] +name = "pipeline" +edition.workspace = true +version.workspace = true +license.workspace = true + +[features] + +[lints] +workspace = true + +[dependencies] +api.workspace = true +arrow.workspace = true +async-trait.workspace = true +catalog.workspace = true +chrono.workspace = true +chrono-tz = "0.9.0" +common-catalog.workspace = true +common-error.workspace = true +common-function.workspace = true +common-macro.workspace = true +common-meta.workspace = true +common-query.workspace = true +common-recordbatch.workspace = true +common-runtime.workspace = true +common-telemetry.workspace = true +common-time.workspace = true +crossbeam-utils.workspace = true +csv = "1.3.0" +datafusion.workspace = true +datafusion-common.workspace = true +datafusion-expr.workspace = true +datafusion-functions.workspace = true +datafusion-physical-expr.workspace = true +datatypes.workspace = true +futures.workspace = true +greptime-proto.workspace = true +itertools.workspace = true +lazy_static.workspace = true +moka = { workspace = true, features = ["sync"] } +once_cell.workspace = true +operator.workspace = true +paste.workspace = true +prometheus.workspace = true +query.workspace = true +regex.workspace = true +serde_json.workspace = true +session.workspace = true +snafu.workspace = true +sql.workspace = true +table.workspace = true +tokio.workspace = true +urlencoding = "2.1" +yaml-rust = "0.4" + +[dev-dependencies] +catalog = { workspace = true, features = ["testing"] } +rayon = "1.0" +ron = "0.7" +serde = { version = "1.0", features = ["derive"] } +session = { workspace = true, features = ["testing"] } diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs new file mode 100644 index 000000000000..86ed9c7ea79b --- /dev/null +++ b/src/pipeline/src/lib.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
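+
+//! Log ETL pipelines for GreptimeDB.
+//!
+//! The `etl` module contains the pipeline definition, parser and transform
+//! engine (`parse`, `Content`, `Pipeline`, `GreptimeTransformer`, `Value`),
+//! while `manager` provides the `pipelines` system table and the
+//! `PipelineOperator` used by the frontend to create, cache and look up
+//! compiled pipelines.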
+ +mod etl; +mod manager; + +pub use etl::transform::GreptimeTransformer; +pub use etl::value::Value; +pub use etl::{parse, Content, Pipeline}; +pub use manager::{error, pipeline_operator, table}; diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs new file mode 100644 index 000000000000..ad5d8a96bebd --- /dev/null +++ b/src/pipeline/src/manager/error.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use datatypes::timestamp::TimestampNanosecond; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Pipeline table not found"))] + PipelineTableNotFound { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to insert pipeline to pipelines table"))] + InsertPipeline { + source: operator::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to parse pipeline: {}", reason))] + CompilePipeline { + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))] + PipelineNotFound { + name: String, + version: Option, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to collect record batch"))] + CollectRecords { + #[snafu(implicit)] + location: Location, + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to cast type, msg: {}", msg))] + CastType { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build DataFusion logical plan"))] + BuildDfLogicalPlan { + #[snafu(source)] + error: datafusion_common::DataFusionError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to execute internal statement"))] + ExecuteInternalStatement { + source: query::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("General catalog error"))] + Catalog { + source: catalog::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to create table"))] + CreateTable { + source: operator::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to execute pipeline, reason: {}", reason))] + PipelineTransform { + reason: String, + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + CastType { .. } => StatusCode::Unexpected, + PipelineTableNotFound { .. } => StatusCode::TableNotFound, + InsertPipeline { source, .. } => source.status_code(), + CollectRecords { source, .. } => source.status_code(), + PipelineNotFound { .. } | CompilePipeline { .. 
} | PipelineTransform { .. } => { + StatusCode::InvalidArguments + } + BuildDfLogicalPlan { .. } => StatusCode::Internal, + ExecuteInternalStatement { source, .. } => source.status_code(), + Catalog { source, .. } => source.status_code(), + CreateTable { source, .. } => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/pipeline/src/manager/mod.rs b/src/pipeline/src/manager/mod.rs new file mode 100644 index 000000000000..95ffb5822ec3 --- /dev/null +++ b/src/pipeline/src/manager/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod pipeline_operator; +pub mod table; diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs new file mode 100644 index 000000000000..390a48d834a4 --- /dev/null +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -0,0 +1,211 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use api::v1::CreateTableExpr; +use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; +use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; +use common_telemetry::info; +use operator::insert::InserterRef; +use operator::statement::StatementExecutorRef; +use query::QueryEngineRef; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; +use table::TableRef; + +use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; +use crate::table::{PipelineTable, PipelineTableRef, PipelineVersion}; +use crate::{GreptimeTransformer, Pipeline}; + +pub const PIPELINE_TABLE_NAME: &str = "pipelines"; + +/// PipelineOperator is responsible for managing pipelines. +/// It provides the ability to: +/// - Create a pipeline table if it does not exist +/// - Get a pipeline from the pipeline table +/// - Insert a pipeline into the pipeline table +/// - Compile a pipeline +/// - Add a pipeline table to the cache +/// - Get a pipeline table from the cache +pub struct PipelineOperator { + inserter: InserterRef, + statement_executor: StatementExecutorRef, + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + tables: RwLock>, +} + +impl PipelineOperator { + /// Create a table request for the pipeline table. 
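+    /// The expression uses the schema returned by
+    /// `PipelineTable::build_pipeline_schema` and sets `create_if_not_exists`,
+    /// so submitting it for an already existing table is harmless.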
+ pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { + let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema(); + + let create_table_expr = CreateTableExpr { + catalog_name: catalog.to_string(), + schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), + table_name: PIPELINE_TABLE_NAME.to_string(), + desc: "GreptimeDB pipeline table for Log".to_string(), + column_defs, + time_index, + primary_keys, + create_if_not_exists: true, + table_options: Default::default(), + table_id: None, // Should and will be assigned by Meta. + engine: default_engine().to_string(), + }; + + RegisterSystemTableRequest { + create_table_expr, + open_hook: None, + } + } + + fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) { + let mut tables = self.tables.write().unwrap(); + if tables.contains_key(catalog) { + return; + } + tables.insert( + catalog.to_string(), + Arc::new(PipelineTable::new( + self.inserter.clone(), + self.statement_executor.clone(), + table, + self.query_engine.clone(), + )), + ); + } + + async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> { + let catalog = ctx.current_catalog(); + + // exist in cache + if self.get_pipeline_table_from_cache(catalog).is_some() { + return Ok(()); + } + + let RegisterSystemTableRequest { + create_table_expr: mut expr, + open_hook: _, + } = self.create_table_request(catalog); + + // exist in catalog, just open + if let Some(table) = self + .catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .await + .context(CatalogSnafu)? + { + self.add_pipeline_table_to_cache(catalog, table); + return Ok(()); + } + + // create table + self.statement_executor + .create_table_inner(&mut expr, None, ctx.clone()) + .await + .context(CreateTableSnafu)?; + + let schema = &expr.schema_name; + let table_name = &expr.table_name; + + // get from catalog + let table = self + .catalog_manager + .table(catalog, schema, table_name) + .await + .context(CatalogSnafu)? + .context(PipelineTableNotFoundSnafu)?; + + info!( + "Created pipelines table {} with table id {}.", + table.table_info().full_table_name(), + table.table_info().table_id() + ); + + // put to cache + self.add_pipeline_table_to_cache(catalog, table); + + Ok(()) + } + + /// Get a pipeline table from the cache. + pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option { + self.tables.read().unwrap().get(catalog).cloned() + } + + async fn insert_and_compile( + &self, + ctx: QueryContextRef, + name: &str, + content_type: &str, + pipeline: &str, + ) -> Result>> { + self.get_pipeline_table_from_cache(ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? + .insert_and_compile(ctx.current_schema(), name, content_type, pipeline) + .await + } +} + +impl PipelineOperator { + /// Create a new PipelineOperator. + pub fn new( + inserter: InserterRef, + statement_executor: StatementExecutorRef, + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + ) -> Self { + Self { + inserter, + statement_executor, + catalog_manager, + tables: RwLock::new(HashMap::new()), + query_engine, + } + } + + /// Get a pipeline from the pipeline table. + pub async fn get_pipeline( + &self, + query_ctx: QueryContextRef, + name: &str, + version: PipelineVersion, + ) -> Result>> { + self.create_pipeline_table_if_not_exists(query_ctx.clone()) + .await?; + self.get_pipeline_table_from_cache(query_ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? 
+ .get_pipeline(query_ctx.current_schema(), name, version) + .await + } + + /// Insert a pipeline into the pipeline table. + pub async fn insert_pipeline( + &self, + name: &str, + content_type: &str, + pipeline: &str, + query_ctx: QueryContextRef, + ) -> Result<()> { + self.create_pipeline_table_if_not_exists(query_ctx.clone()) + .await?; + + self.insert_and_compile(query_ctx, name, content_type, pipeline) + .await + .map(|_| ()) + } +} diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs new file mode 100644 index 000000000000..d037ae3d4832 --- /dev/null +++ b/src/pipeline/src/manager/table.rs @@ -0,0 +1,444 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, + RowInsertRequests, Rows, SemanticType, +}; +use common_query::OutputData; +use common_recordbatch::util as record_util; +use common_telemetry::{debug, info}; +use common_time::timestamp::{TimeUnit, Timestamp}; +use datafusion::datasource::DefaultTableSource; +use datafusion::logical_expr::{and, col, lit}; +use datafusion_common::TableReference; +use datafusion_expr::LogicalPlanBuilder; +use datatypes::prelude::ScalarVector; +use datatypes::timestamp::TimestampNanosecond; +use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; +use moka::sync::Cache; +use operator::insert::InserterRef; +use operator::statement::StatementExecutorRef; +use query::plan::LogicalPlan; +use query::QueryEngineRef; +use session::context::{QueryContextBuilder, QueryContextRef}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::metadata::TableInfo; +use table::table::adapter::DfTableProviderAdapter; +use table::TableRef; + +use crate::error::{ + BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu, + ExecuteInternalStatementSnafu, InsertPipelineSnafu, PipelineNotFoundSnafu, Result, +}; +use crate::etl::transform::GreptimeTransformer; +use crate::etl::{parse, Content, Pipeline}; + +/// Pipeline version. An optional timestamp with nanosecond precision. +/// If the version is None, it means the latest version of the pipeline. +/// User can specify the version by providing a timestamp string formatted as iso8601. +/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch. +pub type PipelineVersion = Option; + +pub type PipelineTableRef = Arc; + +pub const PIPELINE_TABLE_NAME: &str = "pipelines"; + +pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; +pub const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema"; +pub const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type"; +pub const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline"; +pub const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at"; + +/// Pipeline table cache size. 
+pub const PIPELINES_CACHE_SIZE: u64 = 10000; +/// Pipeline table cache time to live. +pub const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10); + +/// PipelineTable is a table that stores the pipeline schema and content. +/// Every catalog has its own pipeline table. +pub struct PipelineTable { + inserter: InserterRef, + statement_executor: StatementExecutorRef, + table: TableRef, + query_engine: QueryEngineRef, + pipelines: Cache>>, +} + +impl PipelineTable { + /// Create a new PipelineTable. + pub fn new( + inserter: InserterRef, + statement_executor: StatementExecutorRef, + table: TableRef, + query_engine: QueryEngineRef, + ) -> Self { + Self { + inserter, + statement_executor, + table, + query_engine, + pipelines: Cache::builder() + .max_capacity(PIPELINES_CACHE_SIZE) + .time_to_live(PIPELINES_CACHE_TTL) + .build(), + } + } + + /// Build the schema for the pipeline table. + /// Returns the (time index, primary keys, column) definitions. + pub fn build_pipeline_schema() -> (String, Vec, Vec) { + ( + PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(), + vec![ + PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(), + PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(), + PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(), + ], + vec![ + ColumnDef { + name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + comment: "".to_string(), + datatype_extension: None, + }, + ColumnDef { + name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + comment: "".to_string(), + datatype_extension: None, + }, + ColumnDef { + name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, + comment: "".to_string(), + datatype_extension: None, + }, + ColumnDef { + name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "".to_string(), + datatype_extension: None, + }, + ColumnDef { + name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(), + data_type: ColumnDataType::TimestampNanosecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + comment: "".to_string(), + datatype_extension: None, + }, + ], + ) + } + + /// Build the column schemas for inserting a row into the pipeline table. 
+ fn build_insert_column_schemas() -> Vec { + vec![ + PbColumnSchema { + column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + ..Default::default() + }, + PbColumnSchema { + column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + ..Default::default() + }, + PbColumnSchema { + column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + ..Default::default() + }, + PbColumnSchema { + column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + PbColumnSchema { + column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + ..Default::default() + }, + ] + } + + fn query_ctx(table_info: &TableInfo) -> QueryContextRef { + QueryContextBuilder::default() + .current_catalog(table_info.catalog_name.to_string()) + .current_schema(table_info.schema_name.to_string()) + .build() + .into() + } + + /// Compile a pipeline from a string. + pub fn compile_pipeline(pipeline: &str) -> Result> { + let yaml_content = Content::Yaml(pipeline.into()); + parse::(&yaml_content) + .map_err(|e| CompilePipelineSnafu { reason: e }.build()) + } + + fn generate_pipeline_cache_key(schema: &str, name: &str, version: PipelineVersion) -> String { + match version { + Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)), + None => format!("{}/{}/latest", schema, name), + } + } + + fn get_compiled_pipeline_from_cache( + &self, + schema: &str, + name: &str, + version: PipelineVersion, + ) -> Option>> { + self.pipelines + .get(&Self::generate_pipeline_cache_key(schema, name, version)) + } + + /// Insert a pipeline into the pipeline table. + async fn insert_pipeline_to_pipeline_table( + &self, + schema: &str, + name: &str, + content_type: &str, + pipeline: &str, + ) -> Result { + let now = Timestamp::current_time(TimeUnit::Nanosecond); + + let table_info = self.table.table_info(); + + let insert = RowInsertRequest { + table_name: PIPELINE_TABLE_NAME.to_string(), + rows: Some(Rows { + schema: Self::build_insert_column_schemas(), + rows: vec![Row { + values: vec![ + ValueData::StringValue(name.to_string()).into(), + ValueData::StringValue(schema.to_string()).into(), + ValueData::StringValue(content_type.to_string()).into(), + ValueData::StringValue(pipeline.to_string()).into(), + ValueData::TimestampNanosecondValue(now.value()).into(), + ], + }], + }), + }; + + let requests = RowInsertRequests { + inserts: vec![insert], + }; + + let output = self + .inserter + .handle_row_inserts( + requests, + Self::query_ctx(&table_info), + &self.statement_executor, + ) + .await + .context(InsertPipelineSnafu)?; + + info!( + "Inserted pipeline: {} into {} table: {}, output: {:?}.", + name, + PIPELINE_TABLE_NAME, + table_info.full_table_name(), + output + ); + + Ok(now) + } + + /// Get a pipeline by name. + /// If the pipeline is not in the cache, it will be get from table and compiled and inserted into the cache. 
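+    /// Compiled pipelines are cached under a `schema/name/version` key (the
+    /// version defaults to `latest`) and expire after `PIPELINES_CACHE_TTL`.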
+ pub async fn get_pipeline( + &self, + schema: &str, + name: &str, + version: PipelineVersion, + ) -> Result>> { + if let Some(pipeline) = self.get_compiled_pipeline_from_cache(schema, name, version) { + return Ok(pipeline); + } + + let pipeline = self.find_pipeline_by_name(schema, name, version).await?; + let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?); + + self.pipelines.insert( + Self::generate_pipeline_cache_key(schema, name, version), + compiled_pipeline.clone(), + ); + Ok(compiled_pipeline) + } + + /// Insert a pipeline into the pipeline table and compile it. + /// The compiled pipeline will be inserted into the cache. + pub async fn insert_and_compile( + &self, + schema: &str, + name: &str, + content_type: &str, + pipeline: &str, + ) -> Result>> { + let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?); + // we will use the version in the future + let version = self + .insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline) + .await?; + + { + self.pipelines.insert( + Self::generate_pipeline_cache_key(schema, name, None), + compiled_pipeline.clone(), + ); + self.pipelines.insert( + Self::generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))), + compiled_pipeline.clone(), + ); + } + + Ok(compiled_pipeline) + } + + async fn find_pipeline_by_name( + &self, + schema: &str, + name: &str, + version: PipelineVersion, + ) -> Result<(String, TimestampNanosecond)> { + let table_info = self.table.table_info(); + + let table_name = TableReference::full( + table_info.catalog_name.clone(), + table_info.schema_name.clone(), + table_info.name.clone(), + ); + + let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + let schema_and_name_filter = and( + col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), + col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)), + ); + let filter = if let Some(v) = version { + and( + schema_and_name_filter, + col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())), + ) + } else { + schema_and_name_filter + }; + + let plan = LogicalPlanBuilder::scan(table_name, table_source, None) + .context(BuildDfLogicalPlanSnafu)? + .filter(filter) + .context(BuildDfLogicalPlanSnafu)? + .project(vec![ + col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME), + col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME), + ]) + .context(BuildDfLogicalPlanSnafu)? + .sort(vec![ + col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true) + ]) + .context(BuildDfLogicalPlanSnafu)? + .limit(0, Some(1)) + .context(BuildDfLogicalPlanSnafu)? 
+ .build() + .context(BuildDfLogicalPlanSnafu)?; + + debug!("find_pipeline_by_name: plan: {:?}", plan); + + let output = self + .query_engine + .execute(LogicalPlan::DfPlan(plan), Self::query_ctx(&table_info)) + .await + .context(ExecuteInternalStatementSnafu)?; + let stream = match output.data { + OutputData::Stream(stream) => stream, + OutputData::RecordBatches(record_batches) => record_batches.as_stream(), + _ => unreachable!(), + }; + + let records = record_util::collect(stream) + .await + .context(CollectRecordsSnafu)?; + + ensure!(!records.is_empty(), PipelineNotFoundSnafu { name, version }); + + ensure!( + records.len() == 1 && records[0].num_columns() == 2, + PipelineNotFoundSnafu { name, version } + ); + + let pipeline_content_column = records[0].column(0); + let pipeline_content = pipeline_content_column + .as_any() + .downcast_ref::() + .with_context(|| CastTypeSnafu { + msg: format!( + "can't downcast {:?} array into string vector", + pipeline_content_column.data_type() + ), + })?; + + let pipeline_created_at_column = records[0].column(1); + let pipeline_created_at = pipeline_created_at_column + .as_any() + .downcast_ref::() + .with_context(|| CastTypeSnafu { + msg: format!( + "can't downcast {:?} array into scalar vector", + pipeline_created_at_column.data_type() + ), + })?; + + debug!( + "find_pipeline_by_name: pipeline_content: {:?}, pipeline_created_at: {:?}", + pipeline_content, pipeline_created_at + ); + + ensure!( + pipeline_content.len() == 1, + PipelineNotFoundSnafu { name, version } + ); + + // Safety: asserted above + Ok(( + pipeline_content.get_data(0).unwrap().to_string(), + pipeline_created_at.get_data(0).unwrap(), + )) + } +} diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b30426d2e7ac..755d59bfacd6 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -22,7 +22,7 @@ arrow-ipc.workspace = true arrow-schema.workspace = true async-trait = "0.1" auth.workspace = true -axum.workspace = true +axum = { workspace = true, features = ["multipart"] } axum-macros = "0.3.8" base64.workspace = true bytes.workspace = true @@ -69,6 +69,7 @@ opentelemetry-proto.workspace = true parking_lot = "0.12" pgwire = "0.20" pin-project = "1.0" +pipeline.workspace = true postgres-types = { version = "0.2", features = ["with-chrono-0_4"] } pprof = { version = "0.13", features = [ "flamegraph", diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index ae595b8e95b6..04b6fa196ca2 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -27,6 +27,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_telemetry::{debug, error}; use datatypes::prelude::ConcreteDataType; +use headers::ContentType; use query::parser::PromQuery; use serde_json::json; use snafu::{Location, Snafu}; @@ -148,6 +149,19 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Pipeline management api error"))] + Pipeline { + source: pipeline::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unsupported delete pipeline."))] + UnsupportedDeletePipeline { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to execute script by name: {}", name))] ExecuteScript { name: String, @@ -533,6 +547,27 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse payload as json"))] + ParseJson { + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to convert to structured log"))] + 
ToStructuredLog { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unsupported content type: {:?}", content_type))] + UnsupportedContentType { + content_type: ContentType, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to decode url"))] UrlDecode { #[snafu(source)] @@ -600,6 +635,7 @@ impl ErrorExt for Error { | FileWatch { .. } => StatusCode::Internal, UnsupportedDataType { .. } => StatusCode::Unsupported, + UnsupportedDeletePipeline { .. } => StatusCode::Unsupported, #[cfg(not(windows))] UpdateJemallocMetrics { .. } => StatusCode::Internal, @@ -614,6 +650,8 @@ impl ErrorExt for Error { | ExecuteGrpcRequest { source, .. } | CheckDatabaseValidity { source, .. } => source.status_code(), + Pipeline { source, .. } => source.status_code(), + NotSupported { .. } | InvalidParameter { .. } | InvalidQuery { .. } @@ -637,6 +675,9 @@ impl ErrorExt for Error { | MissingQueryContext { .. } | MysqlValueConversion { .. } | UnexpectedPhysicalTable { .. } + | ParseJson { .. } + | ToStructuredLog { .. } + | UnsupportedContentType { .. } | TimestampOverflow { .. } => StatusCode::InvalidArguments, RowWriter { source, .. } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 5ef29b8b38c1..3f7f71653f73 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -67,12 +67,13 @@ use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, - PromStoreProtocolHandlerRef, ScriptHandlerRef, + InfluxdbLineProtocolHandlerRef, LogHandlerRef, OpenTelemetryProtocolHandlerRef, + OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, }; use crate::server::Server; pub mod authorize; +pub mod event; pub mod handler; pub mod header; pub mod influxdb; @@ -587,6 +588,16 @@ impl HttpServerBuilder { } } + pub fn with_log_ingest_handler(self, handler: LogHandlerRef) -> Self { + Self { + router: self.router.nest( + &format!("/{HTTP_API_VERSION}/events"), + HttpServer::route_log(handler), + ), + ..self + } + } + pub fn with_plugins(self, plugins: Plugins) -> Self { Self { plugins, ..self } } @@ -699,6 +710,21 @@ impl HttpServer { .with_state(metrics_handler) } + fn route_log(log_handler: LogHandlerRef) -> Router { + Router::new() + .route("/logs", routing::post(event::log_ingester)) + .route( + "/pipelines/:pipeline_name", + routing::post(event::add_pipeline), + ) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(RequestDecompressionLayer::new()), + ) + .with_state(log_handler) + } + fn route_sql(api_state: ApiState) -> ApiRouter { ApiRouter::new() .api_route( diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs new file mode 100644 index 000000000000..f9939b80572e --- /dev/null +++ b/src/servers/src/http/event.rs @@ -0,0 +1,257 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::result::Result as StdResult; + +use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; +use axum::body::HttpBody; +use axum::extract::{FromRequest, Multipart, Path, Query, State}; +use axum::headers::ContentType; +use axum::http::header::CONTENT_TYPE; +use axum::http::{Request, StatusCode}; +use axum::response::{IntoResponse, Response}; +use axum::{async_trait, BoxError, Extension, TypedHeader}; +use common_telemetry::{error, warn}; +use common_time::Timestamp; +use datatypes::timestamp::TimestampNanosecond; +use mime_guess::mime; +use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; +use pipeline::table::PipelineVersion; +use pipeline::Value as PipelineValue; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::{Deserializer, Value}; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, +}; +use crate::http::greptime_result_v1::GreptimedbV1Response; +use crate::http::HttpResponse; +use crate::query_handler::LogHandlerRef; + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct LogIngesterQueryParams { + pub table: Option, + pub db: Option, + pub pipeline_name: Option, + pub ignore_errors: Option, + + pub version: Option, +} + +pub struct PipelineContent(String); + +#[async_trait] +impl FromRequest for PipelineContent +where + B: HttpBody + Send + 'static, + B::Data: Send, + bytes::Bytes: std::convert::From<::Data>, + B::Error: Into, + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, state: &S) -> StdResult { + let content_type_header = req.headers().get(CONTENT_TYPE); + let content_type = content_type_header.and_then(|value| value.to_str().ok()); + if let Some(content_type) = content_type { + if content_type.ends_with("yaml") { + let payload = String::from_request(req, state) + .await + .map_err(IntoResponse::into_response)?; + return Ok(Self(payload)); + } + + if content_type.starts_with("multipart/form-data") { + let mut payload: Multipart = Multipart::from_request(req, state) + .await + .map_err(IntoResponse::into_response)?; + let file = payload + .next_field() + .await + .map_err(IntoResponse::into_response)?; + let payload = file + .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())? 
+ .text() + .await + .map_err(IntoResponse::into_response)?; + return Ok(Self(payload)); + } + } + + Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response()) + } +} + +#[axum_macros::debug_handler] +pub async fn add_pipeline( + State(handler): State, + Path(pipeline_name): Path, + Extension(query_ctx): Extension, + PipelineContent(payload): PipelineContent, +) -> Result { + if pipeline_name.is_empty() { + return Err(InvalidParameterSnafu { + reason: "pipeline_name is required in path", + } + .build()); + } + + if payload.is_empty() { + return Err(InvalidParameterSnafu { + reason: "pipeline is required in body", + } + .build()); + } + + let content_type = "yaml"; + let result = handler + .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx) + .await; + + result.map(|_| "ok".to_string()).map_err(|e| { + error!(e; "failed to insert pipeline"); + e + }) +} + +/// Transform NDJSON array into a single array +fn transform_ndjson_array_factory( + values: impl IntoIterator>, + ignore_error: bool, +) -> Result { + values.into_iter().try_fold( + Value::Array(Vec::with_capacity(100)), + |acc, item| match acc { + Value::Array(mut acc_array) => { + if let Ok(item_value) = item { + match item_value { + Value::Array(item_array) => { + acc_array.extend(item_array); + } + Value::Object(_) => { + acc_array.push(item_value); + } + _ => { + if !ignore_error { + warn!("invalid item in array: {:?}", item_value); + return InvalidParameterSnafu { + reason: format!("invalid item:{} in array", item_value), + } + .fail(); + } + } + } + Ok(Value::Array(acc_array)) + } else if !ignore_error { + item.context(ParseJsonSnafu) + } else { + warn!("invalid item in array: {:?}", item); + Ok(Value::Array(acc_array)) + } + } + _ => unreachable!("invalid acc: {:?}", acc), + }, + ) +} + +#[axum_macros::debug_handler] +pub async fn log_ingester( + State(handler): State, + Query(query_params): Query, + Extension(query_ctx): Extension, + TypedHeader(content_type): TypedHeader, + payload: String, +) -> Result { + let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { + reason: "pipeline_name is required", + })?; + let table_name = query_params.table.context(InvalidParameterSnafu { + reason: "table is required", + })?; + + let version = match query_params.version { + Some(version) => { + let ts = Timestamp::from_str_utc(&version).map_err(|e| { + InvalidParameterSnafu { + reason: format!("invalid pipeline version: {} with error: {}", &version, e), + } + .build() + })?; + Some(TimestampNanosecond(ts)) + } + None => None, + }; + + let ignore_errors = query_params.ignore_errors.unwrap_or(false); + + let m: mime::Mime = content_type.clone().into(); + let value = match m.subtype() { + mime::JSON => transform_ndjson_array_factory( + Deserializer::from_str(&payload).into_iter(), + ignore_errors, + )?, + // add more content type support + _ => UnsupportedContentTypeSnafu { content_type }.fail()?, + }; + + ingest_logs_inner( + handler, + pipeline_name, + version, + table_name, + value, + query_ctx, + ) + .await +} + +async fn ingest_logs_inner( + state: LogHandlerRef, + pipeline_name: String, + version: PipelineVersion, + table_name: String, + payload: Value, + query_ctx: QueryContextRef, +) -> Result { + let start = std::time::Instant::now(); + let pipeline_data = PipelineValue::try_from(payload) + .map_err(|reason| CastTypeSnafu { msg: reason }.build()) + .context(PipelineSnafu)?; + + let pipeline = state + .get_pipeline(&pipeline_name, version, query_ctx.clone()) + .await?; + let transformed_data: Rows = 
pipeline + .exec(pipeline_data) + .map_err(|reason| PipelineTransformSnafu { reason }.build()) + .context(PipelineSnafu)?; + + let insert_request = RowInsertRequest { + rows: Some(transformed_data), + table_name: table_name.clone(), + }; + let insert_requests = RowInsertRequests { + inserts: vec![insert_request], + }; + let output = state.insert_logs(insert_requests, query_ctx).await; + + let response = GreptimedbV1Response::from_output(vec![output]) + .await + .with_execution_time(start.elapsed().as_millis() as u64); + Ok(response) +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 0430005aed7d..f0c1170e0746 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -35,6 +35,8 @@ use common_query::Output; use headers::HeaderValue; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use pipeline::table::PipelineVersion; +use pipeline::{GreptimeTransformer, Pipeline}; use serde_json::Value; use session::context::QueryContextRef; @@ -48,6 +50,7 @@ pub type InfluxdbLineProtocolHandlerRef = Arc; pub type OpenTelemetryProtocolHandlerRef = Arc; pub type ScriptHandlerRef = Arc; +pub type LogHandlerRef = Arc; #[async_trait] pub trait ScriptHandler { @@ -118,3 +121,29 @@ pub trait OpenTelemetryProtocolHandler { ctx: QueryContextRef, ) -> Result; } + +/// LogHandler is responsible for handling log related requests. +/// It should be able to insert logs and manage pipelines. +/// The pipeline is a series of transformations that can be applied to logs. +/// The pipeline is stored in the database and can be retrieved by name. +#[async_trait] +pub trait LogHandler { + async fn insert_logs(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result; + + async fn get_pipeline( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> Result>>; + + async fn insert_pipeline( + &self, + name: &str, + content_type: &str, + pipeline: &str, + query_ctx: QueryContextRef, + ) -> Result<()>; + + async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>; +}
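
Usage sketch for the endpoints added above: the log routes are nested under `/{HTTP_API_VERSION}/events`, so a client first uploads a pipeline and then posts logs through it. This is an illustrative sketch only: it assumes `HTTP_API_VERSION` resolves to `v1` and a frontend HTTP server on `127.0.0.1:4000`, and it uses `reqwest` and `tokio` purely for demonstration. The pipeline name, table name and YAML body are hypothetical; the exact YAML accepted by `add_pipeline` is defined by the `etl` module, which this diff does not include.

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // 1. Upload (or replace) a pipeline definition. `add_pipeline` accepts any
    //    body whose Content-Type ends with "yaml", or a multipart/form-data file.
    let pipeline_yaml = r#"
processors:
  - date:
      field: time
      formats:
        - "%Y-%m-%d %H:%M:%S%.3f"

transform:
  - field: message
    type: string
  - field: time
    type: time
    index: timestamp
"#;
    client
        .post("http://127.0.0.1:4000/v1/events/pipelines/my_pipeline")
        .header("Content-Type", "application/x-yaml")
        .body(pipeline_yaml)
        .send()
        .await?
        .error_for_status()?;

    // 2. Ingest logs through that pipeline. `log_ingester` requires the `table`
    //    and `pipeline_name` query parameters and currently accepts only JSON:
    //    a JSON array, single objects, or NDJSON lines.
    let logs = concat!(
        r#"{"time":"2024-05-25 20:16:37.217","message":"hello"}"#,
        "\n",
        r#"{"time":"2024-05-25 20:16:38.218","message":"world"}"#,
    );
    client
        .post("http://127.0.0.1:4000/v1/events/logs")
        .query(&[
            ("db", "public"),
            ("table", "my_logs"),
            ("pipeline_name", "my_pipeline"),
        ])
        .header("Content-Type", "application/json")
        .body(logs)
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}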