From 241f8c8a0bbd129d567cebf54d8502d09cdd0ab0 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Sat, 16 Dec 2023 02:21:56 +0800 Subject: [PATCH] fix: by cr comments --- src/catalog/src/information_schema.rs | 64 ++++++++++++----- src/catalog/src/information_schema/columns.rs | 68 ++++++++----------- src/catalog/src/information_schema/tables.rs | 32 ++------- src/catalog/src/kvbackend/manager.rs | 15 ++-- src/catalog/src/memory/manager.rs | 3 +- tests-integration/src/tests/instance_test.rs | 5 +- .../common/show/show_databases_tables.result | 15 ++-- .../common/system/information_schema.result | 68 ++++++++++++------- 8 files changed, 147 insertions(+), 123 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 6b1002565b16..e3d49f041a07 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -46,7 +46,7 @@ use crate::CatalogManager; lazy_static! { // Memory tables in `information_schema`. - static ref MEMORY_TABLES: Vec<&'static str> = vec![ + static ref MEMORY_TABLES: &'static [&'static str] = &[ ENGINES, COLUMN_PRIVILEGES, COLUMN_STATISTICS @@ -72,37 +72,65 @@ macro_rules! setup_memory_table { pub struct InformationSchemaProvider { catalog_name: String, catalog_manager: Weak, + tables: Option>, } impl InformationSchemaProvider { pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { - Self { + let mut provider = Self { catalog_name, catalog_manager, - } + tables: None, + }; + + provider.build_tables(); + + provider } - /// Build a map of [TableRef] in information schema. - /// Including `tables` and `columns`. - pub fn build( - catalog_name: String, - catalog_manager: Weak, - ) -> HashMap { - let provider = Self::new(catalog_name, catalog_manager); + /// Returns table names in the order of table id. + pub fn table_names(&self) -> Vec { + let mut tables = self.tables().into_values().collect::>(); + tables.sort_by(|t1, t2| { + t1.table_info() + .table_id() + .partial_cmp(&t2.table_info().table_id()) + .unwrap() + }); + + tables + .into_iter() + .map(|t| t.table_info().name.clone()) + .collect() + } - let mut schema = HashMap::new(); - schema.insert(TABLES.to_string(), provider.table(TABLES).unwrap()); - schema.insert(COLUMNS.to_string(), provider.table(COLUMNS).unwrap()); + /// Returns a map of [TableRef] in information schema. + pub fn tables(&self) -> HashMap { + // Safety: already built in `new`. + self.tables.clone().unwrap() + } + + /// Returns the [TableRef] by table name. + pub fn table(&self, name: &str) -> Option { + self.tables().get(name).cloned() + } + + fn build_tables(&mut self) -> HashMap { + let mut tables = HashMap::new(); + tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); + tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); // Add memory tables for name in MEMORY_TABLES.iter() { - schema.insert((*name).to_string(), provider.table(name).unwrap()); + tables.insert((*name).to_string(), self.build_table(name).unwrap()); } - schema + self.tables = Some(tables.clone()); + + tables } - pub fn table(&self, name: &str) -> Option { + fn build_table(&self, name: &str) -> Option { self.information_table(name).map(|table| { let table_info = Self::table_info(self.catalog_name.clone(), &table); let filter_pushdown = FilterPushDownType::Unsupported; @@ -114,8 +142,7 @@ impl InformationSchemaProvider { } fn information_table(&self, name: &str) -> Option { - let name = name.to_ascii_lowercase(); - match name.clone().as_str() { + match name.to_ascii_lowercase().as_str() { TABLES => Some(Arc::new(InformationSchemaTables::new( self.catalog_name.clone(), self.catalog_manager.clone(), @@ -214,6 +241,7 @@ impl DataSource for InformationTableDataSource { stream: Box::pin(stream), output_ordering: None, }; + Ok(Box::pin(stream)) } } diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 34e9c7ef66c5..53f338783ad3 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::{ - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, - SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX, + INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, + SEMANTIC_TYPE_TIME_INDEX, }; use common_error::ext::BoxedError; use common_query::physical_plan::TaskContext; @@ -33,8 +33,7 @@ use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; -use super::tables::InformationSchemaTables; -use super::{InformationTable, COLUMNS, TABLES}; +use super::{InformationTable, COLUMNS}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -102,7 +101,7 @@ impl InformationTable for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_tables() + .make_columns() .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -148,8 +147,8 @@ impl InformationSchemaColumnsBuilder { } } - /// Construct the `information_schema.tables` virtual table - async fn make_tables(&mut self) -> Result { + /// Construct the `information_schema.columns` virtual table + async fn make_columns(&mut self) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager @@ -163,48 +162,38 @@ impl InformationSchemaColumnsBuilder { { continue; } + for table_name in catalog_manager .table_names(&catalog_name, &schema_name) .await? { - let (keys, schema) = if let Some(table) = catalog_manager + if let Some(table) = catalog_manager .table(&catalog_name, &schema_name, &table_name) .await? { let keys = &table.table_info().meta.primary_key_indices; let schema = table.schema(); - (keys.clone(), schema) - } else { - // TODO: this specific branch is only a workaround for FrontendCatalogManager. - if schema_name == INFORMATION_SCHEMA_NAME { - if table_name == COLUMNS { - (vec![], InformationSchemaColumns::schema()) - } else if table_name == TABLES { - (vec![], InformationSchemaTables::schema()) + + for (idx, column) in schema.column_schemas().iter().enumerate() { + let semantic_type = if column.is_time_index() { + SEMANTIC_TYPE_TIME_INDEX + } else if keys.contains(&idx) { + SEMANTIC_TYPE_PRIMARY_KEY } else { - continue; - } - } else { - continue; + SEMANTIC_TYPE_FIELD + }; + + self.add_column( + &catalog_name, + &schema_name, + &table_name, + &column.name, + &column.data_type.name(), + semantic_type, + ); } - }; - - for (idx, column) in schema.column_schemas().iter().enumerate() { - let semantic_type = if column.is_time_index() { - SEMANTIC_TYPE_TIME_INDEX - } else if keys.contains(&idx) { - SEMANTIC_TYPE_PRIMARY_KEY - } else { - SEMANTIC_TYPE_FIELD - }; - self.add_column( - &catalog_name, - &schema_name, - &table_name, - &column.name, - &column.data_type.name(), - semantic_type, - ); + } else { + unreachable!(); } } } @@ -238,6 +227,7 @@ impl InformationSchemaColumnsBuilder { Arc::new(self.data_types.finish()), Arc::new(self.semantic_types.finish()), ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) } } @@ -254,7 +244,7 @@ impl DfPartitionStream for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_tables() + .make_columns() .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index a626dbfdd31a..d258dd490130 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -15,10 +15,7 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; -use common_catalog::consts::{ - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, - INFORMATION_SCHEMA_TABLES_TABLE_ID, -}; +use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID; use common_error::ext::BoxedError; use common_query::physical_plan::TaskContext; use common_recordbatch::adapter::RecordBatchStreamAdapter; @@ -33,7 +30,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; use table::metadata::TableType; -use super::{COLUMNS, TABLES}; +use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -178,29 +175,8 @@ impl InformationSchemaTablesBuilder { Some(&table_info.meta.engine), ); } else { - // TODO: this specific branch is only a workaround for FrontendCatalogManager. - if schema_name == INFORMATION_SCHEMA_NAME { - if table_name == COLUMNS { - self.add_table( - &catalog_name, - &schema_name, - &table_name, - TableType::Temporary, - Some(INFORMATION_SCHEMA_COLUMNS_TABLE_ID), - None, - ); - } else if table_name == TABLES { - self.add_table( - &catalog_name, - &schema_name, - &table_name, - TableType::Temporary, - Some(INFORMATION_SCHEMA_TABLES_TABLE_ID), - None, - ); - } - } - }; + unreachable!(); + } } } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 2c5028b40908..93536ab73f41 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -38,7 +38,7 @@ use crate::error::{ self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, }; -use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; +use crate::information_schema::InformationSchemaProvider; use crate::CatalogManager; /// Access all existing catalog, schema and tables. @@ -81,6 +81,11 @@ impl KvBackendCatalogManager { cache_invalidator, system_catalog: SystemCatalog { catalog_manager: me.clone(), + information_schema_provider: Arc::new(InformationSchemaProvider::new( + // The catalog name is not used in system_catalog, so let it empty + "".to_string(), + me.clone(), + )), }, }) } @@ -231,11 +236,11 @@ impl CatalogManager for KvBackendCatalogManager { // a new catalog is created. /// Existing system tables: /// - public.numbers -/// - information_schema.tables -/// - information_schema.columns +/// - information_schema.{tables} #[derive(Clone)] struct SystemCatalog { catalog_manager: Weak, + information_schema_provider: Arc, } impl SystemCatalog { @@ -245,7 +250,7 @@ impl SystemCatalog { fn table_names(&self, schema: &str) -> Vec { if schema == INFORMATION_SCHEMA_NAME { - vec![TABLES.to_string(), COLUMNS.to_string()] + self.information_schema_provider.table_names() } else if schema == DEFAULT_SCHEMA_NAME { vec![NUMBERS_TABLE_NAME.to_string()] } else { @@ -259,7 +264,7 @@ impl SystemCatalog { fn table_exist(&self, schema: &str, table: &str) -> bool { if schema == INFORMATION_SCHEMA_NAME { - table == TABLES || table == COLUMNS + self.information_schema_provider.table(table).is_some() } else if schema == DEFAULT_SCHEMA_NAME { table == NUMBERS_TABLE_NAME } else { diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 5d08c1162600..d35d734f40fc 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -243,10 +243,11 @@ impl MemoryCatalogManager { } fn create_catalog_entry(self: &Arc, catalog: String) -> SchemaEntries { - let information_schema = InformationSchemaProvider::build( + let information_schema_provider = InformationSchemaProvider::new( catalog, Arc::downgrade(self) as Weak, ); + let information_schema = information_schema_provider.tables(); let mut catalog = HashMap::new(); catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema); catalog diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 6bf5fc0d4784..5054182b56d4 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -1732,7 +1732,7 @@ async fn test_information_schema_dot_tables(instance: Arc) { // User can only see information schema under current catalog. // A necessary requirement to GreptimeCloud. - let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name"; + let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' and table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name"; let output = execute_sql(&instance, sql).await; let expected = "\ @@ -1760,6 +1760,7 @@ async fn test_information_schema_dot_tables(instance: Arc) { #[apply(both_instances_cases)] async fn test_information_schema_dot_columns(instance: Arc) { + logging::init_default_ut_logging(); let instance = instance.frontend(); let sql = "create table another_table(i timestamp time index)"; @@ -1769,7 +1770,7 @@ async fn test_information_schema_dot_columns(instance: Arc) { // User can only see information schema under current catalog. // A necessary requirement to GreptimeCloud. - let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns order by table_name"; + let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns where table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name"; let output = execute_sql(&instance, sql).await; let expected = "\ diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index c4e4f402603e..d68f1d991612 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -16,10 +16,13 @@ Affected Rows: 0 show tables; -+---------+ -| Tables | -+---------+ -| columns | -| tables | -+---------+ ++-------------------+ +| Tables | ++-------------------+ +| column_privileges | +| column_statistics | +| columns | +| engines | +| tables | ++-------------------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index be60fdd701e5..e9bb4535e201 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -4,33 +4,53 @@ from information_schema.tables where table_name != 'scripts' order by table_schema, table_name; -+---------------+--------------------+------------+-----------------+----------+-------------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+---------------+--------------------+------------+-----------------+----------+-------------+ -| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | -| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | -| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | -+---------------+--------------------+------------+-----------------+----------+-------------+ ++---------------+--------------------+-------------------+-----------------+----------+-------------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+--------------------+-------------------+-----------------+----------+-------------+ +| greptime | information_schema | column_privileges | LOCAL TEMPORARY | 6 | | +| greptime | information_schema | column_statistics | LOCAL TEMPORARY | 7 | | +| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | +| greptime | information_schema | engines | LOCAL TEMPORARY | 5 | | +| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | +| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | ++---------------+--------------------+-------------------+-----------------+----------+-------------+ select * from information_schema.columns order by table_schema, table_name; -+---------------+--------------------+------------+---------------+-----------+---------------+ -| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | -+---------------+--------------------+------------+---------------+-----------+---------------+ -| greptime | information_schema | columns | table_catalog | String | FIELD | -| greptime | information_schema | columns | table_schema | String | FIELD | -| greptime | information_schema | columns | table_name | String | FIELD | -| greptime | information_schema | columns | column_name | String | FIELD | -| greptime | information_schema | columns | data_type | String | FIELD | -| greptime | information_schema | columns | semantic_type | String | FIELD | -| greptime | information_schema | tables | table_catalog | String | FIELD | -| greptime | information_schema | tables | table_schema | String | FIELD | -| greptime | information_schema | tables | table_name | String | FIELD | -| greptime | information_schema | tables | table_type | String | FIELD | -| greptime | information_schema | tables | table_id | UInt32 | FIELD | -| greptime | information_schema | tables | engine | String | FIELD | -| greptime | public | numbers | number | UInt32 | TAG | -+---------------+--------------------+------------+---------------+-----------+---------------+ ++---------------+--------------------+-------------------+----------------+-----------+---------------+ +| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | ++---------------+--------------------+-------------------+----------------+-----------+---------------+ +| greptime | information_schema | column_privileges | grantee | String | FIELD | +| greptime | information_schema | column_privileges | is_grantable | String | FIELD | +| greptime | information_schema | column_privileges | privilege_type | String | FIELD | +| greptime | information_schema | column_privileges | column_name | String | FIELD | +| greptime | information_schema | column_privileges | table_name | String | FIELD | +| greptime | information_schema | column_privileges | table_schema | String | FIELD | +| greptime | information_schema | column_privileges | table_catalog | String | FIELD | +| greptime | information_schema | column_statistics | histogram | String | FIELD | +| greptime | information_schema | column_statistics | column_name | String | FIELD | +| greptime | information_schema | column_statistics | table_name | String | FIELD | +| greptime | information_schema | column_statistics | schema_name | String | FIELD | +| greptime | information_schema | columns | table_catalog | String | FIELD | +| greptime | information_schema | columns | semantic_type | String | FIELD | +| greptime | information_schema | columns | data_type | String | FIELD | +| greptime | information_schema | columns | column_name | String | FIELD | +| greptime | information_schema | columns | table_name | String | FIELD | +| greptime | information_schema | columns | table_schema | String | FIELD | +| greptime | information_schema | engines | engine | String | FIELD | +| greptime | information_schema | engines | support | String | FIELD | +| greptime | information_schema | engines | comment | String | FIELD | +| greptime | information_schema | engines | transactions | String | FIELD | +| greptime | information_schema | engines | xa | String | FIELD | +| greptime | information_schema | engines | savepoints | String | FIELD | +| greptime | information_schema | tables | table_catalog | String | FIELD | +| greptime | information_schema | tables | engine | String | FIELD | +| greptime | information_schema | tables | table_id | UInt32 | FIELD | +| greptime | information_schema | tables | table_type | String | FIELD | +| greptime | information_schema | tables | table_name | String | FIELD | +| greptime | information_schema | tables | table_schema | String | FIELD | +| greptime | public | numbers | number | UInt32 | TAG | ++---------------+--------------------+-------------------+----------------+-----------+---------------+ create database my_db;