From 3779f40c84f3aeb9842b4962be1ca05d4dea78db Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 15 May 2024 08:40:26 +0000 Subject: [PATCH] perf(operator): reuse table info from creating Signed-off-by: Zhenchi --- src/operator/src/insert.rs | 88 +++++++++++-------- .../src/req_convert/insert/row_to_region.rs | 42 +++------ 2 files changed, 64 insertions(+), 66 deletions(-) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 21eb16e3246..a68ed9b6be1 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -108,15 +108,12 @@ impl Inserter { }); validate_column_count_match(&requests)?; - self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor) + let table_name_to_ids = self + .create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor) + .await?; + let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) + .convert(requests) .await?; - let inserts = RowToRegion::new( - self.catalog_manager.as_ref(), - self.partition_manager.as_ref(), - &ctx, - ) - .convert(requests) - .await?; self.do_request(inserts, &ctx).await } @@ -143,17 +140,17 @@ impl Inserter { .await?; // check and create logical tables - self.create_or_alter_tables_on_demand( - &requests, - &ctx, - Some(physical_table.to_string()), - statement_executor, - ) - .await?; - let inserts = - RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx) - .convert(requests) - .await?; + let table_name_to_ids = self + .create_or_alter_tables_on_demand( + &requests, + &ctx, + Some(physical_table.to_string()), + statement_executor, + ) + .await?; + let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager) + .convert(requests) + .await?; self.do_request(inserts, &ctx).await } @@ -359,16 +356,20 @@ impl Inserter { Ok(inserts) } - // check if tables already exist: - // - if table does not exist, create table by inferred CreateExpr - // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` + /// Creates or alter tables on demand: + /// - if table does not exist, create table by inferred CreateExpr + /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` + /// + /// Returns a mapping from table name to table id, where table name is the table name involved in the requests. + /// This mapping is used in the conversion of RowToRegion. async fn create_or_alter_tables_on_demand( &self, requests: &RowInsertRequests, ctx: &QueryContextRef, on_physical_table: Option, statement_executor: &StatementExecutor, - ) -> Result<()> { + ) -> Result> { + let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len()); let mut create_tables = vec![]; let mut alter_tables = vec![]; for req in &requests.inserts { @@ -377,6 +378,9 @@ impl Inserter { let table = self.get_table(catalog, schema, &req.table_name).await?; match table { Some(table) => { + let table_info = table.table_info(); + table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + // TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`) validate_request_with_table(req, &table)?; let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?; @@ -393,13 +397,19 @@ impl Inserter { if let Some(on_physical_table) = on_physical_table { if !create_tables.is_empty() { // Creates logical tables in batch. - self.create_logical_tables( - create_tables, - ctx, - &on_physical_table, - statement_executor, - ) - .await?; + let tables = self + .create_logical_tables( + create_tables, + ctx, + &on_physical_table, + statement_executor, + ) + .await?; + + for table in tables { + let table_info = table.table_info(); + table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + } } if !alter_tables.is_empty() { // Alter logical tables in batch. @@ -409,7 +419,9 @@ impl Inserter { } } else { for req in create_tables { - self.create_table(req, ctx, statement_executor).await?; + let table = self.create_table(req, ctx, statement_executor).await?; + let table_info = table.table_info(); + table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); } for alter_expr in alter_tables.into_iter() { statement_executor @@ -418,7 +430,7 @@ impl Inserter { } } - Ok(()) + Ok(table_name_to_ids) } async fn create_physical_table_on_demand( @@ -527,7 +539,7 @@ impl Inserter { req: &RowInsertRequest, ctx: &QueryContextRef, statement_executor: &StatementExecutor, - ) -> Result<()> { + ) -> Result { let table_ref = TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name); @@ -542,12 +554,12 @@ impl Inserter { .await; match res { - Ok(_) => { + Ok(table) => { info!( "Successfully created table {}.{}.{}", table_ref.catalog, table_ref.schema, table_ref.table, ); - Ok(()) + Ok(table) } Err(err) => { error!( @@ -565,7 +577,7 @@ impl Inserter { ctx: &QueryContextRef, physical_table: &str, statement_executor: &StatementExecutor, - ) -> Result<()> { + ) -> Result> { let create_table_exprs = create_tables .iter() .map(|req| { @@ -593,9 +605,9 @@ impl Inserter { .await; match res { - Ok(_) => { + Ok(res) => { info!("Successfully created logical tables"); - Ok(()) + Ok(res) } Err(err) => { let failed_tables = create_table_exprs diff --git a/src/operator/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs index 388b13e9add..a33a1329026 100644 --- a/src/operator/src/req_convert/insert/row_to_region.rs +++ b/src/operator/src/req_convert/insert/row_to_region.rs @@ -12,42 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::RowInsertRequests; -use catalog::CatalogManager; use partition::manager::PartitionRuleManager; -use session::context::QueryContext; -use snafu::{OptionExt, ResultExt}; -use table::TableRef; +use snafu::OptionExt; +use table::metadata::TableId; -use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu}; +use crate::error::{Result, TableNotFoundSnafu}; use crate::req_convert::common::partitioner::Partitioner; pub struct RowToRegion<'a> { - catalog_manager: &'a dyn CatalogManager, + table_name_to_ids: HashMap, partition_manager: &'a PartitionRuleManager, - ctx: &'a QueryContext, } impl<'a> RowToRegion<'a> { pub fn new( - catalog_manager: &'a dyn CatalogManager, + table_name_to_ids: HashMap, partition_manager: &'a PartitionRuleManager, - ctx: &'a QueryContext, ) -> Self { Self { - catalog_manager, + table_name_to_ids, partition_manager, - ctx, } } pub async fn convert(&self, requests: RowInsertRequests) -> Result { let mut region_request = Vec::with_capacity(requests.inserts.len()); for request in requests.inserts { - let table = self.get_table(&request.table_name).await?; - let table_id = table.table_info().table_id(); - + let table_id = self.get_table_id(&request.table_name)?; let requests = Partitioner::new(self.partition_manager) .partition_insert_requests(table_id, request.rows.unwrap_or_default()) .await?; @@ -60,19 +55,10 @@ impl<'a> RowToRegion<'a> { }) } - async fn get_table(&self, table_name: &str) -> Result { - let catalog_name = self.ctx.current_catalog(); - let schema_name = self.ctx.current_schema(); - self.catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: common_catalog::format_full_table_name( - catalog_name, - schema_name, - table_name, - ), - }) + fn get_table_id(&self, table_name: &str) -> Result { + self.table_name_to_ids + .get(table_name) + .cloned() + .context(TableNotFoundSnafu { table_name }) } }