Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into make-recursive-optional
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 21, 2024
2 parents 4689a84 + a50ed34 commit 281d737
Show file tree
Hide file tree
Showing 161 changed files with 6,524 additions and 2,445 deletions.
36 changes: 20 additions & 16 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -326,22 +326,26 @@ jobs:
env:
POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}

windows:
name: cargo test (win64)
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Setup Rust toolchain
uses: ./.github/actions/setup-windows-builder
- name: Run tests (excluding doctests)
shell: bash
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test --lib --tests --bins --features avro,json,backtrace
cd datafusion-cli
cargo test --lib --tests --bins --all-features
# Temporarily disabling the Windows job because its build runs enormously slowly.
# Waiting for the new Windows 2025 GitHub runner.
# Details: https://github.com/apache/datafusion/issues/13726
#
# windows:
# name: cargo test (win64)
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v4
# with:
# submodules: true
# - name: Setup Rust toolchain
# uses: ./.github/actions/setup-windows-builder
# - name: Run tests (excluding doctests)
# shell: bash
# run: |
# export PATH=$PATH:$HOME/d/protoc/bin
# cargo test --lib --tests --bins --features avro,json,backtrace
# cd datafusion-cli
# cargo test --lib --tests --bins --all-features

macos:
name: cargo test (macos)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ recursive = "0.1.1"
regex = "1.8"
rstest = "0.23.0"
serde_json = "1"
sqlparser = { version = "0.52.0", features = ["visitor"] }
sqlparser = { version = "0.53.0", features = ["visitor"] }
tempfile = "3"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
url = "2.2"
Expand Down
27 changes: 20 additions & 7 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ use crate::{
};

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
Expand Down Expand Up @@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
the source finishes, but the source is unbounded"
);
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
Expand Down
9 changes: 5 additions & 4 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,13 @@ mod tests {

#[tokio::test]
async fn s3_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
// "fake" is uppercase to ensure the values are not lowercased when parsed
let access_key_id = "FAKE_access_key_id";
let secret_access_key = "FAKE_secret_access_key";
let region = "fake_us-east-2";
let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";
let session_token = "FAKE_session_token";
let location = "s3://bucket/path/FAKE/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ cargo run --example dataframe
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
- [`remote_catalog.rs`](examples/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network)
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
Expand Down
8 changes: 5 additions & 3 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -214,7 +215,8 @@ impl CustomExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
Loading

0 comments on commit 281d737

Please sign in to comment.