Skip to content

Commit

Permalink
feat: log ingestion support (#4014)
Browse files Browse the repository at this point in the history
* chore: add log http ingester scaffold

* chore: add some example code

* chore: add log inserter

* chore: add log handler file

* chore: add pipeline lib

* chore: import log handler

* chore: add pipelime http handler

* chore: add pipeline private table

* chore: add pipeline API

* chore: improve error handling

* chore: merge main

* chore: add multi content type support for log handler

* refactor: remove servers dep on pipeline

* refactor: move define_into_tonic_status to common-error

* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4

* chore: fix typo

* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c

* chore: fix typo and license header

* refactor: move http event handler to a separate file

* chore: add test for pipeline

* chore: fmt

* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93

* refactor: move `pipeline_operator` to `pipeline` crate

* chore: minor update

* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4

* chore: add log

* chore: add log

* chore: remove open hook

* chore: minor update

* chore: fix fmt

* chore: minor update

* chore: rename desc for pipeline table

* refactor: remove updated_at in pipelines

* chore: add more content type support for log inserter api

* chore: introduce pipeline crate

* chore: update upload pipeline api

* chore: fix by pr commit

* chore: add some doc for pub fn/struct

* chore: some minro fix

* chore: add pipeline version support

* chore: impl log pipeline version

* chore: fix format issue

* fix: make the LogicalPlan of a query pipeline sorted in desc order

* chore: remove some debug log

* chore: replacing hashmap cache with moak

* chore: fix by pr commit

* chore: fix toml format issue

* chore: update Cargo.lock

* chore: fix by pr commit

* chore: fix some issue by pr commit

* chore: add more doc for pipeline version

---------

Co-authored-by: shuiyisong <xixing.sys@gmail.com>
  • Loading branch information
paomian and shuiyisong committed Jun 14, 2024
1 parent bf3ad44 commit 01e3a24
Show file tree
Hide file tree
Showing 22 changed files with 1,613 additions and 52 deletions.
114 changes: 110 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"src/object-store",
"src/operator",
"src/partition",
"src/pipeline",
"src/plugins",
"src/promql",
"src/puffin",
Expand Down Expand Up @@ -224,6 +225,7 @@ mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
partition = { path = "src/partition" }
pipeline = { path = "src/pipeline" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }
Expand Down
1 change: 1 addition & 0 deletions src/auth/src/permission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum PermissionReq<'a> {
PromStoreWrite,
PromStoreRead,
Otlp,
LogWrite,
}

#[derive(Debug)]
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ meta-client.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
partition.workspace = true
pipeline.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
Expand All @@ -62,11 +63,13 @@ toml.workspace = true
tonic.workspace = true

[dev-dependencies]
catalog.workspace = true
catalog = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures = "0.3"
meta-srv = { workspace = true, features = ["mock"] }
serde_json.workspace = true
strfmt = "0.2"
tower.workspace = true
uuid.workspace = true
7 changes: 5 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod builder;
mod grpc;
mod influxdb;
mod log_handler;
mod opentsdb;
mod otlp;
mod prom_store;
Expand Down Expand Up @@ -48,6 +49,7 @@ use meta_client::MetaClientOptions;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
Expand All @@ -66,7 +68,7 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
Expand Down Expand Up @@ -100,6 +102,7 @@ pub trait FrontendInstance:
+ OpenTelemetryProtocolHandler
+ ScriptHandler
+ PrometheusHandler
+ LogHandler
+ Send
+ Sync
+ 'static
Expand All @@ -108,12 +111,12 @@ pub trait FrontendInstance:
}

pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
pub type StatementExecutorRef = Arc<StatementExecutor>;

#[derive(Clone)]
pub struct Instance {
catalog_manager: CatalogManagerRef,
script_executor: Arc<ScriptExecutor>,
pipeline_operator: Arc<PipelineOperator>,
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
plugins: Plugins,
Expand Down
13 changes: 11 additions & 2 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::StatementExecutor;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::QueryEngineFactory;
use servers::server::ServerHandlers;
use snafu::OptionExt;

use crate::error::{self, Result};
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::{Instance, StatementExecutorRef};
use crate::instance::Instance;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
Expand Down Expand Up @@ -172,11 +173,19 @@ impl FrontendBuilder {
table_route_cache,
));

let pipeline_operator = Arc::new(PipelineOperator::new(
inserter.clone(),
statement_executor.clone(),
self.catalog_manager.clone(),
query_engine.clone(),
));

plugins.insert::<StatementExecutorRef>(statement_executor.clone());

Ok(Instance {
catalog_manager: self.catalog_manager,
script_executor,
pipeline_operator,
statement_executor,
query_engine,
plugins,
Expand Down
Loading

0 comments on commit 01e3a24

Please sign in to comment.