diff --git a/Cargo.lock b/Cargo.lock index b042227a293b..0b3ece080a4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1183,6 +1183,7 @@ version = "0.5.0" dependencies = [ "api", "arc-swap", + "arrow", "arrow-schema", "async-stream", "async-trait", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 79f3603f4f19..715324e7a2db 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -11,6 +11,7 @@ testing = [] api.workspace = true arc-swap = "1.0" arrow-schema.workspace = true +arrow.workspace = true async-stream.workspace = true async-trait = "0.1" build-data = "0.1" diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 7dbc71f6f465..84be9576f610 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -15,6 +15,7 @@ mod columns; mod key_column_usage; mod memory_table; +mod predicate; mod schemata; mod table_names; mod tables; @@ -29,6 +30,7 @@ use datatypes::schema::SchemaRef; use futures_util::StreamExt; use lazy_static::lazy_static; use paste::paste; +pub(crate) use predicate::Predicates; use snafu::ResultExt; use store_api::data_source::DataSource; use store_api::storage::{ScanRequest, TableId}; @@ -159,7 +161,7 @@ impl InformationSchemaProvider { fn build_table(&self, name: &str) -> Option<TableRef> { self.information_table(name).map(|table| { let table_info = Self::table_info(self.catalog_name.clone(), &table); - let filter_pushdown = FilterPushDownType::Unsupported; + let filter_pushdown = FilterPushDownType::Inexact; let thin_table = ThinTable::new(table_info, filter_pushdown); let data_source = Arc::new(InformationTableDataSource::new(table)); @@ -238,7 +240,7 @@ trait InformationTable { fn schema(&self) -> SchemaRef; - fn to_stream(&self) -> Result<SendableRecordBatchStream>; + fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>; fn table_type(&self) -> TableType { TableType::Temporary } @@ -272,7 +274,7 @@ impl DataSource for InformationTableDataSource { &self, request: ScanRequest, ) -> std::result::Result<SendableRecordBatchStream, BoxedError> { - let projection = request.projection; + let projection = request.projection.clone(); let projected_schema = match &projection { Some(projection) => self.try_project(projection)?, None => self.table.schema(), }; @@ -280,7 +282,7 @@ let stream = self .table - .to_stream() + .to_stream(request) .map_err(BoxedError::new) .context(TablesRecordBatchSnafu) .map_err(BoxedError::new)?
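Context for the `Unsupported` -> `Inexact` switch above: with an inexact pushdown, the query engine still re-applies every filter on top of whatever the scan returns, so the `information_schema` tables below are free to filter best-effort while building their batches. A self-contained sketch of that contract (the enum and helper here are illustrative stand-ins mirroring the semantics of the `table` crate's `FilterPushDownType`, not its actual definition):

```rust
/// Illustrative stand-in for the three pushdown levels
/// (cf. DataFusion's `TableProviderFilterPushDown`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum PushDown {
    Unsupported, // scan ignores filters; engine filters every row
    Inexact,     // scan may pre-filter; engine re-checks returned rows
    Exact,       // scan filters precisely; engine skips re-checking
}

/// Whether the engine must re-apply the predicate above the scan.
fn engine_refilters(p: PushDown) -> bool {
    !matches!(p, PushDown::Exact)
}

fn main() {
    // `Inexact` keeps results correct even when the scan's own filtering
    // is partial (e.g. a predicate it can't evaluate), which is exactly
    // the behavior `Predicates::eval` relies on further down.
    assert!(engine_refilters(PushDown::Inexact));
    assert!(!engine_refilters(PushDown::Exact));
}
```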
diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 27d4921928e8..5ca12eb73fbf 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -29,14 +29,16 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::{InformationTable, COLUMNS}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; +use crate::information_schema::Predicates; use crate::CatalogManager; pub(super) struct InformationSchemaColumns { @@ -102,14 +104,14 @@ impl InformationTable for InformationSchemaColumns { self.schema.clone() } - fn to_stream(&self) -> Result<SendableRecordBatchStream> { + fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_columns() + .make_columns(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -165,12 +167,13 @@ impl InformationSchemaColumnsBuilder { } /// Construct the `information_schema.columns` virtual table - async fn make_columns(&mut self) -> Result<RecordBatch> { + async fn make_columns(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await?
{ if !catalog_manager @@ -201,6 +204,7 @@ impl InformationSchemaColumnsBuilder { }; self.add_column( + &predicates, &catalog_name, &schema_name, &table_name, @@ -219,6 +223,7 @@ impl InformationSchemaColumnsBuilder { fn add_column( &mut self, + predicates: &Predicates, catalog_name: &str, schema_name: &str, table_name: &str, @@ -227,6 +232,19 @@ impl InformationSchemaColumnsBuilder { ) { let data_type = &column_schema.data_type.name(); + let row = [ + (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_SCHEMA, &Value::from(schema_name)), + (TABLE_NAME, &Value::from(table_name)), + (COLUMN_NAME, &Value::from(column_schema.name.as_str())), + (DATA_TYPE, &Value::from(data_type.as_str())), + (SEMANTIC_TYPE, &Value::from(semantic_type)), + ]; + + if !predicates.eval(&row) { + return; + } + self.catalog_names.push(Some(catalog_name)); self.schema_names.push(Some(schema_name)); self.table_names.push(Some(table_name)); @@ -279,7 +297,7 @@ impl DfPartitionStream for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_columns() + .make_columns(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index 7c8a4995acfd..28fba3c63ced 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -25,17 +25,26 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::KEY_COLUMN_USAGE; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const CONSTRAINT_SCHEMA: &str = "constraint_schema"; +const CONSTRAINT_NAME: &str = "constraint_name"; +const TABLE_CATALOG: &str = "table_catalog"; +const TABLE_SCHEMA: &str = "table_schema"; +const TABLE_NAME: &str = "table_name"; +const COLUMN_NAME: &str = "column_name"; +const ORDINAL_POSITION: &str = "ordinal_position"; + /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. 
pub(super) struct InformationSchemaKeyColumnUsage { schema: SchemaRef, @@ -60,24 +69,16 @@ impl InformationSchemaKeyColumnUsage { false, ), ColumnSchema::new( - "constraint_schema", + CONSTRAINT_SCHEMA, ConcreteDataType::string_datatype(), false, ), - ColumnSchema::new( - "constraint_name", - ConcreteDataType::string_datatype(), - false, - ), - ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("column_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - "ordinal_position", - ConcreteDataType::uint32_datatype(), - false, - ), + ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false), ColumnSchema::new( "position_in_unique_constraint", ConcreteDataType::uint32_datatype(), @@ -123,14 +124,14 @@ impl InformationTable for InformationSchemaKeyColumnUsage { self.schema.clone() } - fn to_stream(&self) -> Result<SendableRecordBatchStream> { + fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_key_column_usage() + .make_key_column_usage(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -192,14 +193,14 @@ impl InformationSchemaKeyColumnUsageBuilder { } /// Construct the `information_schema.KEY_COLUMN_USAGE` virtual table - async fn make_key_column_usage(&mut self) -> Result<RecordBatch> { + async fn make_key_column_usage(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); - let mut time_index_constraints = vec![]; let mut primary_constraints = vec![]; for schema_name in catalog_manager.schema_names(&catalog_name).await?
{ @@ -223,11 +224,15 @@ impl InformationSchemaKeyColumnUsageBuilder { for (idx, column) in schema.column_schemas().iter().enumerate() { if column.is_time_index() { - time_index_constraints.push(( - schema_name.clone(), - table_name.clone(), - column.name.clone(), - )); + self.add_key_column_usage( + &predicates, + &schema_name, + "TIME INDEX", + &schema_name, + &table_name, + &column.name, + 1, // always 1 for time index + ); } if keys.contains(&idx) { primary_constraints.push(( @@ -244,22 +249,11 @@ } } - for (i, (schema_name, table_name, column_name)) in - time_index_constraints.into_iter().enumerate() - { - self.add_key_column_usage( - &schema_name, - "TIME INDEX", - &schema_name, - &table_name, - &column_name, - i as u32 + 1, - ); - } for (i, (schema_name, table_name, column_name)) in primary_constraints.into_iter().enumerate() { self.add_key_column_usage( + &predicates, &schema_name, "PRIMARY", &schema_name, @@ -274,8 +268,10 @@ } // TODO(dimbtp): Foreign key constraints have non-`None` values for the last 4 // fields, but they are not supported yet. + #[allow(clippy::too_many_arguments)] fn add_key_column_usage( &mut self, + predicates: &Predicates, constraint_schema: &str, constraint_name: &str, table_schema: &str, @@ -283,6 +279,19 @@ column_name: &str, ordinal_position: u32, ) { + let row = [ + (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)), + (CONSTRAINT_NAME, &Value::from(constraint_name)), + (TABLE_SCHEMA, &Value::from(table_schema)), + (TABLE_NAME, &Value::from(table_name)), + (COLUMN_NAME, &Value::from(column_name)), + (ORDINAL_POSITION, &Value::from(ordinal_position)), + ]; + + if !predicates.eval(&row) { + return; + } + self.constraint_catalog.push(Some("def")); self.constraint_schema.push(Some(constraint_schema)); self.constraint_name.push(Some(constraint_name)); @@ -328,7 +337,7 @@ impl DfPartitionStream for InformationSchemaKeyColumnUsage { schema, futures::stream::once(async move { builder - .make_key_column_usage() + .make_key_column_usage(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/information_schema/memory_table.rs index cce53c88a73c..9597fc5d12e3 100644 --- a/src/catalog/src/information_schema/memory_table.rs +++ b/src/catalog/src/information_schema/memory_table.rs @@ -26,7 +26,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use snafu::ResultExt; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; pub use tables::get_schema_columns; use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; @@ -74,7 +74,7 @@ impl InformationTable for MemoryTable { self.schema.clone() } - fn to_stream(&self) -> Result<SendableRecordBatchStream> { + fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, @@ -169,7 +169,7 @@ mod tests { assert_eq!("test", table.table_name()); assert_eq!(schema, InformationTable::schema(&table)); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -198,7 +198,7 @@ assert_eq!("test", table.table_name()); assert_eq!(schema,
InformationTable::schema(&table)); - let stream = table.to_stream().unwrap(); + let stream = table.to_stream(ScanRequest::default()).unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs new file mode 100644 index 000000000000..9afc83a389f5 --- /dev/null +++ b/src/catalog/src/information_schema/predicate.rs @@ -0,0 +1,609 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::array::StringArray; +use arrow::compute::kernels::comparison; +use common_query::logical_plan::DfExpr; +use datafusion::common::ScalarValue; +use datafusion::logical_expr::expr::Like; +use datafusion::logical_expr::Operator; +use datatypes::value::Value; +use store_api::storage::ScanRequest; + +type ColumnName = String; +/// Predicate to filter the `information_schema` table streams. +/// Currently we only support these simple predicates. +/// TODO(dennis): support more predicate types. +#[derive(Clone, PartialEq, Eq, Debug)] +enum Predicate { + Eq(ColumnName, Value), + Like(ColumnName, String, bool), + NotEq(ColumnName, Value), + InList(ColumnName, Vec<Value>), + And(Box<Predicate>, Box<Predicate>), + Or(Box<Predicate>, Box<Predicate>), + Not(Box<Predicate>), +} + +impl Predicate { + /// Evaluate the predicate against the row. Returns: + /// - `None` when the predicate can't be evaluated with the row,
+ /// - `Some(true)` when the predicate is satisfied, + /// - `Some(false)` when the predicate is not satisfied. + fn eval(&self, row: &[(&str, &Value)]) -> Option<bool> { + match self { + Predicate::Eq(c, v) => { + for (column, value) in row { + if c != column { + continue; + } + return Some(v == *value); + } + } + Predicate::Like(c, pattern, case_insensitive) => { + for (column, value) in row { + if c != column { + continue; + } + + let Value::String(bs) = value else { + continue; + }; + + return like_utf8(bs.as_utf8(), pattern, case_insensitive); + } + } + Predicate::NotEq(c, v) => { + for (column, value) in row { + if c != column { + continue; + } + return Some(v != *value); + } + } + Predicate::InList(c, values) => { + for (column, value) in row { + if c != column { + continue; + } + return Some(values.iter().any(|v| v == *value)); + } + } + Predicate::And(left, right) => { + let left = left.eval(row); + + // short-circuit + if matches!(left, Some(false)) { + return Some(false); + } + + return match (left, right.eval(row)) { + (Some(left), Some(right)) => Some(left && right), + (None, Some(false)) => Some(false), + _ => None, + }; + } + Predicate::Or(left, right) => { + let left = left.eval(row); + + // short-circuit + if matches!(left, Some(true)) { + return Some(true); + } + + return match (left, right.eval(row)) { + (Some(left), Some(right)) => Some(left || right), + (None, Some(true)) => Some(true), + _ => None, + }; + } + Predicate::Not(p) => { + let Some(b) = p.eval(row) else { + return None; + }; + + return Some(!b); + } + } + + // Can't evaluate predicate with the row + None + } + + /// Try to create a predicate from a datafusion [`Expr`], return `None` if it fails. + fn from_expr(expr: DfExpr) -> Option<Predicate> { + match expr { + // NOT expr + DfExpr::Not(expr) => { + let Some(p) = Self::from_expr(*expr) else { + return None; + }; + + Some(Predicate::Not(Box::new(p))) + } + // expr LIKE pattern + DfExpr::Like(Like { + negated, + expr, + pattern, + case_insensitive, + ..
+ }) if is_column(&expr) && is_string_literal(&pattern) => { + // Safety: ensured by the guard + let DfExpr::Column(c) = *expr else { + unreachable!(); + }; + let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else { + unreachable!(); + }; + + let p = Predicate::Like(c.name, pattern, case_insensitive); + + if negated { + Some(Predicate::Not(Box::new(p))) + } else { + Some(p) + } + } + // left OP right + DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) { + // left == right + (DfExpr::Literal(scalar), Operator::Eq, DfExpr::Column(c)) + | (DfExpr::Column(c), Operator::Eq, DfExpr::Literal(scalar)) => { + let Ok(v) = Value::try_from(scalar) else { + return None; + }; + + Some(Predicate::Eq(c.name, v)) + } + // left != right + (DfExpr::Literal(scalar), Operator::NotEq, DfExpr::Column(c)) + | (DfExpr::Column(c), Operator::NotEq, DfExpr::Literal(scalar)) => { + let Ok(v) = Value::try_from(scalar) else { + return None; + }; + + Some(Predicate::NotEq(c.name, v)) + } + // left AND right + (left, Operator::And, right) => { + let Some(left) = Self::from_expr(left) else { + return None; + }; + + let Some(right) = Self::from_expr(right) else { + return None; + }; + + Some(Predicate::And(Box::new(left), Box::new(right))) + } + // left OR right + (left, Operator::Or, right) => { + let Some(left) = Self::from_expr(left) else { + return None; + }; + + let Some(right) = Self::from_expr(right) else { + return None; + }; + + Some(Predicate::Or(Box::new(left), Box::new(right))) + } + _ => None, + }, + // [NOT] IN (LIST) + DfExpr::InList(list) => { + match (*list.expr, list.list, list.negated) { + // column [NOT] IN (v1, v2, v3, ...) + (DfExpr::Column(c), list, negated) if is_all_scalars(&list) => { + let mut values = Vec::with_capacity(list.len()); + for scalar in list { + // Safety: checked by `is_all_scalars` + let DfExpr::Literal(scalar) = scalar else { + unreachable!(); + }; + + let Ok(value) = Value::try_from(scalar) else { + return None; + }; + + values.push(value); + } + + let predicate = Predicate::InList(c.name, values); + + if negated { + Some(Predicate::Not(Box::new(predicate))) + } else { + Some(predicate) + } + } + _ => None, + } + } + _ => None, + } + } +} + +/// Perform SQL `left LIKE right`, return `None` if it fails to evaluate. +/// - `s` the target string +/// - `pattern` the pattern just like '%abc' +/// - `case_insensitive` whether to perform case-insensitive like or not. +fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option<bool> { + let array = StringArray::from(vec![s]); + let patterns = StringArray::new_scalar(pattern); + + let Ok(booleans) = (if *case_insensitive { + comparison::ilike(&array, &patterns) + } else { + comparison::like(&array, &patterns) + }) else { + return None; + }; + + // Safety: at least one value in result + Some(booleans.value(0)) +} + +fn is_string_literal(expr: &DfExpr) -> bool { + matches!(expr, DfExpr::Literal(ScalarValue::Utf8(Some(_)))) +} + +fn is_column(expr: &DfExpr) -> bool { + matches!(expr, DfExpr::Column(_)) +} + +/// A list of predicates +pub struct Predicates { + predicates: Vec<Predicate>, +} + +impl Predicates { + /// Tries its best to create predicates from the [`ScanRequest`].
+ pub fn from_scan_request(request: &Option<ScanRequest>) -> Predicates { + if let Some(request) = request { + let mut predicates = Vec::with_capacity(request.filters.len()); + + for filter in &request.filters { + if let Some(predicate) = Predicate::from_expr(filter.df_expr().clone()) { + predicates.push(predicate); + } + } + + Self { predicates } + } else { + Self { + predicates: Vec::new(), + } + } + } + + /// Evaluate the predicates with the row; + /// returns `true` when all the predicates are satisfied or can't be evaluated. + pub fn eval(&self, row: &[(&str, &Value)]) -> bool { + // fast path + if self.predicates.is_empty() { + return true; + } + + self.predicates + .iter() + .filter_map(|p| p.eval(row)) + .all(|b| b) + } +} + +/// Returns true when the values are all [`DfExpr::Literal`]. +fn is_all_scalars(list: &[DfExpr]) -> bool { + list.iter().all(|v| matches!(v, DfExpr::Literal(_))) +} + +#[cfg(test)] +mod tests { + use datafusion::common::{Column, ScalarValue}; + use datafusion::logical_expr::expr::InList; + use datafusion::logical_expr::BinaryExpr; + + use super::*; + + #[test] + fn test_predicate_eval() { + let a_col = "a".to_string(); + let b_col = "b".to_string(); + let a_value = Value::from("a_value"); + let b_value = Value::from("b_value"); + let wrong_value = Value::from("wrong_value"); + + let a_row = [(a_col.as_str(), &a_value)]; + let b_row = [("b", &wrong_value)]; + let wrong_row = [(a_col.as_str(), &wrong_value)]; + + // Predicate::Eq + let p = Predicate::Eq(a_col.clone(), a_value.clone()); + assert!(p.eval(&a_row).unwrap()); + assert!(p.eval(&b_row).is_none()); + assert!(!p.eval(&wrong_row).unwrap()); + + // Predicate::NotEq + let p = Predicate::NotEq(a_col.clone(), a_value.clone()); + assert!(!p.eval(&a_row).unwrap()); + assert!(p.eval(&b_row).is_none()); + assert!(p.eval(&wrong_row).unwrap()); + + // Predicate::InList + let p = Predicate::InList(a_col.clone(), vec![a_value.clone(), b_value.clone()]); + assert!(p.eval(&a_row).unwrap()); + assert!(p.eval(&b_row).is_none()); + assert!(!p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[(&a_col, &b_value)]).unwrap()); + + let p1 = Predicate::Eq(a_col.clone(), a_value.clone()); + let p2 = Predicate::Eq(b_col.clone(), b_value.clone()); + let row = [(a_col.as_str(), &a_value), (b_col.as_str(), &b_value)]; + let wrong_row = [(a_col.as_str(), &a_value), (b_col.as_str(), &wrong_value)]; + + // Predicate::And + let p = Predicate::And(Box::new(p1.clone()), Box::new(p2.clone())); + assert!(p.eval(&row).unwrap()); + assert!(!p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[]).is_none()); + assert!(p.eval(&[("c", &a_value)]).is_none()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)]) + .unwrap()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)]) + .is_none()); + assert!(!p + .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)]) + .unwrap()); + + // Predicate::Or + let p = Predicate::Or(Box::new(p1), Box::new(p2)); + assert!(p.eval(&row).unwrap()); + assert!(p.eval(&wrong_row).unwrap()); + assert!(p.eval(&[]).is_none()); + assert!(p.eval(&[("c", &a_value)]).is_none()); + assert!(!p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)]) + .unwrap()); + assert!(p + .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none()); + } + + #[test] + fn test_predicate_like() { + // case insensitive + let expr = DfExpr::Like(Like { + negated: false, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: true, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!( + matches!(&p, Predicate::Like(c, pattern, case_insensitive) if + c == "a" + && pattern == "%abc" + && *case_insensitive) + ); + + let match_row = [ + ("a", &Value::from("hello AbC")), + ("b", &Value::from("b value")), + ]; + let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))]; + + assert!(p.eval(&match_row).unwrap()); + assert!(!p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + + // case sensitive + let expr = DfExpr::Like(Like { + negated: false, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: false, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!( + matches!(&p, Predicate::Like(c, pattern, case_insensitive) if + c == "a" + && pattern == "%abc" + && !*case_insensitive) + ); + assert!(!p.eval(&match_row).unwrap()); + assert!(!p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + + // not like + let expr = DfExpr::Like(Like { + negated: true, + expr: Box::new(column("a")), + pattern: Box::new(string_literal("%abc")), + case_insensitive: true, + escape_char: None, + }); + + let p = Predicate::from_expr(expr).unwrap(); + assert!(!p.eval(&match_row).unwrap()); + assert!(p.eval(&unmatch_row).unwrap()); + assert!(p.eval(&[]).is_none()); + } + + fn column(name: &str) -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: name.to_string(), + }) + } + + fn string_literal(v: &str) -> DfExpr { + DfExpr::Literal(ScalarValue::Utf8(Some(v.to_string()))) + } + + fn match_string_value(v: &Value, expected: &str) -> bool { + matches!(v, Value::String(bs) if bs.as_utf8() == expected) + } + + fn match_string_values(vs: &[Value], expected: &[&str]) -> bool { + assert_eq!(vs.len(), expected.len()); + + let mut result = true; + for (i, v) in vs.iter().enumerate() { + result = result && match_string_value(v, expected[i]); + } + + result + } + + fn mock_exprs() -> (DfExpr, DfExpr) { + let expr1 = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(column("a")), + op: Operator::Eq, + right: Box::new(string_literal("a_value")), + }); + + let expr2 = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(column("b")), + op: Operator::NotEq, + right: Box::new(string_literal("b_value")), + }); + + (expr1, expr2) + } + + #[test] + fn test_predicate_from_expr() { + let (expr1, expr2) = mock_exprs(); + + let p1 = Predicate::from_expr(expr1.clone()).unwrap(); + assert!(matches!(&p1, Predicate::Eq(column, v) if column == "a" + && match_string_value(v, "a_value"))); + + let p2 = Predicate::from_expr(expr2.clone()).unwrap(); + assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b" + && match_string_value(v, "b_value"))); + + let and_expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(expr1.clone()), + op: Operator::And, + right: Box::new(expr2.clone()), + }); + let or_expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(expr1.clone()), + op: Operator::Or, + right: Box::new(expr2.clone()), + }); + let not_expr = DfExpr::Not(Box::new(expr1.clone())); + + let and_p = Predicate::from_expr(and_expr).unwrap(); + assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2)); + let or_p = Predicate::from_expr(or_expr).unwrap(); + 
assert!(matches!(or_p, Predicate::Or(left, right) if *left == p1 && *right == p2)); + let not_p = Predicate::from_expr(not_expr).unwrap(); + assert!(matches!(not_p, Predicate::Not(p) if *p == p1)); + + let inlist_expr = DfExpr::InList(InList { + expr: Box::new(column("a")), + list: vec![string_literal("a1"), string_literal("a2")], + negated: false, + }); + + let inlist_p = Predicate::from_expr(inlist_expr).unwrap(); + assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a" + && match_string_values(values, &["a1", "a2"]))); + + let inlist_expr = DfExpr::InList(InList { + expr: Box::new(column("a")), + list: vec![string_literal("a1"), string_literal("a2")], + negated: true, + }); + let inlist_p = Predicate::from_expr(inlist_expr).unwrap(); + assert!(matches!(inlist_p, Predicate::Not(p) if + matches!(&*p, + Predicate::InList(c, values) if c == "a" + && match_string_values(values, &["a1", "a2"])))); + } + + #[test] + fn test_predicates_from_scan_request() { + let predicates = Predicates::from_scan_request(&None); + assert!(predicates.predicates.is_empty()); + + let (expr1, expr2) = mock_exprs(); + + let request = ScanRequest { + filters: vec![expr1.into(), expr2.into()], + ..Default::default() + }; + let predicates = Predicates::from_scan_request(&Some(request)); + + assert_eq!(2, predicates.predicates.len()); + assert!( + matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a" + && match_string_value(v, "a_value")) + ); + assert!( + matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b" + && match_string_value(v, "b_value")) + ); + } + + #[test] + fn test_predicates_eval_row() { + let wrong_row = [ + ("a", &Value::from("a_value")), + ("b", &Value::from("b_value")), + ("c", &Value::from("c_value")), + ]; + let row = [ + ("a", &Value::from("a_value")), + ("b", &Value::from("not_b_value")), + ("c", &Value::from("c_value")), + ]; + let c_row = [("c", &Value::from("c_value"))]; + + // test empty predicates, always returns true + let predicates = Predicates::from_scan_request(&None); + assert!(predicates.eval(&row)); + assert!(predicates.eval(&wrong_row)); + assert!(predicates.eval(&c_row)); + + let (expr1, expr2) = mock_exprs(); + let request = ScanRequest { + filters: vec![expr1.into(), expr2.into()], + ..Default::default() + }; + let predicates = Predicates::from_scan_request(&Some(request)); + assert!(predicates.eval(&row)); + assert!(!predicates.eval(&wrong_row)); + assert!(predicates.eval(&c_row)); + } +} diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index 8f523f7a9383..eddfb142cc77 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/information_schema/schemata.rs @@ -25,17 +25,23 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::StringVectorBuilder; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use super::SCHEMATA; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const 
CATALOG_NAME: &str = "catalog_name"; +const SCHEMA_NAME: &str = "schema_name"; +const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name"; +const DEFAULT_COLLATION_NAME: &str = "default_collation_name"; + /// The `information_schema.schemata` table implementation. pub(super) struct InformationSchemaSchemata { schema: SchemaRef, @@ -54,15 +60,15 @@ impl InformationSchemaSchemata { pub(crate) fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ - ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false), ColumnSchema::new( - "default_character_set_name", + DEFAULT_CHARACTER_SET_NAME, ConcreteDataType::string_datatype(), false, ), ColumnSchema::new( - "default_collation_name", + DEFAULT_COLLATION_NAME, ConcreteDataType::string_datatype(), false, ), @@ -92,14 +98,14 @@ impl InformationTable for InformationSchemaSchemata { self.schema.clone() } - fn to_stream(&self) -> Result<SendableRecordBatchStream> { + fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_schemata() + .make_schemata(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -147,12 +153,13 @@ impl InformationSchemaSchemataBuilder { } /// Construct the `information_schema.schemata` virtual table - async fn make_schemata(&mut self) -> Result<RecordBatch> { + async fn make_schemata(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await?
{ if !catalog_manager @@ -162,13 +169,24 @@ impl InformationSchemaSchemataBuilder { continue; } - self.add_schema(&catalog_name, &schema_name); + self.add_schema(&predicates, &catalog_name, &schema_name); } self.finish() } - fn add_schema(&mut self, catalog_name: &str, schema_name: &str) { + fn add_schema(&mut self, predicates: &Predicates, catalog_name: &str, schema_name: &str) { + let row = [ + (CATALOG_NAME, &Value::from(catalog_name)), + (SCHEMA_NAME, &Value::from(schema_name)), + (DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")), + (DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")), + ]; + + if !predicates.eval(&row) { + return; + } + self.catalog_names.push(Some(catalog_name)); self.schema_names.push(Some(schema_name)); self.charset_names.push(Some("utf8")); @@ -200,7 +218,7 @@ impl DfPartitionStream for InformationSchemaSchemata { schema, futures::stream::once(async move { builder - .make_schemata() + .make_schemata(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index d258dd490130..5320e50277f0 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -25,18 +25,26 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{ScanRequest, TableId}; use table::metadata::TableType; use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationTable; +use crate::information_schema::{InformationTable, Predicates}; use crate::CatalogManager; +const TABLE_CATALOG: &str = "table_catalog"; +const TABLE_SCHEMA: &str = "table_schema"; +const TABLE_NAME: &str = "table_name"; +const TABLE_TYPE: &str = "table_type"; +const TABLE_ID: &str = "table_id"; +const ENGINE: &str = "engine"; + pub(super) struct InformationSchemaTables { schema: SchemaRef, catalog_name: String, @@ -54,12 +62,12 @@ impl InformationSchemaTables { pub(crate) fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ - ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("table_id", ConcreteDataType::uint32_datatype(), true), - ColumnSchema::new("engine", ConcreteDataType::string_datatype(), true), + ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true), ])) } @@ -85,14 +93,14 @@ impl InformationTable for InformationSchemaTables { 
self.schema.clone() } - fn to_stream(&self) -> Result<SendableRecordBatchStream> { + fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> { let schema = self.schema.arrow_schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( schema, futures::stream::once(async move { builder - .make_tables() + .make_tables(Some(request)) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -142,12 +150,13 @@ impl InformationSchemaTablesBuilder { } /// Construct the `information_schema.tables` virtual table - async fn make_tables(&mut self) -> Result<RecordBatch> { + async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager @@ -167,6 +176,7 @@ { let table_info = table.table_info(); self.add_table( + &predicates, &catalog_name, &schema_name, &table_name, @@ -183,8 +193,10 @@ self.finish() } + #[allow(clippy::too_many_arguments)] fn add_table( &mut self, + predicates: &Predicates, catalog_name: &str, schema_name: &str, table_name: &str, @@ -192,14 +204,27 @@ table_id: Option<TableId>, engine: Option<&str>, ) { - self.catalog_names.push(Some(catalog_name)); - self.schema_names.push(Some(schema_name)); - self.table_names.push(Some(table_name)); - self.table_types.push(Some(match table_type { + let table_type = match table_type { TableType::Base => "BASE TABLE", TableType::View => "VIEW", TableType::Temporary => "LOCAL TEMPORARY", - })); + }; + + let row = [ + (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_SCHEMA, &Value::from(schema_name)), + (TABLE_NAME, &Value::from(table_name)), + (TABLE_TYPE, &Value::from(table_type)), + ]; + + if !predicates.eval(&row) { + return; + } + + self.catalog_names.push(Some(catalog_name)); + self.schema_names.push(Some(schema_name)); + self.table_names.push(Some(table_name)); + self.table_types.push(Some(table_type)); self.table_ids.push(table_id); self.engines.push(engine); } @@ -229,7 +254,7 @@ impl DfPartitionStream for InformationSchemaTables { schema, futures::stream::once(async move { builder - .make_tables() + .make_tables(None) .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 2243265bf3a9..75a692b51f1e 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -324,6 +324,40 @@ order by table_name; | foo | +------------+ +select table_name +from information_schema.tables +where table_schema in ('my_db', 'public') +order by table_name; + ++------------+ +| table_name | ++------------+ +| foo | +| numbers | ++------------+ + +select table_name +from information_schema.tables +where table_schema like 'my%' +order by table_name; + ++------------+ +| table_name | ++------------+ +| foo | ++------------+ + +select table_name +from information_schema.tables +where table_schema not in ('my_db', 'information_schema') +order by table_name; + ++------------+ +| table_name | ++------------+ +| numbers | ++------------+ + select table_catalog, table_schema, table_name, table_type, engine from
information_schema.tables where table_catalog = 'greptime' @@ -350,6 +384,22 @@ order by table_schema, table_name, column_name; | greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP | +---------------+--------------+------------+-------------+----------------------+---------------+ +-- test query filter for columns -- +select table_catalog, table_schema, table_name, column_name, data_type, semantic_type +from information_schema.columns +where table_catalog = 'greptime' + and (table_schema in ('public') + or + table_schema == 'my_db') +order by table_schema, table_name, column_name; + ++---------------+--------------+------------+-------------+----------------------+---------------+ +| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | ++---------------+--------------+------------+-------------+----------------------+---------------+ +| greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP | +| greptime | public | numbers | number | UInt32 | TAG | ++---------------+--------------+------------+-------------+----------------------+---------------+ + use public; Affected Rows: 0 @@ -362,6 +412,44 @@ use information_schema; Affected Rows: 0 +-- test query filter for key_column_usage -- +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | public | PRIMARY | def | public | numbers | number | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where 
CONSTRAINT_NAME LIKE '%INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX'; + ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +| def | public | PRIMARY | def | public | numbers | number | 1 | | | | | ++--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; + +++ +++ + -- schemata -- desc table schemata; diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index ef0fbdeb7578..0ba3508aca47 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -24,6 +24,21 @@ from information_schema.tables where table_schema = 'my_db' order by table_name; +select table_name +from information_schema.tables +where table_schema in ('my_db', 'public') +order by table_name; + +select table_name +from information_schema.tables +where table_schema like 'my%' +order by table_name; + +select table_name +from information_schema.tables +where table_schema not in ('my_db', 'information_schema') +order by table_name; + select table_catalog, table_schema, table_name, table_type, engine from information_schema.tables where table_catalog = 'greptime' @@ -38,12 +53,32 @@ where table_catalog = 'greptime' and table_schema != 'information_schema' order by table_schema, table_name, column_name; +-- test query filter for columns -- +select table_catalog, table_schema, table_name, column_name, data_type, semantic_type +from 
information_schema.columns +where table_catalog = 'greptime' + and (table_schema in ('public') + or + table_schema == 'my_db') +order by table_schema, table_name, column_name; + use public; drop schema my_db; use information_schema; +-- test query filter for key_column_usage -- +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME LIKE '%INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX'; + +select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db'; + -- schemata -- desc table schemata;
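A note on the evaluation semantics used throughout `predicate.rs` above: `Predicate::eval` combines `Option<bool>` results with Kleene-style three-valued logic, and `Predicates::eval` then treats `None` ("can't be evaluated against this row") as "keep the row". A minimal, self-contained sketch of those truth tables (helper names are illustrative, not part of the patch):

```rust
// Three-valued AND/OR as implemented by `Predicate::eval`:
// `None` means "this predicate can't be evaluated against the row".
fn and3(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    match (left, right) {
        (Some(false), _) | (_, Some(false)) => Some(false), // short-circuit
        (Some(true), Some(true)) => Some(true),
        _ => None, // undecided operand, undecided result
    }
}

fn or3(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    match (left, right) {
        (Some(true), _) | (_, Some(true)) => Some(true), // short-circuit
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

fn main() {
    // An unknown operand leaves the result unknown unless the known
    // side already decides it:
    assert_eq!(and3(None, Some(false)), Some(false));
    assert_eq!(and3(None, Some(true)), None);
    assert_eq!(or3(None, Some(true)), Some(true));
    assert_eq!(or3(None, Some(false)), None);
    // Rows whose predicates evaluate to `None` are kept, which is safe
    // because the scan is registered as `FilterPushDownType::Inexact`
    // and the query engine re-applies the filters on top.
}
```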