Skip to content

Commit

Permalink
fix: address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Dec 15, 2023
1 parent 40c78e3 commit 241f8c8
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 123 deletions.
64 changes: 46 additions & 18 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::CatalogManager;

lazy_static! {
// Memory tables in `information_schema`.
static ref MEMORY_TABLES: Vec<&'static str> = vec![
static ref MEMORY_TABLES: &'static [&'static str] = &[
ENGINES,
COLUMN_PRIVILEGES,
COLUMN_STATISTICS
Expand All @@ -72,37 +72,65 @@ macro_rules! setup_memory_table {
/// Provides the virtual tables (`tables`, `columns` and the static memory
/// tables) that make up the `information_schema` schema of one catalog.
pub struct InformationSchemaProvider {
// Name of the catalog this provider serves.
catalog_name: String,
// Weak handle back to the catalog manager; the individual information
// tables upgrade it on demand when scanning real catalog metadata.
catalog_manager: Weak<dyn CatalogManager>,
// Cache of all information schema tables keyed by table name.
// Populated once by `build_tables` during construction, so it is
// `Some` for the whole lifetime of the provider.
tables: Option<HashMap<String, TableRef>>,
}

impl InformationSchemaProvider {
/// Creates a provider for `catalog_name` and eagerly builds every
/// information schema table, so later lookups are plain map reads.
///
/// Note: this span contained interleaved pre-/post-image diff lines;
/// the body below is the coherent post-image of the change.
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
    let mut provider = Self {
        catalog_name,
        catalog_manager,
        tables: None,
    };

    // Populate the `tables` cache up front; the accessors rely on it
    // being `Some` after construction.
    provider.build_tables();

    provider
}

/// Build a map of [TableRef] in information schema.
/// Including `tables` and `columns`.
pub fn build(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> HashMap<String, TableRef> {
let provider = Self::new(catalog_name, catalog_manager);
/// Returns table names in the order of table id.
pub fn table_names(&self) -> Vec<String> {
    let mut tables = self.tables().into_values().collect::<Vec<_>>();

    // Table ids are plain integers with a total order, so keying the sort
    // on the id directly avoids the `partial_cmp(..).unwrap()` panic path
    // of the comparator form; `sort_unstable` also skips the stable sort's
    // extra allocation (duplicate ids, if any, have no meaningful order).
    tables.sort_unstable_by_key(|t| t.table_info().table_id());

    tables
        .into_iter()
        .map(|t| t.table_info().name.clone())
        .collect()
}

let mut schema = HashMap::new();
schema.insert(TABLES.to_string(), provider.table(TABLES).unwrap());
schema.insert(COLUMNS.to_string(), provider.table(COLUMNS).unwrap());
/// Returns a map of [TableRef] in information schema.
pub fn tables(&self) -> HashMap<String, TableRef> {
    // Invariant: `build_tables` runs inside `new`, so the cache is
    // always `Some` here; clone the map out for the caller.
    self.tables.as_ref().unwrap().clone()
}

/// Returns the [TableRef] by table name.
pub fn table(&self, name: &str) -> Option<TableRef> {
    // Read the cached map directly instead of going through `self.tables()`,
    // which would clone the entire `HashMap` just to fetch one entry.
    // Invariant: the cache is built in `new`, so `unwrap` cannot fail.
    self.tables.as_ref().unwrap().get(name).cloned()
}

/// Builds all information schema tables, caches them on `self`, and
/// returns a copy of the resulting map.
fn build_tables(&mut self) -> HashMap<String, TableRef> {
    let mut tables = HashMap::new();

    // `tables` and `columns` first, then the static memory-backed tables.
    let names = [TABLES, COLUMNS]
        .into_iter()
        .chain(MEMORY_TABLES.iter().copied());
    for name in names {
        tables.insert(name.to_string(), self.build_table(name).unwrap());
    }

    // Cache a copy so later `tables()` / `table()` calls are map reads.
    self.tables = Some(tables.clone());

    tables
}

pub fn table(&self, name: &str) -> Option<TableRef> {
fn build_table(&self, name: &str) -> Option<TableRef> {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Unsupported;
Expand All @@ -114,8 +142,7 @@ impl InformationSchemaProvider {
}

fn information_table(&self, name: &str) -> Option<InformationTableRef> {
let name = name.to_ascii_lowercase();
match name.clone().as_str() {
match name.to_ascii_lowercase().as_str() {
TABLES => Some(Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
Expand Down Expand Up @@ -214,6 +241,7 @@ impl DataSource for InformationTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
};

Ok(Box::pin(stream))
}
}
68 changes: 29 additions & 39 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD,
SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
Expand All @@ -33,8 +33,7 @@ use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use super::tables::InformationSchemaTables;
use super::{InformationTable, COLUMNS, TABLES};
use super::{InformationTable, COLUMNS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -102,7 +101,7 @@ impl InformationTable for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down Expand Up @@ -148,8 +147,8 @@ impl InformationSchemaColumnsBuilder {
}
}

/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
/// Construct the `information_schema.columns` virtual table
async fn make_columns(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
Expand All @@ -163,48 +162,38 @@ impl InformationSchemaColumnsBuilder {
{
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let (keys, schema) = if let Some(table) = catalog_manager
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
(keys.clone(), schema)
} else {
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
if schema_name == INFORMATION_SCHEMA_NAME {
if table_name == COLUMNS {
(vec![], InformationSchemaColumns::schema())
} else if table_name == TABLES {
(vec![], InformationSchemaTables::schema())

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
continue;
}
} else {
continue;
SEMANTIC_TYPE_FIELD
};

self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
}
};

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
} else {
unreachable!();
}
}
}
Expand Down Expand Up @@ -238,6 +227,7 @@ impl InformationSchemaColumnsBuilder {
Arc::new(self.data_types.finish()),
Arc::new(self.semantic_types.finish()),
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
Expand All @@ -254,7 +244,7 @@ impl DfPartitionStream for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down
32 changes: 4 additions & 28 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME,
INFORMATION_SCHEMA_TABLES_TABLE_ID,
};
use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
Expand All @@ -33,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::metadata::TableType;

use super::{COLUMNS, TABLES};
use super::TABLES;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -178,29 +175,8 @@ impl InformationSchemaTablesBuilder {
Some(&table_info.meta.engine),
);
} else {
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
if schema_name == INFORMATION_SCHEMA_NAME {
if table_name == COLUMNS {
self.add_table(
&catalog_name,
&schema_name,
&table_name,
TableType::Temporary,
Some(INFORMATION_SCHEMA_COLUMNS_TABLE_ID),
None,
);
} else if table_name == TABLES {
self.add_table(
&catalog_name,
&schema_name,
&table_name,
TableType::Temporary,
Some(INFORMATION_SCHEMA_TABLES_TABLE_ID),
None,
);
}
}
};
unreachable!();
}
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogManager;

/// Access all existing catalog, schema and tables.
Expand Down Expand Up @@ -81,6 +81,11 @@ impl KvBackendCatalogManager {
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in `system_catalog`, so leave it empty
"".to_string(),
me.clone(),
)),
},
})
}
Expand Down Expand Up @@ -231,11 +236,11 @@ impl CatalogManager for KvBackendCatalogManager {
// a new catalog is created.
/// Existing system tables:
/// - public.numbers
/// - information_schema.tables
/// - information_schema.columns
/// - information_schema.{tables}
#[derive(Clone)]
struct SystemCatalog {
// Weak back-reference — presumably to avoid a reference cycle, since the
// manager owns this struct (see construction with `me.clone()` above);
// TODO confirm.
catalog_manager: Weak<KvBackendCatalogManager>,
// Provider used to answer `information_schema` table-name and existence
// queries without consulting the real catalog.
information_schema_provider: Arc<InformationSchemaProvider>,
}

impl SystemCatalog {
Expand All @@ -245,7 +250,7 @@ impl SystemCatalog {

fn table_names(&self, schema: &str) -> Vec<String> {
if schema == INFORMATION_SCHEMA_NAME {
vec![TABLES.to_string(), COLUMNS.to_string()]
self.information_schema_provider.table_names()
} else if schema == DEFAULT_SCHEMA_NAME {
vec![NUMBERS_TABLE_NAME.to_string()]
} else {
Expand All @@ -259,7 +264,7 @@ impl SystemCatalog {

fn table_exist(&self, schema: &str, table: &str) -> bool {
if schema == INFORMATION_SCHEMA_NAME {
table == TABLES || table == COLUMNS
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,11 @@ impl MemoryCatalogManager {
}

fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
let information_schema = InformationSchemaProvider::build(
let information_schema_provider = InformationSchemaProvider::new(
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
);
let information_schema = information_schema_provider.tables();
let mut catalog = HashMap::new();
catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
catalog
Expand Down
5 changes: 3 additions & 2 deletions tests-integration/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {

// User can only see information schema under current catalog.
// A necessary requirement to GreptimeCloud.
let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name";
let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' and table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name";

let output = execute_sql(&instance, sql).await;
let expected = "\
Expand Down Expand Up @@ -1760,6 +1760,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {

#[apply(both_instances_cases)]
async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
let instance = instance.frontend();

let sql = "create table another_table(i timestamp time index)";
Expand All @@ -1769,7 +1770,7 @@ async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {

// User can only see information schema under current catalog.
// A necessary requirement to GreptimeCloud.
let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns order by table_name";
let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns where table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name";

let output = execute_sql(&instance, sql).await;
let expected = "\
Expand Down
Loading

0 comments on commit 241f8c8

Please sign in to comment.