feat: adds some tables to information_schema #2935

Merged

Changes from 7 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff contents not rendered.)

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
@@ -33,6 +33,7 @@ meta-client.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
regex.workspace = true
serde.workspace = true
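The new `paste` dependency provides the `paste!` macro used below in `information_schema.rs` to build identifiers at compile time. A minimal, standalone sketch of the behavior it is used for:

use paste::paste;

fn main() {
    // `paste!` concatenates the segments inside `[<...>]` into one identifier,
    // so `[<my_ value>]` becomes `my_value`.
    paste! {
        let [<my_ value>] = 42;
        assert_eq!(my_value, 42);
    }
}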
54 changes: 46 additions & 8 deletions src/catalog/src/information_schema.rs
@@ -13,16 +13,20 @@
// limitations under the License.

mod columns;
mod memory_table;
mod table_names;
mod tables;

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::consts::{self, INFORMATION_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use paste::paste;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::{ScanRequest, TableId};
@@ -32,14 +36,38 @@ use table::metadata::{
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
pub use table_names::*;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

lazy_static! {
// Memory tables in `information_schema`.
static ref MEMORY_TABLES: Vec<&'static str> = vec![
ENGINES,
COLUMN_PRIVILEGES,
COLUMN_STATISTICS
];
}

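/// Builds a `MemoryTable` for the given table name; `paste!` splices the name
/// into the matching `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID` constant.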
macro_rules! setup_memory_table {
($name: expr) => {
paste! {
{
let (schema, columns) = get_schema_columns($name);
Some(Arc::new(MemoryTable::new(
consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
$name,
schema,
columns
)) as _)
}
}
};
}

pub struct InformationSchemaProvider {
catalog_name: String,
@@ -63,8 +91,14 @@ impl InformationSchemaProvider {
let provider = Self::new(catalog_name, catalog_manager);

let mut schema = HashMap::new();
schema.insert(TABLES.to_string(), provider.table(TABLES).unwrap());
schema.insert(COLUMNS.to_string(), provider.table(COLUMNS).unwrap());

// Add memory tables
for name in MEMORY_TABLES.iter() {
schema.insert((*name).to_string(), provider.table(name).unwrap());
}

schema
}

@@ -80,7 +114,8 @@ impl InformationSchemaProvider {
}

fn information_table(&self, name: &str) -> Option<InformationTableRef> {
let name = name.to_ascii_lowercase();
match name.as_str() {
TABLES => Some(Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
Expand All @@ -89,6 +124,9 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
ENGINES => setup_memory_table!(ENGINES),
COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
_ => None,
}
}
@@ -102,9 +140,9 @@ impl InformationSchemaProvider {
.unwrap();
let table_info = TableInfoBuilder::default()
.table_id(table.table_id())
.name(table.table_name().to_string())
.catalog_name(catalog_name)
.schema_name(INFORMATION_SCHEMA_NAME.to_string())
.meta(table_meta)
.table_type(table.table_type())
.build()
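For reference, the `setup_memory_table!(ENGINES)` arm above expands via `paste!` to roughly the following (assuming `INFORMATION_SCHEMA_ENGINES_TABLE_ID` is defined in `common_catalog::consts`, which the macro requires):

// Approximate expansion of `setup_memory_table!(ENGINES)`:
{
    let (schema, columns) = get_schema_columns(ENGINES);
    Some(Arc::new(MemoryTable::new(
        consts::INFORMATION_SCHEMA_ENGINES_TABLE_ID,
        ENGINES,
        schema,
        columns,
    )) as _)
}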
214 changes: 214 additions & 0 deletions src/catalog/src/information_schema/memory_table.rs
@@ -0,0 +1,214 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod tables;
use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use store_api::storage::TableId;
pub use tables::get_schema_columns;

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::information_schema::InformationTable;

/// A memory table with specified schema and columns.
pub(super) struct MemoryTable {
table_id: TableId,
table_name: &'static str,
schema: SchemaRef,
columns: Vec<VectorRef>,
}

impl MemoryTable {
/// Creates a memory table with table id, name, schema and columns.
pub(super) fn new(
table_id: TableId,
table_name: &'static str,
schema: SchemaRef,
columns: Vec<VectorRef>,
) -> Self {
Self {
table_id,
table_name,
schema,
columns,
}
}

fn builder(&self) -> MemoryTableBuilder {
MemoryTableBuilder::new(self.schema.clone(), self.columns.clone())
}
}

impl InformationTable for MemoryTable {
fn table_id(&self) -> TableId {
self.table_id
}

fn table_name(&self) -> &'static str {
self.table_name
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.memory_records()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

struct MemoryTableBuilder {
schema: SchemaRef,
columns: Vec<VectorRef>,
}

impl MemoryTableBuilder {
fn new(schema: SchemaRef, columns: Vec<VectorRef>) -> Self {
Self { schema, columns }
}

/// Construct the `information_schema.{table_name}` virtual table
async fn memory_records(&mut self) -> Result<RecordBatch> {
if self.columns.is_empty() {
RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu)
} else {
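// `std::mem::take` moves the columns out of the builder, so the data is
// produced only once per builder instance.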
RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns))
.context(CreateRecordBatchSnafu)
}
}
}

impl DfPartitionStream for MemoryTable {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.memory_records()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;

use super::*;

#[tokio::test]
async fn test_memory_table() {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
]));

let table = MemoryTable::new(
42,
"test",
schema.clone(),
vec![
Arc::new(StringVector::from(vec!["a1", "a2"])),
Arc::new(StringVector::from(vec!["b1", "b2"])),
],
);

assert_eq!(42, table.table_id());
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));

let stream = table.to_stream().unwrap();

let batches = RecordBatches::try_collect(stream).await.unwrap();

assert_eq!(
"\
+----+----+
| a | b |
+----+----+
| a1 | b1 |
| a2 | b2 |
+----+----+",
batches.pretty_print().unwrap()
);
}

#[tokio::test]
async fn test_empty_memory_table() {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
]));

let table = MemoryTable::new(42, "test", schema.clone(), vec![]);

assert_eq!(42, table.table_id());
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));

let stream = table.to_stream().unwrap();

let batches = RecordBatches::try_collect(stream).await.unwrap();

assert_eq!(
"\
+---+---+
| a | b |
+---+---+
+---+---+",
batches.pretty_print().unwrap()
);
}
}
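The `get_schema_columns` function is re-exported from the `tables` submodule, which this diff does not include. A minimal sketch of the contract `MemoryTable` relies on; the column names and row values below are illustrative placeholders, not the actual definitions:

use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVector, VectorRef};

/// Returns the static schema and column data for a memory table
/// (hypothetical sketch of `memory_table/tables.rs`).
pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
    match table_name {
        "engines" => {
            let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
                "engine",
                ConcreteDataType::string_datatype(),
                false,
            )]));
            // Single illustrative row.
            let columns: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec!["mito"]))];
            (schema, columns)
        }
        // Unknown names yield an empty schema with no data.
        _ => (Arc::new(Schema::new(vec![])), vec![]),
    }
}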