feat: pushdown filters for some information_schema tables #3091

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
@@ -11,6 +11,7 @@ testing = []
api.workspace = true
arc-swap = "1.0"
arrow-schema.workspace = true
arrow.workspace = true
async-stream.workspace = true
async-trait = "0.1"
build-data = "0.1"
10 changes: 6 additions & 4 deletions src/catalog/src/information_schema.rs
@@ -15,6 +15,7 @@
mod columns;
mod key_column_usage;
mod memory_table;
mod predicate;
mod schemata;
mod table_names;
mod tables;
@@ -29,6 +30,7 @@ use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use paste::paste;
pub(crate) use predicate::Predicates;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::{ScanRequest, TableId};
@@ -159,7 +161,7 @@ impl InformationSchemaProvider {
fn build_table(&self, name: &str) -> Option<TableRef> {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Unsupported;
let filter_pushdown = FilterPushDownType::Inexact;
let thin_table = ThinTable::new(table_info, filter_pushdown);

let data_source = Arc::new(InformationTableDataSource::new(table));
@@ -238,7 +240,7 @@ trait InformationTable {

fn schema(&self) -> SchemaRef;

fn to_stream(&self) -> Result<SendableRecordBatchStream>;
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

fn table_type(&self) -> TableType {
TableType::Temporary
@@ -272,15 +274,15 @@ impl DataSource for InformationTableDataSource {
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projection = request.projection;
let projection = request.projection.clone();
let projected_schema = match &projection {
Some(projection) => self.try_project(projection)?,
None => self.table.schema(),
};

let stream = self
.table
.to_stream()
.to_stream(request)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?
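Note: the new `src/catalog/src/information_schema/predicate.rs` module declared by `mod predicate;` above is not part of the hunks shown in this diff. Judging only from its call sites in the files below (`Predicates::from_scan_request(&request)` and `predicates.eval(&row)`), a minimal standalone sketch of the idea could look like the following; the types are deliberately simplified stand-ins and not the actual GreptimeDB API, which works on the DataFusion expressions carried by `ScanRequest` and on `datatypes::value::Value`:

```rust
// Standalone sketch only: the real `predicate.rs` extracts predicates from
// the DataFusion expressions in `ScanRequest` and works on
// `datatypes::value::Value`; here equality filters are plain pairs.
#[derive(PartialEq)]
enum Value {
    String(String),
    UInt32(u32),
}

struct Predicates {
    /// Column-name / expected-value pairs recovered from pushed-down filters.
    eq_filters: Vec<(String, Value)>,
}

impl Predicates {
    /// In the PR this is `from_scan_request(&Option<ScanRequest>)`; a `None`
    /// request (the `DfPartitionStream` path) keeps every row.
    fn from_filters(eq_filters: Vec<(String, Value)>) -> Self {
        Self { eq_filters }
    }

    /// Evaluate against a named row. Filters on columns that are not part of
    /// the row are ignored, so the result may be a superset of the exact
    /// answer.
    fn eval(&self, row: &[(&str, &Value)]) -> bool {
        self.eq_filters.iter().all(|(column, expected)| {
            row.iter()
                .find(|(name, _)| *name == column.as_str())
                .map(|(_, actual)| *actual == expected)
                .unwrap_or(true)
        })
    }
}

fn main() {
    let predicates = Predicates::from_filters(vec![(
        "table_schema".to_string(),
        Value::String("public".to_string()),
    )]);
    let schema = Value::String("public".to_string());
    let position = Value::UInt32(1);
    let row = [("table_schema", &schema), ("ordinal_position", &position)];
    assert!(predicates.eval(&row));
}
```

Whatever the real `from_scan_request` does, it also has to handle the `None` case used by the `DfPartitionStream` implementations further down, which call `make_columns(None)` / `make_key_column_usage(None)` without any scan request.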
28 changes: 23 additions & 5 deletions src/catalog/src/information_schema/columns.rs
@@ -29,14 +29,16 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};

use super::{InformationTable, COLUMNS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::CatalogManager;

pub(super) struct InformationSchemaColumns {
@@ -102,14 +104,14 @@ impl InformationTable for InformationSchemaColumns {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_columns()
.make_columns(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -165,12 +167,13 @@ impl InformationSchemaColumnsBuilder {
}

/// Construct the `information_schema.columns` virtual table
async fn make_columns(&mut self) -> Result<RecordBatch> {
async fn make_columns(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
@@ -201,6 +204,7 @@ impl InformationSchemaColumnsBuilder {
};

self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
@@ -219,6 +223,7 @@

fn add_column(
&mut self,
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
table_name: &str,
@@ -227,6 +232,19 @@
) {
let data_type = &column_schema.data_type.name();

let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_schema.name.as_str())),
(DATA_TYPE, &Value::from(data_type.as_str())),
(SEMANTIC_TYPE, &Value::from(semantic_type)),
];

if !predicates.eval(&row) {
return;
}

self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
@@ -279,7 +297,7 @@ impl DfPartitionStream for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_columns()
.make_columns(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
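In `add_column` above, the predicate is evaluated against a small named-row view of the values before anything is pushed into the per-column vector builders; returning early at that point is what keeps every builder the same length, since a row is either appended to all of them or to none. A trimmed-down standalone sketch of that pattern, where plain `Vec<Option<String>>`s stand in for the `StringVectorBuilder`s and a closure stands in for `Predicates::eval`:

```rust
// Standalone sketch of the "evaluate, then push everywhere or nowhere"
// pattern used by `InformationSchemaColumnsBuilder::add_column`.
#[derive(Default)]
struct ColumnsBuilder {
    schema_names: Vec<Option<String>>,
    table_names: Vec<Option<String>>,
    column_names: Vec<Option<String>>,
}

impl ColumnsBuilder {
    fn add_column(
        &mut self,
        keep: impl Fn(&[(&str, &str)]) -> bool, // stand-in for `Predicates::eval`
        schema_name: &str,
        table_name: &str,
        column_name: &str,
    ) {
        let row = [
            ("table_schema", schema_name),
            ("table_name", table_name),
            ("column_name", column_name),
        ];
        // Bail out before the first push so all builders stay aligned.
        if !keep(&row) {
            return;
        }
        self.schema_names.push(Some(schema_name.to_string()));
        self.table_names.push(Some(table_name.to_string()));
        self.column_names.push(Some(column_name.to_string()));
    }
}

fn main() {
    let mut builder = ColumnsBuilder::default();
    // Pushed-down predicate: table_schema = 'public'.
    let keep = |row: &[(&str, &str)]| {
        row.iter()
            .any(|(name, value)| *name == "table_schema" && *value == "public")
    };
    builder.add_column(keep, "public", "numbers", "ts");
    builder.add_column(keep, "greptime_private", "scripts", "script");
    assert_eq!(builder.schema_names.len(), 1);
    assert_eq!(builder.table_names.len(), builder.column_names.len());
}
```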
87 changes: 48 additions & 39 deletions src/catalog/src/information_schema/key_column_usage.rs
@@ -25,17 +25,26 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};

use super::KEY_COLUMN_USAGE;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;

const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";

/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
schema: SchemaRef,
@@ -60,24 +69,16 @@ impl InformationSchemaKeyColumnUsage {
false,
),
ColumnSchema::new(
"constraint_schema",
CONSTRAINT_SCHEMA,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"constraint_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("column_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ordinal_position",
ConcreteDataType::uint32_datatype(),
false,
),
ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(
"position_in_unique_constraint",
ConcreteDataType::uint32_datatype(),
@@ -123,14 +124,14 @@ impl InformationTable for InformationSchemaKeyColumnUsage {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_key_column_usage()
.make_key_column_usage(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -192,14 +193,14 @@ impl InformationSchemaKeyColumnUsageBuilder {
}

/// Construct the `information_schema.KEY_COLUMN_USAGE` virtual table
async fn make_key_column_usage(&mut self) -> Result<RecordBatch> {
async fn make_key_column_usage(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

let mut time_index_constraints = vec![];
let mut primary_constraints = vec![];

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
@@ -223,11 +224,15 @@

for (idx, column) in schema.column_schemas().iter().enumerate() {
if column.is_time_index() {
time_index_constraints.push((
schema_name.clone(),
table_name.clone(),
column.name.clone(),
));
self.add_key_column_usage(
&predicates,
&schema_name,
"TIME INDEX",
&schema_name,
&table_name,
&column.name,
1, // always 1 for time index
);
}
if keys.contains(&idx) {
primary_constraints.push((
@@ -244,22 +249,11 @@
}
}

for (i, (schema_name, table_name, column_name)) in
time_index_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&schema_name,
"TIME INDEX",
&schema_name,
&table_name,
&column_name,
i as u32 + 1,
);
}
for (i, (schema_name, table_name, column_name)) in
primary_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&predicates,
&schema_name,
"PRIMARY",
&schema_name,
Expand All @@ -274,15 +268,30 @@ impl InformationSchemaKeyColumnUsageBuilder {

// TODO(dimbtp): foreign key constraints would fill the last 4 fields with
// non-`None` values, but they are not supported yet.
#[allow(clippy::too_many_arguments)]
fn add_key_column_usage(
&mut self,
predicates: &Predicates,
constraint_schema: &str,
constraint_name: &str,
table_schema: &str,
table_name: &str,
column_name: &str,
ordinal_position: u32,
) {
let row = [
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
(CONSTRAINT_NAME, &Value::from(constraint_name)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_name)),
(ORDINAL_POSITION, &Value::from(ordinal_position)),
];

if !predicates.eval(&row) {
return;
}

self.constraint_catalog.push(Some("def"));
self.constraint_schema.push(Some(constraint_schema));
self.constraint_name.push(Some(constraint_name));
@@ -328,7 +337,7 @@ impl DfPartitionStream for InformationSchemaKeyColumnUsage {
schema,
futures::stream::once(async move {
builder
.make_key_column_usage()
.make_key_column_usage(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
8 changes: 4 additions & 4 deletions src/catalog/src/information_schema/memory_table.rs
@@ -26,7 +26,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
pub use tables::get_schema_columns;

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
@@ -74,7 +74,7 @@ impl InformationTable for MemoryTable {
self.schema.clone()
}

fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
@@ -169,7 +169,7 @@ mod tests {
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));

let stream = table.to_stream().unwrap();
let stream = table.to_stream(ScanRequest::default()).unwrap();

let batches = RecordBatches::try_collect(stream).await.unwrap();

@@ -198,7 +198,7 @@
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));

let stream = table.to_stream().unwrap();
let stream = table.to_stream(ScanRequest::default()).unwrap();

let batches = RecordBatches::try_collect(stream).await.unwrap();

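Finally, a note on why the change in `information_schema.rs` declares `FilterPushDownType::Inexact` rather than `Exact`: an inexact table may return a superset of the matching rows (for example when a filter references something the predicate code does not understand), and the query engine still re-applies the full predicate on top of the returned stream, so correctness never depends on the table-side filtering. A standalone illustration of that contract, with plain tuples standing in for record batches (not the real `ScanRequest`/`Expr` APIs):

```rust
// Standalone illustration of the Inexact-pushdown contract: the table-side
// filter may keep rows it cannot decide on; the engine still applies the
// full predicate to whatever the table returns.
fn table_side_scan(
    rows: Vec<(String, String)>, // (table_schema, table_name)
    pushed_down: impl Fn(&(String, String)) -> bool,
) -> Vec<(String, String)> {
    // Best-effort filtering, analogous to `Predicates::eval` in the builders.
    rows.into_iter().filter(|row| pushed_down(row)).collect()
}

fn main() {
    let rows = vec![
        ("public".to_string(), "numbers".to_string()),
        ("information_schema".to_string(), "columns".to_string()),
        ("public".to_string(), "metrics".to_string()),
    ];
    // Predicate pushed down by the engine: table_schema = 'public'.
    let table_result = table_side_scan(rows, |row| row.0 == "public");
    // Because the pushdown is `Inexact`, the engine re-applies the same
    // predicate on top of the stream returned by the table.
    let final_result: Vec<_> = table_result
        .into_iter()
        .filter(|row| row.0 == "public")
        .collect();
    assert_eq!(final_result.len(), 2);
}
```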