Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds some tables to information_schema #2935

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ meta-client.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down
104 changes: 85 additions & 19 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
// limitations under the License.

mod columns;
mod memory_table;
mod table_names;
mod tables;

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::consts::{self, INFORMATION_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use paste::paste;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::{ScanRequest, TableId};
Expand All @@ -32,43 +36,101 @@ use table::metadata::{
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
pub use table_names::*;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

pub const TABLES: &str = "tables";
pub const COLUMNS: &str = "columns";
lazy_static! {
// Memory tables in `information_schema`.
static ref MEMORY_TABLES: &'static [&'static str] = &[
ENGINES,
COLUMN_PRIVILEGES,
COLUMN_STATISTICS
];
}

macro_rules! setup_memory_table {
($name: expr) => {
paste! {
{
let (schema, columns) = get_schema_columns($name);
Some(Arc::new(MemoryTable::new(
consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
$name,
schema,
columns
)) as _)
}
}
};
}

/// The `information_schema` tables info provider.
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
tables: HashMap<String, TableRef>,
}

impl InformationSchemaProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
let mut provider = Self {
catalog_name,
catalog_manager,
}
tables: HashMap::new(),
};

provider.build_tables();

provider
}

/// Build a map of [TableRef] in information schema.
/// Including `tables` and `columns`.
pub fn build(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> HashMap<String, TableRef> {
let provider = Self::new(catalog_name, catalog_manager);

let mut schema = HashMap::new();
schema.insert(TABLES.to_owned(), provider.table(TABLES).unwrap());
schema.insert(COLUMNS.to_owned(), provider.table(COLUMNS).unwrap());
schema
/// Returns table names in the order of table id.
pub fn table_names(&self) -> Vec<String> {
let mut tables = self.tables.values().clone().collect::<Vec<_>>();

tables.sort_by(|t1, t2| {
t1.table_info()
.table_id()
.partial_cmp(&t2.table_info().table_id())
.unwrap()
});
tables
.into_iter()
.map(|t| t.table_info().name.clone())
.collect()
}

/// Returns a map of [TableRef] in information schema.
pub fn tables(&self) -> &HashMap<String, TableRef> {
assert!(!self.tables.is_empty());

&self.tables
}

/// Returns the [TableRef] by table name.
pub fn table(&self, name: &str) -> Option<TableRef> {
self.tables.get(name).cloned()
}

fn build_tables(&mut self) {
let mut tables = HashMap::new();
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());

// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).unwrap());
}

self.tables = tables;
}

fn build_table(&self, name: &str) -> Option<TableRef> {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Unsupported;
Expand All @@ -89,6 +151,9 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
ENGINES => setup_memory_table!(ENGINES),
COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
_ => None,
}
}
Expand All @@ -102,9 +167,9 @@ impl InformationSchemaProvider {
.unwrap();
let table_info = TableInfoBuilder::default()
.table_id(table.table_id())
.name(table.table_name().to_owned())
.name(table.table_name().to_string())
.catalog_name(catalog_name)
.schema_name(INFORMATION_SCHEMA_NAME.to_owned())
.schema_name(INFORMATION_SCHEMA_NAME.to_string())
.meta(table_meta)
.table_type(table.table_type())
.build()
Expand Down Expand Up @@ -176,6 +241,7 @@ impl DataSource for InformationTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
};

Ok(Box::pin(stream))
}
}
68 changes: 29 additions & 39 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD,
SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
Expand All @@ -33,8 +33,7 @@ use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use super::tables::InformationSchemaTables;
use super::{InformationTable, COLUMNS, TABLES};
use super::{InformationTable, COLUMNS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -102,7 +101,7 @@ impl InformationTable for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down Expand Up @@ -148,8 +147,8 @@ impl InformationSchemaColumnsBuilder {
}
}

/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
/// Construct the `information_schema.columns` virtual table
async fn make_columns(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
Expand All @@ -163,48 +162,38 @@ impl InformationSchemaColumnsBuilder {
{
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let (keys, schema) = if let Some(table) = catalog_manager
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
(keys.clone(), schema)
} else {
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
if schema_name == INFORMATION_SCHEMA_NAME {
if table_name == COLUMNS {
(vec![], InformationSchemaColumns::schema())
} else if table_name == TABLES {
(vec![], InformationSchemaTables::schema())

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
continue;
}
} else {
continue;
SEMANTIC_TYPE_FIELD
};

self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
}
};

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
} else {
unreachable!();
}
}
}
Expand Down Expand Up @@ -238,6 +227,7 @@ impl InformationSchemaColumnsBuilder {
Arc::new(self.data_types.finish()),
Arc::new(self.semantic_types.finish()),
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
Expand All @@ -254,7 +244,7 @@ impl DfPartitionStream for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down
Loading
Loading