From a65e9268be265a3f975b22bde3333c50528f3eb2 Mon Sep 17 00:00:00 2001 From: Costi Ciudatu Date: Fri, 13 Dec 2024 23:44:02 +0200 Subject: [PATCH] cargo fmt --- datafusion/expr/src/registry.rs | 27 +++++---- .../substrait/src/logical_plan/consumer.rs | 28 ++++++--- .../substrait/src/logical_plan/producer.rs | 59 +++++++++++++------ 3 files changed, 77 insertions(+), 37 deletions(-) diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index ec9198e118c3a..f3ac906712029 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -20,7 +20,9 @@ use crate::expr_rewriter::FunctionRewrite; use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, TableSource, UserDefinedLogicalNode, WindowUDF}; -use datafusion_common::{not_impl_err, plan_datafusion_err, DataFusionError, HashMap, Result}; +use datafusion_common::{ + not_impl_err, plan_datafusion_err, DataFusionError, HashMap, Result, +}; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -133,7 +135,9 @@ pub trait SerializerRegistry: Debug + Send + Sync { &self, _node: &dyn UserDefinedLogicalNode, ) -> Result> { - Err(DataFusionError::Plan("UserDefinedLogicalNode serialization not supported".into())) + Err(DataFusionError::Plan( + "UserDefinedLogicalNode serialization not supported".into(), + )) } /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from @@ -143,24 +147,25 @@ pub trait SerializerRegistry: Debug + Send + Sync { _name: &str, _bytes: &[u8], ) -> Result> { - Err(DataFusionError::Plan("UserDefinedLogicalNode deserialization not supported".into())) + Err(DataFusionError::Plan( + "UserDefinedLogicalNode deserialization not supported".into(), + )) } /// Binary representation for custom tables, to be converted to substrait extension tables. /// Should only return success for table implementations that cannot be found by name /// in the destination execution context, such as UDTFs or manually registered table providers. - fn serialize_custom_table( - &self, - _table: &dyn TableSource, - ) -> Result> { - Err(DataFusionError::Plan("Custom table serialization not supported".into())) + fn serialize_custom_table(&self, _table: &dyn TableSource) -> Result> { + Err(DataFusionError::Plan( + "Custom table serialization not supported".into(), + )) } /// Deserialize the custom table with the given name. /// The name may not be useful as a discriminator if multiple UDTF/TableProvider /// implementations are expected. This is particularly true for UDTFs in DataFusion, /// which are always registered under the same name: `tmp_table`, so one should - /// use the binary payload to distinguish between multiple table types. + /// use the binary payload to distinguish between multiple potential table types. /// A potential future improvement would be to return a (name, bytes) tuple from /// [SerializerRegistry::serialize_custom_table] to allow the implementors to assign /// different names to different table provider implementations (e.g. in the case of proto, @@ -172,7 +177,9 @@ pub trait SerializerRegistry: Debug + Send + Sync { _name: &str, _bytes: &[u8], ) -> Result> { - Err(DataFusionError::Plan("Custom table deserialization not supported".into())) + Err(DataFusionError::Plan( + "Custom table deserialization not supported".into(), + )) } } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 41442bf9f6b1f..963c8d9642671 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -29,7 +29,10 @@ use datafusion::common::{ use datafusion::datasource::provider_as_source; use datafusion::logical_expr::expr::{Exists, InSubquery, Sort}; -use datafusion::logical_expr::{Aggregate, BinaryExpr, Case, EmptyRelation, Expr, ExprSchemable, LogicalPlan, Operator, Projection, SortExpr, TableScan, TryCast, Values}; +use datafusion::logical_expr::{ + Aggregate, BinaryExpr, Case, EmptyRelation, Expr, ExprSchemable, LogicalPlan, + Operator, Projection, SortExpr, TableScan, TryCast, Values, +}; use substrait::proto::aggregate_rel::Grouping; use substrait::proto::expression::subquery::set_predicate::PredicateOp; use substrait::proto::expression_reference::ExprType; @@ -993,23 +996,32 @@ pub async fn from_substrait_rel( } Some(ReadType::ExtensionTable(ext)) => { if let Some(ext_detail) = &ext.detail { - let source = state.serializer_registry() - .deserialize_custom_table(&ext_detail.type_url, &ext_detail.value)?; - let table_name = if let Some((_, name)) = ext_detail.type_url.rsplit_once('/') { + let source = + state.serializer_registry().deserialize_custom_table( + &ext_detail.type_url, + &ext_detail.value, + )?; + let table_name = if let Some((_, name)) = + ext_detail.type_url.rsplit_once('/') + { name } else { &ext_detail.type_url }; - let plan = LogicalPlan::TableScan( - TableScan::try_new(table_name, source, None, vec![], None)? - ); + let plan = LogicalPlan::TableScan(TableScan::try_new( + table_name, + source, + None, + vec![], + None, + )?); let schema = apply_masking(substrait_schema, &read.projection)?; ensure_schema_compatability(plan.schema(), schema.clone())?; apply_projection(plan, schema) } else { substrait_err!("Unexpected empty detail in ExtensionTable") } - }, + } None => { substrait_err!("Unexpected empty read_type") } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 45b79be3db38a..0779956561172 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -213,12 +213,13 @@ pub fn to_substrait_rel( let table = if let Ok(bytes) = state .serializer_registry() - .serialize_custom_table(scan.source.as_ref()) { + .serialize_custom_table(scan.source.as_ref()) + { ReadType::ExtensionTable(ExtensionTable { detail: Some(ProtoAny { type_url: scan.table_name.to_string(), value: bytes.into(), - }) + }), }) } else { ReadType::NamedTable(NamedTable { @@ -2213,7 +2214,11 @@ fn substrait_field_ref(index: usize) -> Result { #[cfg(test)] mod test { use super::*; - use crate::logical_plan::consumer::{from_substrait_extended_expr, from_substrait_literal_without_names, from_substrait_named_struct, from_substrait_plan, from_substrait_type_without_names}; + use crate::logical_plan::consumer::{ + from_substrait_extended_expr, from_substrait_literal_without_names, + from_substrait_named_struct, from_substrait_plan, + from_substrait_type_without_names, + }; use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion::arrow::array::{ GenericListArray, Int64Builder, MapBuilder, StringBuilder, @@ -2221,8 +2226,8 @@ mod test { use datafusion::arrow::datatypes::{Field, Fields, Schema}; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::{assert_contains, DFSchema}; - use datafusion::datasource::{DefaultTableSource, TableProvider}; use datafusion::datasource::empty::EmptyTable; + use datafusion::datasource::{DefaultTableSource, TableProvider}; use datafusion::execution::registry::SerializerRegistry; use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::logical_expr::TableSource; @@ -2541,10 +2546,17 @@ mod test { table: Arc, } impl SerializerRegistry for Registry { - fn serialize_custom_table(&self, _table: &dyn TableSource) -> Result> { + fn serialize_custom_table( + &self, + _table: &dyn TableSource, + ) -> Result> { Ok("expected payload".as_bytes().to_vec()) } - fn deserialize_custom_table(&self, _name: &str, _bytes: &[u8]) -> Result> { + fn deserialize_custom_table( + &self, + _name: &str, + _bytes: &[u8], + ) -> Result> { Ok(Arc::new(DefaultTableSource::new(self.table.clone()))) } } @@ -2552,10 +2564,11 @@ mod test { async fn round_trip_logical_plans( local: &SessionContext, remote: &SessionContext, - table: Arc + table: Arc, ) -> Result<(LogicalPlan, LogicalPlan)> { local.register_table("custom_table", table)?; - let initial_plan = local.sql("select id from custom_table") + let initial_plan = local + .sql("select id from custom_table") .await? .logical_plan() .clone(); @@ -2575,18 +2588,17 @@ mod test { Ok((initial_plan, restored)) } - let empty = Arc::new(EmptyTable::new(Arc::new( - Schema::new([ - Arc::new(Field::new("id", DataType::Int32, false)), - Arc::new(Field::new("name", DataType::Utf8, false)), - ]) - ))); + let empty = Arc::new(EmptyTable::new(Arc::new(Schema::new([ + Arc::new(Field::new("id", DataType::Int32, false)), + Arc::new(Field::new("name", DataType::Utf8, false)), + ])))); let first_attempt = round_trip_logical_plans( &SessionContext::new(), &SessionContext::new(), - empty.clone() - ).await; + empty.clone(), + ) + .await; assert_eq!( first_attempt.unwrap_err().to_string(), "Error during planning: No table named 'custom_table'" @@ -2606,7 +2618,8 @@ mod test { assert_eq!( initial_plan.to_string(), - restored.to_string() + restored + .to_string() // substrait will add an explicit projection with the full schema .replace( "TableScan: custom_table projection=[id, name]", @@ -2614,8 +2627,16 @@ mod test { ) ); assert_eq!( - local.execute_logical_plan(initial_plan).await?.collect().await?, - remote.execute_logical_plan(restored).await?.collect().await?, + local + .execute_logical_plan(initial_plan) + .await? + .collect() + .await?, + remote + .execute_logical_plan(restored) + .await? + .collect() + .await?, ); Ok(()) }