diff --git a/datafusion-examples/examples/composed_extension_codec.rs b/datafusion-examples/examples/composed_extension_codec.rs index 5c34eccf26e1..42643badd5b5 100644 --- a/datafusion-examples/examples/composed_extension_codec.rs +++ b/datafusion-examples/examples/composed_extension_codec.rs @@ -71,8 +71,10 @@ async fn main() { // deserialize proto back to execution plan let runtime = ctx.runtime_env(); + let state = ctx.state(); + let config_options = state.config_options(); let result_exec_plan: Arc = proto - .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec) + .try_into_physical_plan(&ctx, config_options, runtime.deref(), &composed_codec) .expect("from proto"); // assert that the original and deserialized execution plans are equal diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 943e5d5e027c..d1c0f3fda594 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -28,6 +28,7 @@ use datafusion::functions_aggregate::first_last::first_value_udaf; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; @@ -169,7 +170,8 @@ fn simplify_demo() -> Result<()> { // expressions, such as the current time (to evaluate `now()` // correctly) let props = ExecutionProps::new(); - let context = SimplifyContext::new(&props).with_schema(schema); + let config_options = ConfigOptions::default(); + let context = SimplifyContext::new(&props, &config_options).with_schema(schema); let simplifier = ExprSimplifier::new(context); // And then call the simplify_expr function: @@ -184,7 +186,8 @@ fn simplify_demo() -> Result<()> { // here are some other examples of what DataFusion is capable of let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?; - let context = SimplifyContext::new(&props).with_schema(schema.clone()); + let context = + SimplifyContext::new(&props, &config_options).with_schema(schema.clone()); let simplifier = ExprSimplifier::new(context); // basic arithmetic simplification @@ -356,8 +359,13 @@ fn type_coercion_demo() -> Result<()> { // Evaluation with an expression that has not been type coerced cannot succeed. let props = ExecutionProps::default(); - let physical_expr = - datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?; + let config_options = ConfigOptions::default(); + let physical_expr = datafusion_physical_expr::create_physical_expr( + &expr, + &df_schema, + &props, + &config_options, + )?; let e = physical_expr.evaluate(&batch).unwrap_err(); assert!(e .find_root() @@ -370,13 +378,15 @@ fn type_coercion_demo() -> Result<()> { assert!(physical_expr.evaluate(&batch).is_ok()); // 2. Type coercion with `ExprSimplifier::coerce`. - let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone())); + let context = SimplifyContext::new(&props, &config_options) + .with_schema(Arc::new(df_schema.clone())); let simplifier = ExprSimplifier::new(context); let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?; let physical_expr = datafusion_physical_expr::create_physical_expr( &coerced_expr, &df_schema, &props, + &config_options, )?; assert!(physical_expr.evaluate(&batch).is_ok()); @@ -389,6 +399,7 @@ fn type_coercion_demo() -> Result<()> { &coerced_expr, &df_schema, &props, + &config_options, )?; assert!(physical_expr.evaluate(&batch).is_ok()); @@ -417,6 +428,7 @@ fn type_coercion_demo() -> Result<()> { &coerced_expr, &df_schema, &props, + &config_options, )?; assert!(physical_expr.evaluate(&batch).is_ok()); diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs index c090cd2bcca9..50692e6a279f 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/pruning.rs @@ -22,6 +22,7 @@ use datafusion::execution::context::ExecutionProps; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion::prelude::*; +use datafusion_common::config::ConfigOptions; use std::collections::HashSet; use std::sync::Arc; @@ -187,7 +188,9 @@ impl PruningStatistics for MyCatalog { fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate { let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap(); let props = ExecutionProps::new(); - let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); + let config_options = ConfigOptions::default(); + let physical_expr = + create_physical_expr(&expr, &df_schema, &props, &config_options).unwrap(); PruningPredicate::try_new(physical_expr, schema.clone()).unwrap() } diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 7cf1ce87690e..8dacb76975bc 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -28,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_catalog::TableFunctionImpl; +use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, TableType}; @@ -48,7 +49,9 @@ async fn main() -> Result<()> { let ctx = SessionContext::new(); // register the table function that will be called in SQL statements by `read_csv` - ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {})); + let state = ctx.state(); + let config_options = Arc::new(state.config_options().clone()); + ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc { config_options })); let testdata = datafusion::test_util::arrow_test_data(); let csv_file = format!("{testdata}/csv/aggregate_test_100.csv"); @@ -129,7 +132,9 @@ impl TableProvider for LocalCsvTable { } #[derive(Debug)] -struct LocalCsvTableFunc {} +struct LocalCsvTableFunc { + config_options: Arc, +} impl TableFunctionImpl for LocalCsvTableFunc { fn call(&self, exprs: &[Expr]) -> Result> { @@ -142,7 +147,8 @@ impl TableFunctionImpl for LocalCsvTableFunc { .map(|expr| { // try to simplify the expression, so 1+2 becomes 3, for example let execution_props = ExecutionProps::new(); - let info = SimplifyContext::new(&execution_props); + let config_options = Arc::unwrap_or_clone(self.config_options.clone()); + let info = SimplifyContext::new(&execution_props, &config_options); let expr = ExprSimplifier::new(info).simplify(expr.clone())?; if let Expr::Literal(ScalarValue::Int64(Some(limit))) = expr { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 942aa308e200..2e81f9763f1d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -679,7 +679,7 @@ config_namespace! { } /// A key value pair, with a corresponding description -#[derive(Debug)] +#[derive(Debug, Hash, PartialEq, Eq)] pub struct ConfigEntry { /// A unique string to identify this config value pub key: String, diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 228b9a4e9f6b..9d897363965a 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -241,6 +241,7 @@ async fn prune_partitions( partitions: Vec, filters: &[Expr], partition_cols: &[(String, DataType)], + ctx: &SessionState, ) -> Result> { if filters.is_empty() { return Ok(partitions); @@ -286,13 +287,12 @@ async fn prune_partitions( )?; let batch = RecordBatch::try_new(schema, arrays)?; - - // TODO: Plumb this down let props = ExecutionProps::new(); + let config_options = ctx.config_options(); // Applies `filter` to `batch` returning `None` on error let do_filter = |filter| -> Result { - let expr = create_physical_expr(filter, &df_schema, &props)?; + let expr = create_physical_expr(filter, &df_schema, &props, config_options)?; expr.evaluate(&batch)?.into_array(partitions.len()) }; @@ -436,7 +436,7 @@ pub async fn pruned_partition_list<'a>( debug!("Listed {} partitions", partitions.len()); let pruned = - prune_partitions(table_path, partitions, filters, partition_cols).await?; + prune_partitions(table_path, partitions, filters, partition_cols, ctx).await?; debug!("Pruning yielded {} partitions", pruned.len()); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 791b15704d09..85b2b2bb03d5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -894,6 +894,7 @@ impl TableProvider for ListingTable { &expr, &table_df_schema, state.execution_props(), + session_state.config_options(), )?; Some(filters) } diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 6fb536ca2f05..e4bc48a7b7ad 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -18,7 +18,6 @@ use crate::execution::context::SessionState; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_optimizer::OptimizerConfig; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; @@ -234,7 +233,7 @@ impl ListingTableUrl { store: &'a dyn ObjectStore, file_extension: &'a str, ) -> Result>> { - let exec_options = &ctx.options().execution; + let exec_options = &ctx.config_options().execution; let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index c1e0bea0b3ff..33e6560fdb4f 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -238,6 +238,7 @@ impl TableProvider for MemTable { sort_exprs, &df_schema, state.execution_props(), + state.config_options(), ) }) .collect::>>()?; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index a5f2bd1760b3..abeaae6cafd2 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -617,10 +617,10 @@ fn create_output_array( #[cfg(test)] mod tests { - use arrow_array::Int32Array; - use super::*; use crate::{test::columns, test_util::aggr_test_schema}; + use arrow_array::Int32Array; + use datafusion_common::config::ConfigOptions; #[test] fn physical_plan_config_no_projection() { @@ -1121,6 +1121,7 @@ mod tests { &expr, &DFSchema::try_from(table_schema.as_ref().clone())?, &ExecutionProps::default(), + &ConfigOptions::default(), ) }) .collect::>>()?, diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index c5874deb6ed5..a717e88a4395 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -648,7 +648,7 @@ impl SessionState { // analyze & capture output of each rule let analyzer_result = self.analyzer.execute_and_check( e.plan.as_ref().clone(), - self.options(), + self.config_options(), |analyzed_plan, analyzer| { let analyzer_name = analyzer.name().to_string(); let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; @@ -708,7 +708,7 @@ impl SessionState { } else { let analyzed_plan = self.analyzer.execute_and_check( plan.clone(), - self.options(), + self.config_options(), |_, _| {}, )?; self.optimizer.optimize(analyzed_plan, self, |_, _| {}) @@ -760,13 +760,19 @@ impl SessionState { let mut expr = simplifier.coerce(expr, df_schema)?; // rewrite Exprs to functions if necessary - let config_options = self.config_options(); for rewrite in self.analyzer.function_rewrites() { expr = expr - .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))? + .transform_up(|expr| { + rewrite.rewrite(expr, df_schema, self.config_options()) + })? .data; } - create_physical_expr(&expr, df_schema, self.execution_props()) + create_physical_expr( + &expr, + df_schema, + self.execution_props(), + self.config_options(), + ) } /// Return the session ID @@ -1968,6 +1974,10 @@ impl SimplifyInfo for SessionSimplifyProvider<'_> { self.state.execution_props() } + fn config_options(&self) -> &ConfigOptions { + self.state.config_options() + } + fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result { expr.get_type(self.df_schema) } diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index d2d35c3877c1..5802efdb0cfc 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1421,6 +1421,7 @@ mod tests { )), ], DataType::Int32, + Arc::new(ConfigOptions::default()), )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1486,6 +1487,7 @@ mod tests { )), ], DataType::Int32, + Arc::new(ConfigOptions::default()), )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 3))), @@ -1554,6 +1556,7 @@ mod tests { )), ], DataType::Int32, + Arc::new(ConfigOptions::default()), )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1619,6 +1622,7 @@ mod tests { )), ], DataType::Int32, + Arc::new(ConfigOptions::default()), )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d_new", 3))), diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47b31d2f4e2d..d854cf41af96 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -88,6 +88,7 @@ use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; use datafusion_physical_optimizer::PhysicalOptimizerRule; use futures::{StreamExt, TryStreamExt}; use itertools::{multiunzip, Itertools}; @@ -198,7 +199,12 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { input_dfschema: &DFSchema, session_state: &SessionState, ) -> Result> { - create_physical_expr(expr, input_dfschema, session_state.execution_props()) + create_physical_expr( + expr, + input_dfschema, + session_state.execution_props(), + session_state.config_options(), + ) } } @@ -620,6 +626,7 @@ impl DefaultPhysicalPlanner { e, logical_schema, session_state.execution_props(), + session_state.config_options(), ) }) .collect::>>()?; @@ -711,6 +718,7 @@ impl DefaultPhysicalPlanner { logical_input_schema, &physical_input_schema, session_state.execution_props(), + session_state.config_options(), ) }) .collect::>>()?; @@ -824,6 +832,7 @@ impl DefaultPhysicalPlanner { expr, input_dfschema, session_state.execution_props(), + session_state.config_options(), )?; let new_sort = SortExec::new(sort_expr, physical_input).with_fetch(*fetch); @@ -1003,12 +1012,22 @@ impl DefaultPhysicalPlanner { let left_df_schema = left.schema(); let right_df_schema = right.schema(); let execution_props = session_state.execution_props(); + let config_options = session_state.config_options(); let join_on = keys .iter() .map(|(l, r)| { - let l = create_physical_expr(l, left_df_schema, execution_props)?; - let r = - create_physical_expr(r, right_df_schema, execution_props)?; + let l = create_physical_expr( + l, + left_df_schema, + execution_props, + config_options, + )?; + let r = create_physical_expr( + r, + right_df_schema, + execution_props, + config_options, + )?; Ok((l, r)) }) .collect::>()?; @@ -1080,6 +1099,7 @@ impl DefaultPhysicalPlanner { expr, &filter_df_schema, session_state.execution_props(), + config_options, )?; let column_indices = join_utils::JoinFilter::build_column_indices( left_field_indices, @@ -1464,8 +1484,12 @@ fn get_null_physical_expr_pair( input_schema: &Schema, session_state: &SessionState, ) -> Result<(Arc, String)> { - let physical_expr = - create_physical_expr(expr, input_dfschema, session_state.execution_props())?; + let physical_expr = create_physical_expr( + expr, + input_dfschema, + session_state.execution_props(), + session_state.config_options(), + )?; let physical_name = physical_name(&expr.clone())?; let data_type = physical_expr.data_type(input_schema)?; @@ -1480,8 +1504,12 @@ fn get_physical_expr_pair( input_dfschema: &DFSchema, session_state: &SessionState, ) -> Result<(Arc, String)> { - let physical_expr = - create_physical_expr(expr, input_dfschema, session_state.execution_props())?; + let physical_expr = create_physical_expr( + expr, + input_dfschema, + session_state.execution_props(), + session_state.config_options(), + )?; let physical_name = physical_name(expr)?; Ok((physical_expr, physical_name)) } @@ -1512,6 +1540,7 @@ pub fn create_window_expr_with_name( name: impl Into, logical_schema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result> { let name = name.into(); let physical_schema: &Schema = &logical_schema.into(); @@ -1524,12 +1553,24 @@ pub fn create_window_expr_with_name( window_frame, null_treatment, }) => { - let physical_args = - create_physical_exprs(args, logical_schema, execution_props)?; - let partition_by = - create_physical_exprs(partition_by, logical_schema, execution_props)?; - let order_by = - create_physical_sort_exprs(order_by, logical_schema, execution_props)?; + let physical_args = create_physical_exprs( + args, + logical_schema, + execution_props, + config_options, + )?; + let partition_by = create_physical_exprs( + partition_by, + logical_schema, + execution_props, + config_options, + )?; + let order_by = create_physical_sort_exprs( + order_by, + logical_schema, + execution_props, + config_options, + )?; if !is_window_frame_bound_valid(window_frame) { return plan_err!( @@ -1561,13 +1602,14 @@ pub fn create_window_expr( e: &Expr, logical_schema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result> { // unpack aliased logical expressions, e.g. "sum(col) over () as total" let (name, e) = match e { Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()), _ => (e.schema_name().to_string(), e), }; - create_window_expr_with_name(e, name, logical_schema, execution_props) + create_window_expr_with_name(e, name, logical_schema, execution_props, config_options) } type AggregateExprWithOptionalArgs = ( @@ -1585,6 +1627,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( logical_input_schema: &DFSchema, physical_input_schema: &Schema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result { match e { Expr::AggregateFunction(AggregateFunction { @@ -1601,13 +1644,18 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( physical_name(e)? }; - let physical_args = - create_physical_exprs(args, logical_input_schema, execution_props)?; + let physical_args = create_physical_exprs( + args, + logical_input_schema, + execution_props, + config_options, + )?; let filter = match filter { Some(e) => Some(create_physical_expr( e, logical_input_schema, execution_props, + config_options, )?), None => None, }; @@ -1621,6 +1669,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( exprs, logical_input_schema, execution_props, + config_options, )?), None => None, }; @@ -1653,6 +1702,7 @@ pub fn create_aggregate_expr_and_maybe_filter( logical_input_schema: &DFSchema, physical_input_schema: &Schema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result { // unpack (nested) aliased logical expressions, e.g. "sum(col) as total" let (name, e) = match e { @@ -1667,6 +1717,7 @@ pub fn create_aggregate_expr_and_maybe_filter( logical_input_schema, physical_input_schema, execution_props, + config_options, ) } @@ -1675,6 +1726,7 @@ pub fn create_physical_sort_expr( e: &SortExpr, input_dfschema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result { let SortExpr { expr, @@ -1682,7 +1734,12 @@ pub fn create_physical_sort_expr( nulls_first, } = e; Ok(PhysicalSortExpr { - expr: create_physical_expr(expr, input_dfschema, execution_props)?, + expr: create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?, options: SortOptions { descending: !asc, nulls_first: *nulls_first, @@ -1695,10 +1752,18 @@ pub fn create_physical_sort_exprs( exprs: &[SortExpr], input_dfschema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result { exprs .iter() - .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props)) + .map(|expr| { + create_physical_sort_expr( + expr, + input_dfschema, + execution_props, + config_options, + ) + }) .collect::>() } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 685ed14777b4..2eb2b221c3c4 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -169,13 +169,19 @@ impl TestParquetFile { // run coercion on the filters to coerce types etc. let props = ExecutionProps::new(); - let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema)); + let config_options = ConfigOptions::default(); + let context = SimplifyContext::new(&props, &config_options) + .with_schema(Arc::clone(&df_schema)); let parquet_options = ctx.copied_table_options().parquet; if let Some(filter) = maybe_filter { let simplifier = ExprSimplifier::new(context); let filter = simplifier.coerce(filter, &df_schema).unwrap(); - let physical_filter_expr = - create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?; + let physical_filter_expr = create_physical_expr( + &filter, + &df_schema, + &ExecutionProps::default(), + &ConfigOptions::default(), + )?; let parquet_exec = ParquetExecBuilder::new_with_options(scan_config, parquet_options) diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 1e6ff8088d0a..3677257ffc14 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -23,6 +23,7 @@ use arrow_buffer::IntervalDayTime; use chrono::{DateTime, TimeZone, Utc}; use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::cast::as_int32_array; +use datafusion_common::config::ConfigOptions; use datafusion_common::ScalarValue; use datafusion_common::{DFSchemaRef, ToDFSchema}; use datafusion_expr::expr::ScalarFunction; @@ -50,6 +51,9 @@ struct MyInfo { /// Execution specific details needed for constant evaluation such /// as the current time for `now()` and [VariableProviders] execution_props: ExecutionProps, + + /// config options needed for scalar function evaluation + config_options: ConfigOptions, } impl SimplifyInfo for MyInfo { @@ -68,6 +72,10 @@ impl SimplifyInfo for MyInfo { &self.execution_props } + fn config_options(&self) -> &ConfigOptions { + &self.config_options + } + fn get_data_type(&self, expr: &Expr) -> Result { expr.get_type(self.schema.as_ref()) } @@ -78,6 +86,7 @@ impl From for MyInfo { Self { schema, execution_props: ExecutionProps::new(), + config_options: ConfigOptions::default(), } } } @@ -132,10 +141,11 @@ fn test_evaluate_with_start_time( ) { let execution_props = ExecutionProps::new().with_query_execution_start_time(*date_time); - + let config_options = ConfigOptions::default(); let info: MyInfo = MyInfo { schema: schema(), execution_props, + config_options, }; let simplifier = ExprSimplifier::new(info); let simplified_expr = simplifier @@ -498,6 +508,7 @@ fn test_simplify(input_expr: Expr, expected_expr: Expr) { let info: MyInfo = MyInfo { schema: expr_test_schema(), execution_props: ExecutionProps::new(), + config_options: ConfigOptions::default(), }; let simplifier = ExprSimplifier::new(info); let simplified_expr = simplifier @@ -517,6 +528,7 @@ fn test_simplify_with_cycle_count( let info: MyInfo = MyInfo { schema: expr_test_schema(), execution_props: ExecutionProps::new(), + config_options: ConfigOptions::default(), }; let simplifier = ExprSimplifier::new(info); let (simplified_expr, count) = simplifier diff --git a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs index 525baadd14a5..a9d286a8128a 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs @@ -21,6 +21,7 @@ use crate::fuzz_cases::equivalence::utils::{ is_table_same_after_sort, TestScalarUDF, }; use arrow_schema::SortOptions; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{Operator, ScalarUDF}; use datafusion_physical_expr::expressions::{col, BinaryExpr}; @@ -110,6 +111,7 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let a_plus_b = Arc::new(BinaryExpr::new( col("a", &test_schema)?, diff --git a/datafusion/core/tests/fuzz_cases/equivalence/projection.rs b/datafusion/core/tests/fuzz_cases/equivalence/projection.rs index 3df3e0348e42..343fa5d60650 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/projection.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/projection.rs @@ -20,6 +20,7 @@ use crate::fuzz_cases::equivalence::utils::{ is_table_same_after_sort, TestScalarUDF, }; use arrow_schema::SortOptions; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{Operator, ScalarUDF}; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -49,6 +50,7 @@ fn project_orderings_random() -> Result<()> { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; // a + b let a_plus_b = Arc::new(BinaryExpr::new( @@ -127,6 +129,7 @@ fn ordering_satisfy_after_projection_random() -> Result<()> { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; // a + b let a_plus_b = Arc::new(BinaryExpr::new( diff --git a/datafusion/core/tests/fuzz_cases/equivalence/properties.rs b/datafusion/core/tests/fuzz_cases/equivalence/properties.rs index 82586bd79eda..c1197eac3d9c 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/properties.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/properties.rs @@ -19,6 +19,7 @@ use crate::fuzz_cases::equivalence::utils::{ create_random_schema, generate_table_for_eq_properties, is_table_same_after_sort, TestScalarUDF, }; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{Operator, ScalarUDF}; use datafusion_physical_expr::expressions::{col, BinaryExpr}; @@ -47,6 +48,7 @@ fn test_find_longest_permutation_random() -> Result<()> { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let a_plus_b = Arc::new(BinaryExpr::new( col("a", &test_schema)?, diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 65bfd0340125..fd43f58b781c 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -28,7 +28,6 @@ use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, ToDFSchema}; -use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; @@ -68,8 +67,11 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { }; let df_schema = schema.clone().to_dfschema().unwrap(); - let execution_props = ExecutionProps::new(); - let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap(); + let execution_props = state.execution_props(); + let config_options = state.config_options(); + let predicate = + create_physical_expr(&filter, &df_schema, execution_props, config_options) + .unwrap(); ParquetExec::builder( FileScanConfig::new(object_store_url, schema).with_file(partitioned_file), diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index 467ce8bf53e2..f26f6a160a99 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -17,11 +17,11 @@ //! Structs and traits to provide the information needed for expression simplification. +use crate::{execution_props::ExecutionProps, Expr, ExprSchemable}; use arrow::datatypes::DataType; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchemaRef, DataFusionError, Result}; -use crate::{execution_props::ExecutionProps, Expr, ExprSchemable}; - /// Provides the information necessary to apply algebraic simplification to an /// [Expr]. See [SimplifyContext] for one concrete implementation. /// @@ -38,6 +38,9 @@ pub trait SimplifyInfo { /// Returns details needed for partial expression evaluation fn execution_props(&self) -> &ExecutionProps; + /// Returns the config options + fn config_options(&self) -> &ConfigOptions; + /// Returns data type of this expr needed for determining optimized int type of a value fn get_data_type(&self, expr: &Expr) -> Result; } @@ -53,14 +56,16 @@ pub trait SimplifyInfo { pub struct SimplifyContext<'a> { schema: Option, props: &'a ExecutionProps, + config_options: &'a ConfigOptions, } impl<'a> SimplifyContext<'a> { /// Create a new SimplifyContext - pub fn new(props: &'a ExecutionProps) -> Self { + pub fn new(props: &'a ExecutionProps, config_options: &'a ConfigOptions) -> Self { Self { schema: None, props, + config_options, } } @@ -106,6 +111,10 @@ impl SimplifyInfo for SimplifyContext<'_> { fn execution_props(&self) -> &ExecutionProps { self.props } + + fn config_options(&self) -> &ConfigOptions { + self.config_options + } } /// Was the expression simplified? diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 51c42b5c4c30..d079e0c4a177 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -24,6 +24,7 @@ use crate::{ ColumnarValue, Documentation, Expr, ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; +use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, ExprSchema, Result}; use datafusion_expr_common::interval_arithmetic::Interval; use std::any::Any; @@ -340,6 +341,8 @@ pub struct ScalarFunctionArgs<'a> { /// The return type of the scalar function returned (from `return_type` or `return_type_from_exprs`) /// when creating the physical expression from the logical expression pub return_type: &'a DataType, + // The config options which can be used to lookup configuration properties + pub config_options: Arc, } /// Trait for implementing user defined scalar functions. diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 3c7bc886aede..1b76bd2d28fa 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -208,6 +208,7 @@ impl TryFrom for PlanProperties { Some(parse_physical_sort_exprs( &proto_output_ordering.physical_sort_expr_nodes, &default_ctx, + default_ctx.state().config_options(), &schema, &codex, )?) @@ -224,6 +225,7 @@ impl TryFrom for PlanProperties { parse_protobuf_partitioning( Some(&proto_output_partitioning), &default_ctx, + default_ctx.state().config_options(), &schema, &codex, )? diff --git a/datafusion/functions/benches/initcap.rs b/datafusion/functions/benches/initcap.rs index 97c76831b33c..e0616af3187c 100644 --- a/datafusion/functions/benches/initcap.rs +++ b/datafusion/functions/benches/initcap.rs @@ -23,6 +23,7 @@ use arrow::util::bench_util::{ create_string_array_with_len, create_string_view_array_with_len, }; use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion_functions::unicode; use std::sync::Arc; @@ -57,6 +58,7 @@ fn criterion_benchmark(c: &mut Criterion) { args: args.clone(), number_rows: size, return_type: &DataType::Utf8View, + config_options: Arc::new(ConfigOptions::default()), })) }) }, @@ -71,6 +73,7 @@ fn criterion_benchmark(c: &mut Criterion) { args: args.clone(), number_rows: size, return_type: &DataType::Utf8View, + config_options: Arc::new(ConfigOptions::default()), })) }) }, @@ -83,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) { args: args.clone(), number_rows: size, return_type: &DataType::Utf8, + config_options: Arc::new(ConfigOptions::default()), })) }) }); diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index 9f95b780ea4f..2c3774902414 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -433,6 +433,7 @@ mod tests { use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::{DataType, TimeUnit}; use chrono::NaiveDateTime; + use datafusion_common::config::ConfigOptions; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; @@ -565,6 +566,7 @@ mod tests { args: vec![ColumnarValue::Scalar(input)], number_rows: 1, return_type: &expected.data_type(), + config_options: Arc::new(ConfigOptions::default()), }) .unwrap(); match res { diff --git a/datafusion/functions/src/math/log.rs b/datafusion/functions/src/math/log.rs index d4bb8ec13b0b..46f75af15c9b 100644 --- a/datafusion/functions/src/math/log.rs +++ b/datafusion/functions/src/math/log.rs @@ -267,6 +267,7 @@ mod tests { use arrow::array::{Float32Array, Float64Array, Int64Array}; use arrow::compute::SortOptions; use datafusion_common::cast::{as_float32_array, as_float64_array}; + use datafusion_common::config::ConfigOptions; use datafusion_common::DFSchema; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; @@ -513,9 +514,10 @@ mod tests { // Test log() simplification errors fn test_log_simplify_errors() { let props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new()).unwrap()); - let context = SimplifyContext::new(&props).with_schema(schema); + let context = SimplifyContext::new(&props, &config_options).with_schema(schema); // Expect 0 args to error let _ = LogFunc::new().simplify(vec![], &context).unwrap_err(); // Expect 3 args to error @@ -528,9 +530,10 @@ mod tests { // Test that non-simplifiable log() expressions are unchanged after simplification fn test_log_simplify_original() { let props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new()).unwrap()); - let context = SimplifyContext::new(&props).with_schema(schema); + let context = SimplifyContext::new(&props, &config_options).with_schema(schema); // One argument with no simplifications let result = LogFunc::new().simplify(vec![lit(2)], &context).unwrap(); let ExprSimplifyResult::Original(args) = result else { diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index 39d8aeeda460..ddd3380bcf26 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -142,6 +142,7 @@ pub mod test { }) .unwrap_or(1); let return_type = func.return_type(&type_array); + let cfg_options = std::sync::Arc::new(datafusion_common::config::ConfigOptions::default()); match expected { Ok(expected) => { @@ -149,7 +150,7 @@ pub mod test { let return_type = return_type.unwrap(); assert_eq!(return_type, $EXPECTED_DATA_TYPE); - let result = func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{args: $ARGS, number_rows: cardinality, return_type: &return_type}); + let result = func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{args: $ARGS, number_rows: cardinality, return_type: &return_type, config_options: cfg_options}); assert_eq!(result.is_ok(), true, "function returned an error: {}", result.unwrap_err()); let result = result.unwrap().to_array(cardinality).expect("Failed to convert to array"); @@ -171,7 +172,7 @@ pub mod test { } else { // invoke is expected error - cannot use .expect_err() due to Debug not being implemented - match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{args: $ARGS, number_rows: cardinality, return_type: &return_type.unwrap()}) { + match func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{args: $ARGS, number_rows: cardinality, return_type: &return_type.unwrap(), config_options: cfg_options,}) { Ok(_) => assert!(false, "expected error"), Err(error) => { assert!(expected_error.strip_backtrace().starts_with(&error.strip_backtrace())); diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index ee6ea08b43bf..623ffd02df9d 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use crate::simplify_expressions::ExprSimplifier; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; @@ -44,7 +45,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps; /// 'Aggregate' of the subquery if they are missing, so that they can be /// evaluated by the parent operator as the join condition. #[derive(Debug)] -pub struct PullUpCorrelatedExpr { +pub struct PullUpCorrelatedExpr<'a> { pub join_filters: Vec, /// mapping from the plan to its holding correlated columns pub correlated_subquery_cols_map: HashMap>, @@ -64,16 +65,12 @@ pub struct PullUpCorrelatedExpr { pub collected_count_expr_map: HashMap, /// pull up having expr, which must be evaluated after the Join pub pull_up_having_expr: Option, + /// config options + pub config_options: &'a ConfigOptions, } -impl Default for PullUpCorrelatedExpr { - fn default() -> Self { - Self::new() - } -} - -impl PullUpCorrelatedExpr { - pub fn new() -> Self { +impl<'a> PullUpCorrelatedExpr<'a> { + pub fn new(config_options: &'a ConfigOptions) -> Self { Self { join_filters: vec![], correlated_subquery_cols_map: HashMap::new(), @@ -84,6 +81,7 @@ impl PullUpCorrelatedExpr { need_handle_count_bug: false, collected_count_expr_map: HashMap::new(), pull_up_having_expr: None, + config_options, } } @@ -106,6 +104,12 @@ impl PullUpCorrelatedExpr { self.exists_sub_query = exists_sub_query; self } + + /// Set the config options + pub fn with_config_options(mut self, config_options: &'a ConfigOptions) -> Self { + self.config_options = config_options; + self + } } /// Used to indicate the unmatched rows from the inner(subquery) table after the left out Join @@ -119,7 +123,7 @@ pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true"; /// 'ScalarValue(2)') pub type ExprResultMap = HashMap; -impl TreeNodeRewriter for PullUpCorrelatedExpr { +impl TreeNodeRewriter for PullUpCorrelatedExpr<'_> { type Node = LogicalPlan; fn f_down(&mut self, plan: LogicalPlan) -> Result> { @@ -189,6 +193,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { Arc::clone(plan_filter.input.schema()), expr_result_map, &mut expr_result_map_for_count_bug, + self.config_options, )? } else { None @@ -247,6 +252,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { projection.input.schema(), expr_result_map, &mut expr_result_map_for_count_bug, + self.config_options, )?; if !expr_result_map_for_count_bug.is_empty() { // has count bug @@ -298,6 +304,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { &aggregate.aggr_expr, aggregate.input.schema(), &mut expr_result_map_for_count_bug, + self.config_options, )?; if !expr_result_map_for_count_bug.is_empty() { // has count bug @@ -368,7 +375,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { } } -impl PullUpCorrelatedExpr { +impl PullUpCorrelatedExpr<'_> { fn collect_missing_exprs( &self, exprs: &[Expr], @@ -470,6 +477,7 @@ fn agg_exprs_evaluation_result_on_empty_batch( agg_expr: &[Expr], schema: &DFSchemaRef, expr_result_map_for_count_bug: &mut ExprResultMap, + config_options: &ConfigOptions, ) -> Result<()> { for e in agg_expr.iter() { let result_expr = e @@ -491,7 +499,8 @@ fn agg_exprs_evaluation_result_on_empty_batch( let result_expr = result_expr.unalias(); let props = ExecutionProps::new(); - let info = SimplifyContext::new(&props).with_schema(Arc::clone(schema)); + let info = + SimplifyContext::new(&props, config_options).with_schema(Arc::clone(schema)); let simplifier = ExprSimplifier::new(info); let result_expr = simplifier.simplify(result_expr)?; if matches!(result_expr, Expr::Literal(ScalarValue::Int64(_))) { @@ -507,6 +516,7 @@ fn proj_exprs_evaluation_result_on_empty_batch( schema: &DFSchemaRef, input_expr_result_map_for_count_bug: &ExprResultMap, expr_result_map_for_count_bug: &mut ExprResultMap, + config_options: &ConfigOptions, ) -> Result<()> { for expr in proj_expr.iter() { let result_expr = expr @@ -528,7 +538,8 @@ fn proj_exprs_evaluation_result_on_empty_batch( if result_expr.ne(expr) { let props = ExecutionProps::new(); - let info = SimplifyContext::new(&props).with_schema(Arc::clone(schema)); + let info = SimplifyContext::new(&props, config_options) + .with_schema(Arc::clone(schema)); let simplifier = ExprSimplifier::new(info); let result_expr = simplifier.simplify(result_expr)?; let expr_name = match expr { @@ -547,6 +558,7 @@ fn filter_exprs_evaluation_result_on_empty_batch( schema: DFSchemaRef, input_expr_result_map_for_count_bug: &ExprResultMap, expr_result_map_for_count_bug: &mut ExprResultMap, + config_options: &ConfigOptions, ) -> Result> { let result_expr = filter_expr .clone() @@ -565,7 +577,7 @@ fn filter_exprs_evaluation_result_on_empty_batch( let pull_up_expr = if result_expr.ne(filter_expr) { let props = ExecutionProps::new(); - let info = SimplifyContext::new(&props).with_schema(schema); + let info = SimplifyContext::new(&props, config_options).with_schema(schema); let simplifier = ExprSimplifier::new(info); let result_expr = simplifier.simplify(result_expr)?; match &result_expr { diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index a87688c1a317..bb2725e319e9 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -37,6 +37,7 @@ use datafusion_expr::{ LogicalPlan, LogicalPlanBuilder, Operator, }; +use datafusion_common::config::ConfigOptions; use log::debug; /// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins @@ -65,7 +66,6 @@ impl OptimizerRule for DecorrelatePredicateSubquery { subquery.transform_down(|p| self.rewrite(p, config)) })? .data; - let LogicalPlan::Filter(filter) = plan else { return Ok(Transformed::no(plan)); }; @@ -91,8 +91,12 @@ impl OptimizerRule for DecorrelatePredicateSubquery { match extract_subquery_info(subquery_expr) { // The subquery expression is at the top level of the filter SubqueryPredicate::Top(subquery) => { - match build_join_top(&subquery, &cur_input, config.alias_generator())? - { + match build_join_top( + &subquery, + &cur_input, + config.alias_generator(), + config.options(), + )? { Some(plan) => cur_input = plan, // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter None => other_exprs.push(subquery.expr()), @@ -136,7 +140,14 @@ fn rewrite_inner_subqueries( Expr::Exists(Exists { subquery: Subquery { subquery, .. }, negated, - }) => match mark_join(&cur_input, Arc::clone(&subquery), None, negated, alias)? { + }) => match mark_join( + &cur_input, + Arc::clone(&subquery), + None, + negated, + alias, + config.options(), + )? { Some((plan, exists_expr)) => { cur_input = plan; Ok(Transformed::yes(exists_expr)) @@ -160,6 +171,7 @@ fn rewrite_inner_subqueries( Some(in_predicate), negated, alias, + config.options(), )? { Some((plan, exists_expr)) => { cur_input = plan; @@ -254,6 +266,7 @@ fn build_join_top( query_info: &SubqueryInfo, left: &LogicalPlan, alias: &Arc, + config_options: &ConfigOptions, ) -> Result> { let where_in_expr_opt = &query_info.where_in_expr; let in_predicate_opt = where_in_expr_opt @@ -275,7 +288,14 @@ fn build_join_top( }; let subquery = query_info.query.subquery.as_ref(); let subquery_alias = alias.next("__correlated_sq"); - build_join(left, subquery, in_predicate_opt, join_type, subquery_alias) + build_join( + left, + subquery, + in_predicate_opt, + join_type, + subquery_alias, + config_options, + ) } /// This is used to handle the case when the subquery is embedded in a more complex boolean @@ -299,16 +319,22 @@ fn mark_join( in_predicate_opt: Option, negated: bool, alias_generator: &Arc, + config_options: &ConfigOptions, ) -> Result> { let alias = alias_generator.next("__correlated_sq"); let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark")); let exists_expr = if negated { !exists_col } else { exists_col }; - Ok( - build_join(left, &subquery, in_predicate_opt, JoinType::LeftMark, alias)? - .map(|plan| (plan, exists_expr)), - ) + Ok(build_join( + left, + &subquery, + in_predicate_opt, + JoinType::LeftMark, + alias, + config_options, + )? + .map(|plan| (plan, exists_expr))) } fn build_join( @@ -317,10 +343,12 @@ fn build_join( in_predicate_opt: Option, join_type: JoinType, alias: String, + config_options: &ConfigOptions, ) -> Result> { - let mut pull_up = PullUpCorrelatedExpr::new() + let mut pull_up = PullUpCorrelatedExpr::new(config_options) .with_in_predicate_opt(in_predicate_opt.clone()) - .with_exists_sub_query(in_predicate_opt.is_none()); + .with_exists_sub_query(in_predicate_opt.is_none()) + .with_config_options(config_options); let new_plan = subquery.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 0b328ad39f55..ca49b5af6457 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use indexmap::IndexSet; use itertools::Itertools; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -522,6 +523,7 @@ fn push_down_all_join( fn push_down_join( join: Join, parent_predicate: Option<&Expr>, + config_options: &ConfigOptions, ) -> Result> { // Split the parent predicate into individual conjunctive parts. let predicates = parent_predicate @@ -535,7 +537,7 @@ fn push_down_join( // Are there any new join predicates that can be inferred from the filter expressions? let inferred_join_predicates = - infer_join_predicates(&join, &predicates, &on_filters)?; + infer_join_predicates(&join, &predicates, &on_filters, config_options)?; if on_filters.is_empty() && predicates.is_empty() @@ -561,6 +563,7 @@ fn infer_join_predicates( join: &Join, predicates: &[Expr], on_filters: &[Expr], + config_options: &ConfigOptions, ) -> Result> { // Only allow both side key is column. let join_col_keys = join @@ -581,6 +584,7 @@ fn infer_join_predicates( &join_col_keys, predicates, &mut inferred_predicates, + config_options, )?; infer_join_predicates_from_on_filters( @@ -588,6 +592,7 @@ fn infer_join_predicates( join_type, on_filters, &mut inferred_predicates, + config_options, )?; Ok(inferred_predicates.predicates) @@ -618,12 +623,14 @@ impl InferredPredicates { &mut self, predicate: Expr, replace_map: &HashMap<&Column, &Column>, + config_options: &ConfigOptions, ) -> Result<()> { if self.is_inner_join || matches!( is_restrict_null_predicate( predicate.clone(), - replace_map.keys().cloned() + replace_map.keys().cloned(), + config_options, ), Ok(true) ) @@ -648,11 +655,13 @@ fn infer_join_predicates_from_predicates( join_col_keys: &[(&Column, &Column)], predicates: &[Expr], inferred_predicates: &mut InferredPredicates, + config_options: &ConfigOptions, ) -> Result<()> { infer_join_predicates_impl::( join_col_keys, predicates, inferred_predicates, + config_options, ) } @@ -673,6 +682,7 @@ fn infer_join_predicates_from_on_filters( join_type: JoinType, on_filters: &[Expr], inferred_predicates: &mut InferredPredicates, + config_options: &ConfigOptions, ) -> Result<()> { match join_type { JoinType::Full | JoinType::LeftAnti | JoinType::RightAnti => Ok(()), @@ -680,12 +690,14 @@ fn infer_join_predicates_from_on_filters( join_col_keys, on_filters, inferred_predicates, + config_options, ), JoinType::Left | JoinType::LeftSemi | JoinType::LeftMark => { infer_join_predicates_impl::( join_col_keys, on_filters, inferred_predicates, + config_options, ) } JoinType::Right | JoinType::RightSemi => { @@ -693,6 +705,7 @@ fn infer_join_predicates_from_on_filters( join_col_keys, on_filters, inferred_predicates, + config_options, ) } } @@ -721,6 +734,7 @@ fn infer_join_predicates_impl< join_col_keys: &[(&Column, &Column)], input_predicates: &[Expr], inferred_predicates: &mut InferredPredicates, + config_options: &ConfigOptions, ) -> Result<()> { for predicate in input_predicates { let mut join_cols_to_replace = HashMap::new(); @@ -741,8 +755,11 @@ fn infer_join_predicates_impl< continue; } - inferred_predicates - .try_build_predicate(predicate.clone(), &join_cols_to_replace)?; + inferred_predicates.try_build_predicate( + predicate.clone(), + &join_cols_to_replace, + config_options, + )?; } Ok(()) } @@ -763,10 +780,10 @@ impl OptimizerRule for PushDownFilter { fn rewrite( &self, plan: LogicalPlan, - _config: &dyn OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { if let LogicalPlan::Join(join) = plan { - return push_down_join(join, None); + return push_down_join(join, None, config.options()); }; let plan_schema = Arc::clone(plan.schema()); @@ -796,7 +813,7 @@ impl OptimizerRule for PushDownFilter { new_predicate, child_filter.input, )?); - self.rewrite(new_filter, _config) + self.rewrite(new_filter, config) } LogicalPlan::Repartition(repartition) => { let new_filter = @@ -985,7 +1002,9 @@ impl OptimizerRule for PushDownFilter { } }) } - LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)), + LogicalPlan::Join(join) => { + push_down_join(join, Some(&filter.predicate), config.options()) + } LogicalPlan::TableScan(scan) => { let filter_predicates = split_conjunction(&filter.predicate); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 3a8aef267be5..ee37dd4f85ba 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -26,6 +26,7 @@ use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::alias::AliasGenerator; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; @@ -99,7 +100,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { let mut cur_input = filter.input.as_ref().clone(); for (subquery, alias) in subqueries { if let Some((optimized_subquery, expr_check_map)) = - build_join(&subquery, &cur_input, &alias)? + build_join(&subquery, &cur_input, &alias, config.options())? { if !expr_check_map.is_empty() { rewrite_expr = rewrite_expr @@ -153,7 +154,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { let mut cur_input = projection.input.as_ref().clone(); for (subquery, alias) in all_subqueries { if let Some((optimized_subquery, expr_check_map)) = - build_join(&subquery, &cur_input, &alias)? + build_join(&subquery, &cur_input, &alias, config.options())? { cur_input = optimized_subquery; if !expr_check_map.is_empty() { @@ -295,9 +296,12 @@ fn build_join( subquery: &Subquery, filter_input: &LogicalPlan, subquery_alias: &str, + config_options: &ConfigOptions, ) -> Result)>> { let subquery_plan = subquery.subquery.as_ref(); - let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); + let mut pull_up = PullUpCorrelatedExpr::new(config_options) + .with_need_handle_count_bug(true) + .with_config_options(config_options); let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(None); diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 74d2ce0b6be9..675d055a841c 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -17,15 +17,15 @@ //! Expression simplification API -use std::borrow::Cow; -use std::collections::HashSet; -use std::ops::Not; - use arrow::{ array::{new_null_array, AsArray}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; +use std::borrow::Cow; +use std::collections::HashSet; +use std::ops::Not; +use std::sync::Arc; use datafusion_common::{ cast::{as_large_list_array, as_list_array}, @@ -44,16 +44,16 @@ use datafusion_expr::{ }; use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps}; +use super::inlist_simplifier::ShortenInListSimplifier; +use super::utils::*; use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; use crate::simplify_expressions::regex::simplify_regex_expr; use crate::simplify_expressions::SimplifyInfo; +use datafusion_common::config::ConfigOptions; use indexmap::IndexSet; use regex::Regex; -use super::inlist_simplifier::ShortenInListSimplifier; -use super::utils::*; - /// This structure handles API for expression simplification /// /// Provides simplification information based on DFSchema and @@ -64,6 +64,7 @@ use super::utils::*; /// use arrow::datatypes::{Schema, Field, DataType}; /// use datafusion_expr::{col, lit}; /// use datafusion_common::{DataFusionError, ToDFSchema}; +/// use datafusion_common::config::ConfigOptions; /// use datafusion_expr::execution_props::ExecutionProps; /// use datafusion_expr::simplify::SimplifyContext; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -76,7 +77,8 @@ use super::utils::*; /// /// // Create the simplifier /// let props = ExecutionProps::new(); -/// let context = SimplifyContext::new(&props) +/// let config_options = ConfigOptions::default(); +/// let context = SimplifyContext::new(&props, &config_options) /// .with_schema(schema); /// let simplifier = ExprSimplifier::new(context); /// @@ -136,6 +138,7 @@ impl ExprSimplifier { /// /// ``` /// use arrow::datatypes::DataType; + /// use datafusion_common::config::ConfigOptions; /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_common::Result; /// use datafusion_expr::execution_props::ExecutionProps; @@ -143,14 +146,14 @@ impl ExprSimplifier { /// use datafusion_expr::simplify::SimplifyInfo; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// use datafusion_common::DFSchema; - /// use std::sync::Arc; /// /// /// Simple implementation that provides `Simplifier` the information it needs /// /// See SimplifyContext for a structure that does this. /// #[derive(Default)] /// struct Info { /// execution_props: ExecutionProps, - /// }; + /// config_options: ConfigOptions, + /// } /// /// impl SimplifyInfo for Info { /// fn is_boolean_type(&self, expr: &Expr) -> Result { @@ -162,6 +165,9 @@ impl ExprSimplifier { /// fn execution_props(&self) -> &ExecutionProps { /// &self.execution_props /// } + /// fn config_options(&self) -> &ConfigOptions { + /// &self.config_options + /// } /// fn get_data_type(&self, expr: &Expr) -> Result { /// Ok(DataType::Int32) /// } @@ -193,7 +199,10 @@ impl ExprSimplifier { /// pub fn simplify_with_cycle_count(&self, mut expr: Expr) -> Result<(Expr, u32)> { let mut simplifier = Simplifier::new(&self.info); - let mut const_evaluator = ConstEvaluator::try_new(self.info.execution_props())?; + let mut const_evaluator = ConstEvaluator::try_new( + self.info.execution_props(), + self.info.config_options(), + )?; let mut shorten_in_list_simplifier = ShortenInListSimplifier::new(); let mut guarantee_rewriter = GuaranteeRewriter::new(&self.guarantees); @@ -248,6 +257,7 @@ impl ExprSimplifier { /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_expr::interval_arithmetic::{Interval, NullableInterval}; /// use datafusion_common::{Result, ScalarValue, ToDFSchema}; + /// use datafusion_common::config::ConfigOptions; /// use datafusion_expr::execution_props::ExecutionProps; /// use datafusion_expr::simplify::SimplifyContext; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -261,7 +271,8 @@ impl ExprSimplifier { /// /// // Create the simplifier /// let props = ExecutionProps::new(); - /// let context = SimplifyContext::new(&props) + /// let config_options = ConfigOptions::default(); + /// let context = SimplifyContext::new(&props, &config_options) /// .with_schema(schema); /// /// // Expression: (x >= 3) AND (y + 2 < 10) AND (z > 5) @@ -307,6 +318,7 @@ impl ExprSimplifier { /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_expr::interval_arithmetic::{Interval, NullableInterval}; /// use datafusion_common::{Result, ScalarValue, ToDFSchema}; + /// use datafusion_common::config::ConfigOptions; /// use datafusion_expr::execution_props::ExecutionProps; /// use datafusion_expr::simplify::SimplifyContext; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -320,7 +332,8 @@ impl ExprSimplifier { /// /// // Create the simplifier /// let props = ExecutionProps::new(); - /// let context = SimplifyContext::new(&props) + /// let config_options = ConfigOptions::default(); + /// let context = SimplifyContext::new(&props, &config_options) /// .with_schema(schema); /// let simplifier = ExprSimplifier::new(context); /// @@ -363,9 +376,11 @@ impl ExprSimplifier { /// instead. /// /// ```rust + /// use std::sync::Arc; /// use arrow::datatypes::{DataType, Field, Schema}; /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_common::{Result, ScalarValue, ToDFSchema}; + /// use datafusion_common::config::ConfigOptions; /// use datafusion_expr::execution_props::ExecutionProps; /// use datafusion_expr::simplify::SimplifyContext; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -377,7 +392,8 @@ impl ExprSimplifier { /// /// // Create the simplifier /// let props = ExecutionProps::new(); - /// let context = SimplifyContext::new(&props) + /// let config_options = ConfigOptions::default(); + /// let context = SimplifyContext::new(&props, &config_options) /// .with_schema(schema); /// let simplifier = ExprSimplifier::new(context); /// @@ -474,6 +490,7 @@ struct ConstEvaluator<'a> { can_evaluate: Vec, execution_props: &'a ExecutionProps, + config_options: &'a ConfigOptions, input_schema: DFSchema, input_batch: RecordBatch, } @@ -549,7 +566,10 @@ impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed /// `execution_props`. - pub fn try_new(execution_props: &'a ExecutionProps) -> Result { + pub fn try_new( + execution_props: &'a ExecutionProps, + config_options: &'a ConfigOptions, + ) -> Result { // The dummy column name is unused and doesn't matter as only // expressions without column references can be evaluated static DUMMY_COL_NAME: &str = "."; @@ -557,11 +577,12 @@ impl<'a> ConstEvaluator<'a> { let input_schema = DFSchema::try_from(schema.clone())?; // Need a single "input" row to produce a single output row let col = new_null_array(&DataType::Null, 1); - let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?; + let input_batch = RecordBatch::try_new(Arc::new(schema), vec![col])?; Ok(Self { can_evaluate: vec![], execution_props, + config_options, input_schema, input_batch, }) @@ -631,11 +652,15 @@ impl<'a> ConstEvaluator<'a> { return ConstSimplifyResult::NotSimplified(s); } - let phys_expr = - match create_physical_expr(&expr, &self.input_schema, self.execution_props) { - Ok(e) => e, - Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), - }; + let phys_expr = match create_physical_expr( + &expr, + &self.input_schema, + self.execution_props, + self.config_options, + ) { + Ok(e) => e, + Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), + }; let col_val = match phys_expr.evaluate(&self.input_batch) { Ok(v) => v, Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), @@ -1910,8 +1935,10 @@ mod tests { #[test] fn api_basic() { let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(test_schema())); + let config_options = ConfigOptions::default(); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&props, &config_options).with_schema(test_schema()), + ); let expr = lit(1) + lit(2); let expected = lit(3); @@ -1922,8 +1949,10 @@ mod tests { fn basic_coercion() { let schema = test_schema(); let props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let simplifier = ExprSimplifier::new( - SimplifyContext::new(&props).with_schema(Arc::clone(&schema)), + SimplifyContext::new(&props, &config_options) + .with_schema(Arc::clone(&schema)), ); // Note expr type is int32 (not int64) @@ -1949,8 +1978,10 @@ mod tests { #[test] fn simplify_and_constant_prop() { let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(test_schema())); + let config_options = ConfigOptions::default(); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&props, &config_options).with_schema(test_schema()), + ); // should be able to simplify to false // (i * (1 - 2)) > 0 @@ -1962,8 +1993,10 @@ mod tests { #[test] fn simplify_and_constant_prop_with_case() { let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(test_schema())); + let config_options = ConfigOptions::default(); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&props, &config_options).with_schema(test_schema()), + ); // CASE // WHEN i>5 AND false THEN i > 5 @@ -3063,8 +3096,9 @@ mod tests { fn try_simplify(expr: Expr) -> Result { let schema = expr_test_schema(); let execution_props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let simplifier = ExprSimplifier::new( - SimplifyContext::new(&execution_props).with_schema(schema), + SimplifyContext::new(&execution_props, &config_options).with_schema(schema), ); simplifier.simplify(expr) } @@ -3076,8 +3110,9 @@ mod tests { fn try_simplify_with_cycle_count(expr: Expr) -> Result<(Expr, u32)> { let schema = expr_test_schema(); let execution_props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let simplifier = ExprSimplifier::new( - SimplifyContext::new(&execution_props).with_schema(schema), + SimplifyContext::new(&execution_props, &config_options).with_schema(schema), ); simplifier.simplify_with_cycle_count(expr) } @@ -3092,8 +3127,9 @@ mod tests { ) -> Expr { let schema = expr_test_schema(); let execution_props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let simplifier = ExprSimplifier::new( - SimplifyContext::new(&execution_props).with_schema(schema), + SimplifyContext::new(&execution_props, &config_options).with_schema(schema), ) .with_guarantees(guarantees); simplifier.simplify(expr).unwrap() @@ -3933,9 +3969,11 @@ mod tests { #[test] fn simplify_common_factor_conjunction_in_disjunction() { let props = ExecutionProps::new(); + let config_options = ConfigOptions::default(); let schema = boolean_test_schema(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(schema)); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&props, &config_options).with_schema(schema), + ); let a = || col("A"); let b = || col("B"); diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 200f1f159d81..51133ca25506 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::execution_props::ExecutionProps; @@ -70,7 +71,7 @@ impl OptimizerRule for SimplifyExpressions { ) -> Result, DataFusionError> { let mut execution_props = ExecutionProps::new(); execution_props.query_execution_start_time = config.query_execution_start_time(); - Self::optimize_internal(plan, &execution_props) + Self::optimize_internal(plan, &execution_props, config.options()) } } @@ -78,6 +79,7 @@ impl SimplifyExpressions { fn optimize_internal( plan: LogicalPlan, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result> { let schema = if !plan.inputs().is_empty() { DFSchemaRef::new(merge_schema(&plan.inputs())) @@ -100,7 +102,8 @@ impl SimplifyExpressions { Arc::new(DFSchema::empty()) }; - let info = SimplifyContext::new(execution_props).with_schema(schema); + let info = + SimplifyContext::new(execution_props, config_options).with_schema(schema); // Inputs have already been rewritten (due to bottom-up traversal handled by Optimizer) // Just need to rewrite our own expressions diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 39f8cf285d17..52d5ca2110ac 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -25,6 +25,7 @@ use crate::analyzer::type_coercion::TypeCoercionRewriter; use arrow::array::{new_null_array, Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_boolean_array; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{Column, DFSchema, Result, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; @@ -114,6 +115,7 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) { pub fn is_restrict_null_predicate<'a>( predicate: Expr, join_cols_of_predicate: impl IntoIterator, + config_options: &ConfigOptions, ) -> Result { if matches!(predicate, Expr::Column(_)) { return Ok(true); @@ -134,8 +136,12 @@ pub fn is_restrict_null_predicate<'a>( let replaced_predicate = replace_col(predicate, &join_cols_to_replace)?; let coerced_predicate = coerce(replaced_predicate, &input_schema)?; - let phys_expr = - create_physical_expr(&coerced_predicate, &input_schema, &execution_props)?; + let phys_expr = create_physical_expr( + &coerced_predicate, + &input_schema, + &execution_props, + config_options, + )?; let result_type = phys_expr.data_type(&schema)?; if !matches!(&result_type, DataType::Boolean) { @@ -257,8 +263,12 @@ mod tests { let column_a = Column::from_name("a"); for (predicate, expected) in test_cases { let join_cols_of_predicate = std::iter::once(&column_a); - let actual = - is_restrict_null_predicate(predicate.clone(), join_cols_of_predicate)?; + let config_options = &ConfigOptions::default(); + let actual = is_restrict_null_predicate( + predicate.clone(), + join_cols_of_predicate, + config_options, + )?; assert_eq!(actual, expected, "{}", predicate); } diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 24e2fc7dbaf5..2abeaa30408a 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -266,6 +266,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; + use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{Operator, ScalarUDF}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -320,6 +321,7 @@ mod tests { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let floor_f = &crate::udf::create_physical_expr( &test_fun, @@ -327,6 +329,7 @@ mod tests { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let exp_a = &crate::udf::create_physical_expr( &test_fun, @@ -334,6 +337,7 @@ mod tests { &test_schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let a_plus_b = Arc::new(BinaryExpr::new( Arc::clone(col_a), diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 25a05a2a5918..1b2c321672ab 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -148,6 +148,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SortOptions, TimeUnit}; + use datafusion_common::config::ConfigOptions; use datafusion_common::DFSchema; use datafusion_expr::{Operator, ScalarUDF}; @@ -674,6 +675,7 @@ mod tests { &schema, &[], &DFSchema::empty(), + &ConfigOptions::default(), )?; let option_asc = SortOptions { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index c3d458103285..3afbbdf994a3 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -2271,6 +2271,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, TimeUnit}; + use datafusion_common::config::ConfigOptions; use datafusion_common::ScalarValue; use datafusion_expr::Operator; @@ -3830,6 +3831,7 @@ mod tests { concat(), vec![Arc::clone(&col_a), Arc::clone(&col_b)], DataType::Utf8, + Arc::new(ConfigOptions::default()), )); // Assume existing ordering is [c ASC, a ASC, b ASC] @@ -3921,6 +3923,7 @@ mod tests { concat(), vec![Arc::clone(&col_a), Arc::clone(&col_b)], DataType::Utf8, + Arc::new(ConfigOptions::default()), )); // Assume existing ordering is [concat(a, b) ASC, a ASC, b ASC] diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 906ca9fd1093..0661deb4e917 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -24,6 +24,7 @@ use crate::{ }; use arrow::datatypes::Schema; +use datafusion_common::config::ConfigOptions; use datafusion_common::{ exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema, }; @@ -51,6 +52,7 @@ use datafusion_expr::{ /// # Example: Create `PhysicalExpr` from `Expr` /// ``` /// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::config::ConfigOptions; /// # use datafusion_common::DFSchema; /// # use datafusion_expr::{Expr, col, lit}; /// # use datafusion_physical_expr::create_physical_expr; @@ -62,8 +64,10 @@ use datafusion_expr::{ /// let df_schema = DFSchema::try_from(schema).unwrap(); /// // 2. ExecutionProps /// let props = ExecutionProps::new(); +/// // 3. ConfigOptions +/// let config_options = ConfigOptions::default(); /// // We can now create a PhysicalExpr: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props, &config_options).unwrap(); /// ``` /// /// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] @@ -72,6 +76,7 @@ use datafusion_expr::{ /// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; /// # use arrow::datatypes::{DataType, Field, Schema}; /// # use datafusion_common::{assert_batches_eq, DFSchema}; +/// # use datafusion_common::config::ConfigOptions; /// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; /// # use datafusion_physical_expr::create_physical_expr; /// # use datafusion_expr::execution_props::ExecutionProps; @@ -79,8 +84,9 @@ use datafusion_expr::{ /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); /// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); /// # let props = ExecutionProps::new(); +/// # let config_options = ConfigOptions::default(); /// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props, &config_options).unwrap(); /// // Input of [1,2,3] /// let input_batch = RecordBatch::try_from_iter(vec![ /// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) @@ -107,13 +113,17 @@ pub fn create_physical_expr( e: &Expr, input_dfschema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result> { let input_schema: &Schema = &input_dfschema.into(); match e { - Expr::Alias(Alias { expr, .. }) => { - Ok(create_physical_expr(expr, input_dfschema, execution_props)?) - } + Expr::Alias(Alias { expr, .. }) => Ok(create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?), Expr::Column(c) => { let idx = input_dfschema.index_of_column(c)?; Ok(Arc::new(Column::new(&c.name, idx))) @@ -144,12 +154,22 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, lit(true), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::IsNotTrue(expr) => { let binary_op = binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true)); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::IsFalse(expr) => { let binary_op = binary_expr( @@ -157,12 +177,22 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, lit(false), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::IsNotFalse(expr) => { let binary_op = binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false)); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::IsUnknown(expr) => { let binary_op = binary_expr( @@ -170,7 +200,12 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, Expr::Literal(ScalarValue::Boolean(None)), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::IsNotUnknown(expr) => { let binary_op = binary_expr( @@ -178,12 +213,27 @@ pub fn create_physical_expr( Operator::IsDistinctFrom, Expr::Literal(ScalarValue::Boolean(None)), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr( + &binary_op, + input_dfschema, + execution_props, + config_options, + ) } Expr::BinaryExpr(BinaryExpr { left, op, right }) => { // Create physical expressions for left and right operands - let lhs = create_physical_expr(left, input_dfschema, execution_props)?; - let rhs = create_physical_expr(right, input_dfschema, execution_props)?; + let lhs = create_physical_expr( + left, + input_dfschema, + execution_props, + config_options, + )?; + let rhs = create_physical_expr( + right, + input_dfschema, + execution_props, + config_options, + )?; // Note that the logical planner is responsible // for type coercion on the arguments (e.g. if one // argument was originally Int32 and one was @@ -206,10 +256,18 @@ pub fn create_physical_expr( "LIKE does not support escape_char other than the backslash (\\)" ); } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?; + let physical_pattern = create_physical_expr( + pattern, + input_dfschema, + execution_props, + config_options, + )?; like( *negated, *case_insensitive, @@ -228,10 +286,18 @@ pub fn create_physical_expr( if escape_char.is_some() { return exec_err!("SIMILAR TO does not support escape_char yet"); } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?; + let physical_pattern = create_physical_expr( + pattern, + input_dfschema, + execution_props, + config_options, + )?; similar_to(*negated, *case_insensitive, physical_expr, physical_pattern) } Expr::Case(case) => { @@ -240,6 +306,7 @@ pub fn create_physical_expr( e.as_ref(), input_dfschema, execution_props, + config_options, )?) } else { None @@ -249,10 +316,18 @@ pub fn create_physical_expr( .iter() .map(|(w, t)| (w.as_ref(), t.as_ref())) .unzip(); - let when_expr = - create_physical_exprs(when_expr, input_dfschema, execution_props)?; - let then_expr = - create_physical_exprs(then_expr, input_dfschema, execution_props)?; + let when_expr = create_physical_exprs( + when_expr, + input_dfschema, + execution_props, + config_options, + )?; + let then_expr = create_physical_exprs( + then_expr, + input_dfschema, + execution_props, + config_options, + )?; let when_then_expr: Vec<(Arc, Arc)> = when_expr .iter() @@ -265,6 +340,7 @@ pub fn create_physical_expr( e.as_ref(), input_dfschema, execution_props, + config_options, )?) } else { None @@ -272,35 +348,44 @@ pub fn create_physical_expr( Ok(expressions::case(expr, when_then_expr, else_expr)?) } Expr::Cast(Cast { expr, data_type }) => expressions::cast( - create_physical_expr(expr, input_dfschema, execution_props)?, + create_physical_expr(expr, input_dfschema, execution_props, config_options)?, input_schema, data_type.clone(), ), Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast( - create_physical_expr(expr, input_dfschema, execution_props)?, + create_physical_expr(expr, input_dfschema, execution_props, config_options)?, input_schema, data_type.clone(), ), - Expr::Not(expr) => { - expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?) - } + Expr::Not(expr) => expressions::not(create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?), Expr::Negative(expr) => expressions::negative( - create_physical_expr(expr, input_dfschema, execution_props)?, + create_physical_expr(expr, input_dfschema, execution_props, config_options)?, input_schema, ), Expr::IsNull(expr) => expressions::is_null(create_physical_expr( expr, input_dfschema, execution_props, + config_options, )?), Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( expr, input_dfschema, execution_props, + config_options, )?), Expr::ScalarFunction(ScalarFunction { func, args }) => { - let physical_args = - create_physical_exprs(args, input_dfschema, execution_props)?; + let physical_args = create_physical_exprs( + args, + input_dfschema, + execution_props, + config_options, + )?; scalar_function::create_physical_expr( Arc::clone(func).as_ref(), @@ -308,6 +393,7 @@ pub fn create_physical_expr( input_schema, args, input_dfschema, + config_options, ) } Expr::Between(Between { @@ -316,9 +402,24 @@ pub fn create_physical_expr( low, high, }) => { - let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; - let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; - let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; + let value_expr = create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?; + let low_expr = create_physical_expr( + low, + input_dfschema, + execution_props, + config_options, + )?; + let high_expr = create_physical_expr( + high, + input_dfschema, + execution_props, + config_options, + )?; // rewrite the between into the two binary operators let binary_expr = binary( @@ -353,11 +454,19 @@ pub fn create_physical_expr( Ok(expressions::lit(ScalarValue::Boolean(None))) } _ => { - let value_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; + let value_expr = create_physical_expr( + expr, + input_dfschema, + execution_props, + config_options, + )?; - let list_exprs = - create_physical_exprs(list, input_dfschema, execution_props)?; + let list_exprs = create_physical_exprs( + list, + input_dfschema, + execution_props, + config_options, + )?; expressions::in_list(value_expr, list_exprs, negated, input_schema) } }, @@ -375,13 +484,16 @@ pub fn create_physical_exprs<'a, I>( exprs: I, input_dfschema: &DFSchema, execution_props: &ExecutionProps, + config_options: &ConfigOptions, ) -> Result>> where I: IntoIterator, { exprs .into_iter() - .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .map(|expr| { + create_physical_expr(expr, input_dfschema, execution_props, config_options) + }) .collect::>>() } @@ -389,7 +501,9 @@ where pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc { let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); - create_physical_expr(expr, &df_schema, &execution_props).unwrap() + // usages of this are only in tests so this should be acceptable + let config_options = ConfigOptions::default(); + create_physical_expr(expr, &df_schema, &execution_props, &config_options).unwrap() } #[cfg(test)] @@ -407,7 +521,12 @@ mod tests { let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]); let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?; - let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?; + let p = create_physical_expr( + &expr, + &df_schema, + &ExecutionProps::new(), + &ConfigOptions::default(), + )?; let batch = RecordBatch::try_new( Arc::new(schema), diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 0ae4115de67a..f8d524eaaaee 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -31,7 +31,7 @@ use std::any::Any; use std::fmt::{self, Debug, Formatter}; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::PhysicalExpr; @@ -39,20 +39,23 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::Array; +use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF}; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; +use itertools::Itertools; /// Physical expression of a scalar function -#[derive(Eq, PartialEq, Hash)] pub struct ScalarFunctionExpr { fun: Arc, name: String, args: Vec>, return_type: DataType, nullable: bool, + config_options: Arc, } impl Debug for ScalarFunctionExpr { @@ -73,6 +76,7 @@ impl ScalarFunctionExpr { fun: Arc, args: Vec>, return_type: DataType, + config_options: Arc, ) -> Self { Self { fun, @@ -80,6 +84,7 @@ impl ScalarFunctionExpr { args, return_type, nullable: true, + config_options, } } @@ -119,6 +124,47 @@ impl fmt::Display for ScalarFunctionExpr { } } +impl DynEq for ScalarFunctionExpr { + fn dyn_eq(&self, other: &dyn Any) -> bool { + other.downcast_ref::().map_or(false, |o| { + let eq = self.fun.eq(&o.fun); + let eq = eq && self.name.eq(&o.name); + let eq = eq && self.args.eq(&o.args); + let eq = eq && self.return_type.eq(&o.return_type); + let eq = eq && self.nullable.eq(&o.nullable); + let eq = eq + && self + .config_options + .entries() + .iter() + .sorted_by(|&l, &r| l.key.cmp(&r.key)) + .zip( + o.config_options + .entries() + .iter() + .sorted_by(|&l, &r| l.key.cmp(&r.key)), + ) + .filter(|(l, r)| l.ne(r)) + .count() + == 0; + + eq + }) + } +} + +impl DynHash for ScalarFunctionExpr { + fn dyn_hash(&self, mut state: &mut dyn Hasher) { + self.type_id().hash(&mut state); + self.fun.hash(&mut state); + self.name.hash(&mut state); + self.args.hash(&mut state); + self.return_type.hash(&mut state); + self.nullable.hash(&mut state); + self.config_options.entries().hash(&mut state); + } +} + impl PhysicalExpr for ScalarFunctionExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -150,6 +196,7 @@ impl PhysicalExpr for ScalarFunctionExpr { args, number_rows: batch.num_rows(), return_type: &self.return_type, + config_options: Arc::clone(&self.config_options), })?; if let ColumnarValue::Array(array) = &output { @@ -183,6 +230,7 @@ impl PhysicalExpr for ScalarFunctionExpr { Arc::clone(&self.fun), children, self.return_type().clone(), + Arc::clone(&self.config_options), ) .with_nullable(self.nullable), )) @@ -224,6 +272,7 @@ pub fn create_physical_expr( input_schema: &Schema, args: &[Expr], input_dfschema: &DFSchema, + config_options: &ConfigOptions, ) -> Result> { let input_expr_types = input_phy_exprs .iter() @@ -243,6 +292,7 @@ pub fn create_physical_expr( Arc::new(fun.clone()), input_phy_exprs.to_vec(), return_type, + Arc::new(config_options.clone()), ) .with_nullable(fun.is_nullable(args, input_dfschema)), )) diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 12ddb4cb2e32..2e53ff47508c 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -289,7 +289,12 @@ pub fn physical_plan_from_json( let back: protobuf::PhysicalPlanNode = serde_json::from_str(json) .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; let extension_codec = DefaultPhysicalExtensionCodec {}; - back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec) + back.try_into_physical_plan( + ctx, + ctx.state().config_options(), + &ctx.runtime_env(), + &extension_codec, + ) } /// Deserialize a PhysicalPlan from bytes @@ -309,5 +314,10 @@ pub fn physical_plan_from_bytes_with_extension_codec( ) -> Result> { let protobuf = protobuf::PhysicalPlanNode::decode(bytes) .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?; - protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec) + protobuf.try_into_physical_plan( + ctx, + ctx.state().config_options(), + &ctx.runtime_env(), + extension_codec, + ) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index d1fe48cfec74..6fba11843c69 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -42,6 +42,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_proto_common::common::proto_error; @@ -70,11 +71,18 @@ impl From<&protobuf::PhysicalColumn> for Column { pub fn parse_physical_sort_expr( proto: &protobuf::PhysicalSortExprNode, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result { if let Some(expr) = &proto.expr { - let expr = parse_physical_expr(expr.as_ref(), registry, input_schema, codec)?; + let expr = parse_physical_expr( + expr.as_ref(), + registry, + config_options, + input_schema, + codec, + )?; let options = SortOptions { descending: !proto.asc, nulls_first: proto.nulls_first, @@ -97,13 +105,20 @@ pub fn parse_physical_sort_expr( pub fn parse_physical_sort_exprs( proto: &[protobuf::PhysicalSortExprNode], registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result { proto .iter() .map(|sort_expr| { - parse_physical_sort_expr(sort_expr, registry, input_schema, codec) + parse_physical_sort_expr( + sort_expr, + registry, + config_options, + input_schema, + codec, + ) }) .collect::>() } @@ -121,16 +136,27 @@ pub fn parse_physical_sort_exprs( pub fn parse_physical_window_expr( proto: &protobuf::PhysicalWindowExprNode, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { let window_node_expr = - parse_physical_exprs(&proto.args, registry, input_schema, codec)?; - let partition_by = - parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?; - - let order_by = - parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?; + parse_physical_exprs(&proto.args, registry, config_options, input_schema, codec)?; + let partition_by = parse_physical_exprs( + &proto.partition_by, + registry, + config_options, + input_schema, + codec, + )?; + + let order_by = parse_physical_sort_exprs( + &proto.order_by, + registry, + config_options, + input_schema, + codec, + )?; let window_frame = proto .window_frame @@ -182,6 +208,7 @@ pub fn parse_physical_window_expr( pub fn parse_physical_exprs<'a, I>( protos: I, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result>> @@ -190,7 +217,7 @@ where { protos .into_iter() - .map(|p| parse_physical_expr(p, registry, input_schema, codec)) + .map(|p| parse_physical_expr(p, registry, config_options, input_schema, codec)) .collect::>>() } @@ -206,6 +233,7 @@ where pub fn parse_physical_expr( proto: &protobuf::PhysicalExprNode, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -225,6 +253,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( binary_expr.l.as_deref(), registry, + config_options, "left", input_schema, codec, @@ -233,6 +262,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( binary_expr.r.as_deref(), registry, + config_options, "right", input_schema, codec, @@ -255,6 +285,7 @@ pub fn parse_physical_expr( Arc::new(IsNullExpr::new(parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -264,6 +295,7 @@ pub fn parse_physical_expr( Arc::new(IsNotNullExpr::new(parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -272,6 +304,7 @@ pub fn parse_physical_expr( ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -280,6 +313,7 @@ pub fn parse_physical_expr( Arc::new(NegativeExpr::new(parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -289,18 +323,27 @@ pub fn parse_physical_expr( parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, )?, - parse_physical_exprs(&e.list, registry, input_schema, codec)?, + parse_physical_exprs(&e.list, registry, config_options, input_schema, codec)?, &e.negated, input_schema, )?, ExprType::Case(e) => Arc::new(CaseExpr::try_new( e.expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec)) + .map(|e| { + parse_physical_expr( + e.as_ref(), + registry, + config_options, + input_schema, + codec, + ) + }) .transpose()?, e.when_then_expr .iter() @@ -309,6 +352,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( e.when_expr.as_ref(), registry, + config_options, "when_expr", input_schema, codec, @@ -316,6 +360,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( e.then_expr.as_ref(), registry, + config_options, "then_expr", input_schema, codec, @@ -325,13 +370,22 @@ pub fn parse_physical_expr( .collect::>>()?, e.else_expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema, codec)) + .map(|e| { + parse_physical_expr( + e.as_ref(), + registry, + config_options, + input_schema, + codec, + ) + }) .transpose()?, )?), ExprType::Cast(e) => Arc::new(CastExpr::new( parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -343,6 +397,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( e.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -356,7 +411,13 @@ pub fn parse_physical_expr( }; let scalar_fun_def = Arc::clone(&udf); - let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?; + let args = parse_physical_exprs( + &e.args, + registry, + config_options, + input_schema, + codec, + )?; Arc::new( ScalarFunctionExpr::new( @@ -364,6 +425,7 @@ pub fn parse_physical_expr( scalar_fun_def, args, convert_required!(e.return_type)?, + Arc::new(config_options.clone()), ) .with_nullable(e.nullable), ) @@ -374,6 +436,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( like_expr.expr.as_deref(), registry, + config_options, "expr", input_schema, codec, @@ -381,6 +444,7 @@ pub fn parse_physical_expr( parse_required_physical_expr( like_expr.pattern.as_deref(), registry, + config_options, "pattern", input_schema, codec, @@ -390,7 +454,9 @@ pub fn parse_physical_expr( let inputs: Vec> = extension .inputs .iter() - .map(|e| parse_physical_expr(e, registry, input_schema, codec)) + .map(|e| { + parse_physical_expr(e, registry, config_options, input_schema, codec) + }) .collect::>()?; (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _ } @@ -402,11 +468,12 @@ pub fn parse_physical_expr( fn parse_required_physical_expr( expr: Option<&protobuf::PhysicalExprNode>, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, field: &str, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { - expr.map(|e| parse_physical_expr(e, registry, input_schema, codec)) + expr.map(|e| parse_physical_expr(e, registry, config_options, input_schema, codec)) .transpose()? .ok_or_else(|| { DataFusionError::Internal(format!("Missing required field {field:?}")) @@ -416,6 +483,7 @@ fn parse_required_physical_expr( pub fn parse_protobuf_hash_partitioning( partitioning: Option<&protobuf::PhysicalHashRepartition>, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -424,6 +492,7 @@ pub fn parse_protobuf_hash_partitioning( let expr = parse_physical_exprs( &hash_part.hash_expr, registry, + config_options, input_schema, codec, )?; @@ -440,6 +509,7 @@ pub fn parse_protobuf_hash_partitioning( pub fn parse_protobuf_partitioning( partitioning: Option<&protobuf::Partitioning>, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -454,6 +524,7 @@ pub fn parse_protobuf_partitioning( parse_protobuf_hash_partitioning( Some(hash_repartition), registry, + config_options, input_schema, codec, ) @@ -472,6 +543,7 @@ pub fn parse_protobuf_partitioning( pub fn parse_protobuf_file_scan_config( proto: &protobuf::FileScanExecConf, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, codec: &dyn PhysicalExtensionCodec, ) -> Result { let schema: Arc = Arc::new(convert_required!(proto.schema)?); @@ -522,6 +594,7 @@ pub fn parse_protobuf_file_scan_config( let sort_expr = parse_physical_sort_exprs( &node_collection.physical_sort_expr_nodes, registry, + config_options, &schema, codec, )?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 292ce13d0ede..2d168763a9b6 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -63,6 +63,7 @@ use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, }; +use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; @@ -112,6 +113,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { fn try_into_physical_plan( &self, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, runtime: &RuntimeEnv, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -134,6 +136,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan( &projection.input, registry, + config_options, runtime, extension_codec, )?; @@ -146,6 +149,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, input.schema().as_ref(), extension_codec, )?, @@ -159,6 +163,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan( &filter.input, registry, + config_options, runtime, extension_codec, )?; @@ -169,6 +174,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, input.schema().as_ref(), extension_codec, ) @@ -207,6 +213,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { CsvExec::builder(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), registry, + config_options, extension_codec, )?) .with_has_header(scan.has_header) @@ -243,6 +250,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let base_config = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), registry, + config_options, extension_codec, )?; let predicate = scan @@ -252,6 +260,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, base_config.file_schema.as_ref(), extension_codec, ) @@ -270,6 +279,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), registry, + config_options, extension_codec, )?))) } @@ -277,6 +287,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan( &coalesce_batches.input, registry, + config_options, runtime, extension_codec, )?; @@ -289,20 +300,27 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { )) } PhysicalPlanType::Merge(merge) => { - let input: Arc = - into_physical_plan(&merge.input, registry, runtime, extension_codec)?; + let input: Arc = into_physical_plan( + &merge.input, + registry, + config_options, + runtime, + extension_codec, + )?; Ok(Arc::new(CoalescePartitionsExec::new(input))) } PhysicalPlanType::Repartition(repart) => { let input: Arc = into_physical_plan( &repart.input, registry, + config_options, runtime, extension_codec, )?; let partitioning = parse_protobuf_partitioning( repart.partitioning.as_ref(), registry, + config_options, input.schema().as_ref(), extension_codec, )?; @@ -312,8 +330,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { )?)) } PhysicalPlanType::GlobalLimit(limit) => { - let input: Arc = - into_physical_plan(&limit.input, registry, runtime, extension_codec)?; + let input: Arc = into_physical_plan( + &limit.input, + registry, + config_options, + runtime, + extension_codec, + )?; let fetch = if limit.fetch >= 0 { Some(limit.fetch as usize) } else { @@ -326,14 +349,20 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ))) } PhysicalPlanType::LocalLimit(limit) => { - let input: Arc = - into_physical_plan(&limit.input, registry, runtime, extension_codec)?; + let input: Arc = into_physical_plan( + &limit.input, + registry, + config_options, + runtime, + extension_codec, + )?; Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize))) } PhysicalPlanType::Window(window_agg) => { let input: Arc = into_physical_plan( &window_agg.input, registry, + config_options, runtime, extension_codec, )?; @@ -346,6 +375,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_window_expr( window_expr, registry, + config_options, input_schema.as_ref(), extension_codec, ) @@ -359,6 +389,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, input.schema().as_ref(), extension_codec, ) @@ -398,6 +429,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan( &hash_agg.input, registry, + config_options, runtime, extension_codec, )?; @@ -431,6 +463,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, input.schema().as_ref(), extension_codec, ) @@ -446,6 +479,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, + config_options, input.schema().as_ref(), extension_codec, ) @@ -480,6 +514,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_expr( e, registry, + config_options, &physical_schema, extension_codec, ) @@ -500,9 +535,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { match expr_type { ExprType::AggregateExpr(agg_node) => { let input_phy_expr: Vec> = agg_node.expr.iter() - .map(|e| parse_physical_expr(e, registry, &physical_schema, extension_codec)).collect::>>()?; + .map(|e| parse_physical_expr(e, registry, config_options, &physical_schema, extension_codec)).collect::>>()?; let ordering_req: LexOrdering = agg_node.ordering_req.iter() - .map(|e| parse_physical_sort_expr(e, registry, &physical_schema, extension_codec)) + .map(|e| parse_physical_sort_expr(e, registry, config_options, &physical_schema, extension_codec)) .collect::>()?; agg_node.aggregate_function.as_ref().map(|func| { match func { @@ -555,12 +590,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left: Arc = into_physical_plan( &hashjoin.left, registry, + config_options, runtime, extension_codec, )?; let right: Arc = into_physical_plan( &hashjoin.right, registry, + config_options, runtime, extension_codec, )?; @@ -573,12 +610,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left = parse_physical_expr( &col.left.clone().unwrap(), registry, + config_options, left_schema.as_ref(), extension_codec, )?; let right = parse_physical_expr( &col.right.clone().unwrap(), registry, + config_options, right_schema.as_ref(), extension_codec, )?; @@ -606,7 +645,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - registry, &schema, + registry, config_options, &schema, extension_codec, )?; let column_indices = f.column_indices @@ -669,12 +708,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left = into_physical_plan( &sym_join.left, registry, + config_options, runtime, extension_codec, )?; let right = into_physical_plan( &sym_join.right, registry, + config_options, runtime, extension_codec, )?; @@ -687,12 +728,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left = parse_physical_expr( &col.left.clone().unwrap(), registry, + config_options, left_schema.as_ref(), extension_codec, )?; let right = parse_physical_expr( &col.right.clone().unwrap(), registry, + config_options, right_schema.as_ref(), extension_codec, )?; @@ -720,7 +763,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - registry, &schema, + registry, config_options, &schema, extension_codec, )?; let column_indices = f.column_indices @@ -746,6 +789,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left_sort_exprs = parse_physical_sort_exprs( &sym_join.left_sort_exprs, registry, + config_options, &left_schema, extension_codec, )?; @@ -758,6 +802,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let right_sort_exprs = parse_physical_sort_exprs( &sym_join.right_sort_exprs, registry, + config_options, &right_schema, extension_codec, )?; @@ -800,6 +845,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { for input in &union.inputs { inputs.push(input.try_into_physical_plan( registry, + config_options, runtime, extension_codec, )?); @@ -811,6 +857,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { for input in &interleave.inputs { inputs.push(input.try_into_physical_plan( registry, + config_options, runtime, extension_codec, )?); @@ -821,12 +868,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let left: Arc = into_physical_plan( &crossjoin.left, registry, + config_options, runtime, extension_codec, )?; let right: Arc = into_physical_plan( &crossjoin.right, registry, + config_options, runtime, extension_codec, )?; @@ -841,8 +890,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(Arc::new(PlaceholderRowExec::new(schema))) } PhysicalPlanType::Sort(sort) => { - let input: Arc = - into_physical_plan(&sort.input, registry, runtime, extension_codec)?; + let input: Arc = into_physical_plan( + &sort.input, + registry, + config_options, + runtime, + extension_codec, + )?; let exprs = sort .expr .iter() @@ -863,7 +917,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { })? .as_ref(); Ok(PhysicalSortExpr { - expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?, + expr: parse_physical_expr(expr, registry, config_options, input.schema().as_ref(), extension_codec)?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -888,8 +942,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(Arc::new(new_sort)) } PhysicalPlanType::SortPreservingMerge(sort) => { - let input: Arc = - into_physical_plan(&sort.input, registry, runtime, extension_codec)?; + let input: Arc = into_physical_plan( + &sort.input, + registry, + config_options, + runtime, + extension_codec, + )?; let exprs = sort .expr .iter() @@ -910,7 +969,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { })? .as_ref(); Ok(PhysicalSortExpr { - expr: parse_physical_expr(expr, registry, input.schema().as_ref(), extension_codec)?, + expr: parse_physical_expr(expr, registry, config_options, input.schema().as_ref(), extension_codec)?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -936,7 +995,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let inputs: Vec> = extension .inputs .iter() - .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec)) + .map(|i| { + i.try_into_physical_plan( + registry, + config_options, + runtime, + extension_codec, + ) + }) .collect::>()?; let extension_node = extension_codec.try_decode( @@ -948,10 +1014,20 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(extension_node) } PhysicalPlanType::NestedLoopJoin(join) => { - let left: Arc = - into_physical_plan(&join.left, registry, runtime, extension_codec)?; - let right: Arc = - into_physical_plan(&join.right, registry, runtime, extension_codec)?; + let left: Arc = into_physical_plan( + &join.left, + registry, + config_options, + runtime, + extension_codec, + )?; + let right: Arc = into_physical_plan( + &join.right, + registry, + config_options, + runtime, + extension_codec, + )?; let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| { proto_error(format!( @@ -973,7 +1049,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - registry, &schema, + registry, config_options, &schema, extension_codec, )?; let column_indices = f.column_indices @@ -1007,6 +1083,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan( &analyze.input, registry, + config_options, runtime, extension_codec, )?; @@ -1018,8 +1095,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ))) } PhysicalPlanType::JsonSink(sink) => { - let input = - into_physical_plan(&sink.input, registry, runtime, extension_codec)?; + let input = into_physical_plan( + &sink.input, + registry, + config_options, + runtime, + extension_codec, + )?; let data_sink: JsonSink = sink .sink @@ -1034,6 +1116,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, registry, + config_options, &sink_schema, extension_codec, ) @@ -1048,8 +1131,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ))) } PhysicalPlanType::CsvSink(sink) => { - let input = - into_physical_plan(&sink.input, registry, runtime, extension_codec)?; + let input = into_physical_plan( + &sink.input, + registry, + config_options, + runtime, + extension_codec, + )?; let data_sink: CsvSink = sink .sink @@ -1064,6 +1152,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, registry, + config_options, &sink_schema, extension_codec, ) @@ -1084,6 +1173,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input = into_physical_plan( &sink.input, registry, + config_options, runtime, extension_codec, )?; @@ -1101,6 +1191,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, registry, + config_options, &sink_schema, extension_codec, ) @@ -1121,6 +1212,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let input = into_physical_plan( &unnest.input, registry, + config_options, runtime, extension_codec, )?; @@ -2064,6 +2156,7 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { fn try_into_physical_plan( &self, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, runtime: &RuntimeEnv, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result>; @@ -2154,11 +2247,12 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { fn into_physical_plan( node: &Option>, registry: &dyn FunctionRegistry, + config_options: &ConfigOptions, runtime: &RuntimeEnv, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result, DataFusionError> { if let Some(field) = node { - field.try_into_physical_plan(registry, runtime, extension_codec) + field.try_into_physical_plan(registry, config_options, runtime, extension_codec) } else { Err(proto_error("Missing required field in protobuf")) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0a6ea6c7ff85..fdbd99ca9f14 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -88,7 +88,7 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; -use datafusion_common::config::TableParquetOptions; +use datafusion_common::config::{ConfigOptions, TableParquetOptions}; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; @@ -135,7 +135,7 @@ fn roundtrip_test_and_return( .expect("to proto"); let runtime = ctx.runtime_env(); let result_exec_plan: Arc = proto - .try_into_physical_plan(ctx, runtime.deref(), codec) + .try_into_physical_plan(ctx, &ConfigOptions::default(), runtime.deref(), codec) .expect("from proto"); assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}")); Ok(result_exec_plan) @@ -936,6 +936,7 @@ fn roundtrip_scalar_udf() -> Result<()> { fun_def, vec![col("a", &schema)?], DataType::Int64, + Arc::new(ConfigOptions::default()), ); let project = @@ -1064,6 +1065,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { Arc::new(ScalarUDF::from(MyRegexUdf::new(".*".to_string()))), vec![col("text", &schema)?], DataType::Int64, + Arc::new(ConfigOptions::default()), )); let filter = Arc::new(FilterExec::try_new( @@ -1168,6 +1170,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { Arc::new(ScalarUDF::from(MyRegexUdf::new(".*".to_string()))), vec![col("text", &schema)?], DataType::Int64, + Arc::new(ConfigOptions::default()), )); let udaf = Arc::new(AggregateUDF::from(MyAggregateUDF::new( @@ -1548,6 +1551,7 @@ async fn roundtrip_coalesce() -> Result<()> { .map_err(|e| DataFusionError::External(Box::new(e)))?; let restored = node.try_into_physical_plan( &ctx, + ctx.state().config_options(), ctx.runtime_env().as_ref(), &DefaultPhysicalExtensionCodec {}, )?; diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 54b662514c88..73cbbce38c9f 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -17,6 +17,7 @@ extern crate wasm_bindgen; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::lit; @@ -59,8 +60,10 @@ pub fn basic_exprs() { // Simplify Expr (using datafusion-phys-expr and datafusion-optimizer) let schema = Arc::new(DFSchema::empty()); let execution_props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&execution_props).with_schema(schema)); + let config_options = ConfigOptions::default(); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&execution_props, &config_options).with_schema(schema), + ); let simplified_expr = simplifier.simplify(expr).unwrap(); log(&format!("Simplified Expr: {simplified_expr:?}")); }