Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ConfigOptions to ScalarFunctionArgs #13527

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion-examples/examples/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ async fn main() {

// deserialize proto back to execution plan
let runtime = ctx.runtime_env();
let state = ctx.state();
let config_options = state.config_options();
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
.try_into_physical_plan(&ctx, config_options, runtime.deref(), &composed_codec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an API change, but I think it is required to thread the config options through as each argument is specific

We could potentially improve the try_into_physical_plan API (as a follow on PR) to make it easier to update the API in the future using a trait or something like

https://github.com/apache/datafusion/blob/e99e02b9b9093ceb0c13a2dd32a2a89beba47930/datafusion/expr/src/expr_schema.rs#L39-L38

So this would look something like

pub trait ProtobufContext { 
  /// return a function registry
  fn function_registry(&self) -> &dyn FunctionRegistry;
  /// return the runtime env
  fn runtime_env(&self) -> &RuntimeEnv;
  /// return the config options
  fn config_options(&self) -> &ConfigOptions;
  /// return extension codec
  fn extension_codec(&self) -> &dyn PhysicalExtensionCodec;
}
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
...
    fn try_into_physical_plan(
        &self,
        registry: &dyn FunctionRegistry,
        config_options: &ConfigOptions,
        runtime: &RuntimeEnv,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>> {

.expect("from proto");

// assert that the original and deserialized execution plans are equal
Expand Down
22 changes: 17 additions & 5 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion::functions_aggregate::first_last::first_value_udaf;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -169,7 +170,8 @@ fn simplify_demo() -> Result<()> {
// expressions, such as the current time (to evaluate `now()`
// correctly)
let props = ExecutionProps::new();
let context = SimplifyContext::new(&props).with_schema(schema);
let config_options = ConfigOptions::default();
let context = SimplifyContext::new(&props, &config_options).with_schema(schema);
let simplifier = ExprSimplifier::new(context);

// And then call the simplify_expr function:
Expand All @@ -184,7 +186,8 @@ fn simplify_demo() -> Result<()> {

// here are some other examples of what DataFusion is capable of
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let context =
SimplifyContext::new(&props, &config_options).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
Expand Down Expand Up @@ -356,8 +359,13 @@ fn type_coercion_demo() -> Result<()> {

// Evaluation with an expression that has not been type coerced cannot succeed.
let props = ExecutionProps::default();
let physical_expr =
datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
let config_options = ConfigOptions::default();
let physical_expr = datafusion_physical_expr::create_physical_expr(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems somewhat inevitable that creating a physical expr will require the config options

However, I also think threading through the config options down through to the physical creation will (finally) permit people to pass things from the session down to function implementations (I think @cisaacson also was trying to do this in the past)

&expr,
&df_schema,
&props,
&config_options,
)?;
let e = physical_expr.evaluate(&batch).unwrap_err();
assert!(e
.find_root()
Expand All @@ -370,13 +378,15 @@ fn type_coercion_demo() -> Result<()> {
assert!(physical_expr.evaluate(&batch).is_ok());

// 2. Type coercion with `ExprSimplifier::coerce`.
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
let context = SimplifyContext::new(&props, &config_options)
.with_schema(Arc::new(df_schema.clone()));
let simplifier = ExprSimplifier::new(context);
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
let physical_expr = datafusion_physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
&config_options,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

Expand All @@ -389,6 +399,7 @@ fn type_coercion_demo() -> Result<()> {
&coerced_expr,
&df_schema,
&props,
&config_options,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

Expand Down Expand Up @@ -417,6 +428,7 @@ fn type_coercion_demo() -> Result<()> {
&coerced_expr,
&df_schema,
&props,
&config_options,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

Expand Down
5 changes: 4 additions & 1 deletion datafusion-examples/examples/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::prelude::*;
use datafusion_common::config::ConfigOptions;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -187,7 +188,9 @@ impl PruningStatistics for MyCatalog {
fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
let config_options = ConfigOptions::default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something we could potentially do to make this API slightly easier to use might be to create a static default ConfigOptions

Something like

impl ConfigOptions {
  /// returns a reference to default ConfigOptions
  pub fn default_singleton() -> &'static ConfigOptions 
}

This would then make it easier to return &ConfigOptions in various places when only the default was needed

For example, then in LocalCsvTableFunc you could avoid having to thread the ConfigOptions through as in that example having the actual config options isn't important

let physical_expr =
create_physical_expr(&expr, &df_schema, &props, &config_options).unwrap();
PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
}

Expand Down
12 changes: 9 additions & 3 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{Expr, TableType};
Expand All @@ -48,7 +49,9 @@ async fn main() -> Result<()> {
let ctx = SessionContext::new();

// register the table function that will be called in SQL statements by `read_csv`
ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {}));
let state = ctx.state();
let config_options = Arc::new(state.config_options().clone());
ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc { config_options }));

let testdata = datafusion::test_util::arrow_test_data();
let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
Expand Down Expand Up @@ -129,7 +132,9 @@ impl TableProvider for LocalCsvTable {
}

#[derive(Debug)]
struct LocalCsvTableFunc {}
struct LocalCsvTableFunc {
config_options: Arc<ConfigOptions>,
}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
Expand All @@ -142,7 +147,8 @@ impl TableFunctionImpl for LocalCsvTableFunc {
.map(|expr| {
// try to simplify the expression, so 1+2 becomes 3, for example
let execution_props = ExecutionProps::new();
let info = SimplifyContext::new(&execution_props);
let config_options = Arc::unwrap_or_clone(self.config_options.clone());
let info = SimplifyContext::new(&execution_props, &config_options);
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;

if let Expr::Literal(ScalarValue::Int64(Some(limit))) = expr {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ config_namespace! {
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
/// A unique string to identify this config value
pub key: String,
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ async fn prune_partitions(
partitions: Vec<Partition>,
filters: &[Expr],
partition_cols: &[(String, DataType)],
ctx: &SessionState,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend passing &ConfigOptions directly here as the function only needs the config, not the entire session state

) -> Result<Vec<Partition>> {
if filters.is_empty() {
return Ok(partitions);
Expand Down Expand Up @@ -286,13 +287,12 @@ async fn prune_partitions(
)?;

let batch = RecordBatch::try_new(schema, arrays)?;

// TODO: Plumb this down
let props = ExecutionProps::new();
let config_options = ctx.config_options();

// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Result<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props)?;
let expr = create_physical_expr(filter, &df_schema, &props, config_options)?;
expr.evaluate(&batch)?.into_array(partitions.len())
};

Expand Down Expand Up @@ -436,7 +436,7 @@ pub async fn pruned_partition_list<'a>(
debug!("Listed {} partitions", partitions.len());

let pruned =
prune_partitions(table_path, partitions, filters, partition_cols).await?;
prune_partitions(table_path, partitions, filters, partition_cols, ctx).await?;

debug!("Pruning yielded {} partitions", pruned.len());

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,7 @@ impl TableProvider for ListingTable {
&expr,
&table_df_schema,
state.execution_props(),
session_state.config_options(),
)?;
Some(filters)
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_optimizer::OptimizerConfig;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
Expand Down Expand Up @@ -234,7 +233,7 @@ impl ListingTableUrl {
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let exec_options = &ctx.options().execution;
let exec_options = &ctx.config_options().execution;
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
// If the prefix is a file, use a head request, otherwise list
let list = match self.is_collection() {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl TableProvider for MemTable {
sort_exprs,
&df_schema,
state.execution_props(),
state.config_options(),
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,10 @@ fn create_output_array(

#[cfg(test)]
mod tests {
use arrow_array::Int32Array;

use super::*;
use crate::{test::columns, test_util::aggr_test_schema};
use arrow_array::Int32Array;
use datafusion_common::config::ConfigOptions;

#[test]
fn physical_plan_config_no_projection() {
Expand Down Expand Up @@ -1121,6 +1121,7 @@ mod tests {
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
&ConfigOptions::default(),
)
})
.collect::<Result<Vec<_>>>()?,
Expand Down
20 changes: 15 additions & 5 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ impl SessionState {
// analyze & capture output of each rule
let analyzer_result = self.analyzer.execute_and_check(
e.plan.as_ref().clone(),
self.options(),
self.config_options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
Expand Down Expand Up @@ -708,7 +708,7 @@ impl SessionState {
} else {
let analyzed_plan = self.analyzer.execute_and_check(
plan.clone(),
self.options(),
self.config_options(),
|_, _| {},
)?;
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
Expand Down Expand Up @@ -760,13 +760,19 @@ impl SessionState {
let mut expr = simplifier.coerce(expr, df_schema)?;

// rewrite Exprs to functions if necessary
let config_options = self.config_options();
for rewrite in self.analyzer.function_rewrites() {
expr = expr
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
.transform_up(|expr| {
rewrite.rewrite(expr, df_schema, self.config_options())
})?
.data;
}
create_physical_expr(&expr, df_schema, self.execution_props())
create_physical_expr(
&expr,
df_schema,
self.execution_props(),
self.config_options(),
)
}

/// Return the session ID
Expand Down Expand Up @@ -1968,6 +1974,10 @@ impl SimplifyInfo for SessionSimplifyProvider<'_> {
self.state.execution_props()
}

fn config_options(&self) -> &ConfigOptions {
self.state.config_options()
}

fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
expr.get_type(self.df_schema)
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ mod tests {
)),
],
DataType::Int32,
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -1486,6 +1487,7 @@ mod tests {
)),
],
DataType::Int32,
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
Expand Down Expand Up @@ -1554,6 +1556,7 @@ mod tests {
)),
],
DataType::Int32,
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -1619,6 +1622,7 @@ mod tests {
)),
],
DataType::Int32,
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
Expand Down
Loading
Loading