Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: polars creation #64

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
build:
cargo build
cargo build --release
maturin develop --release

test: build
Expand Down
8 changes: 3 additions & 5 deletions src/execution_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use arrow::{
pyarrow::{PyArrowType, ToPyArrow},
};
use datafusion::prelude::DataFrame;
use pyo3::{pyclass, pymethods, types::PyTuple, IntoPy, PyObject, PyResult, Python, ToPyObject};
use pyo3::{pyclass, pymethods, types::PyTuple, PyObject, PyResult, Python, ToPyObject};

use crate::{error, runtime::wait_for_future};

Expand Down Expand Up @@ -52,12 +52,11 @@ impl PyExecutionResult {
/// Convert to Arrow Table
fn to_arrow_table(&self, py: Python) -> PyResult<PyObject> {
let batches = self.collect(py)?.to_object(py);
let schema: PyObject = self.schema().into_py(py);

Python::with_gil(|py| {
// Instantiate pyarrow Table object and use its from_batches method
let table_class = py.import("pyarrow")?.getattr("Table")?;
let args = PyTuple::new(py, &[batches, schema]);
let args = PyTuple::new(py, &[batches]);
let table: PyObject = table_class.call_method1("from_batches", args)?.into();
Ok(table)
})
Expand All @@ -66,11 +65,10 @@ impl PyExecutionResult {
/// Convert to a Polars DataFrame
fn to_polars(&self, py: Python) -> PyResult<PyObject> {
let batches = self.collect(py)?.to_object(py);
let schema: PyObject = self.schema().into_py(py);

Python::with_gil(|py| {
let table_class = py.import("pyarrow")?.getattr("Table")?;
let args = PyTuple::new(py, &[batches, schema]);
let args = PyTuple::new(py, &[batches]);
let table: PyObject = table_class.call_method1("from_batches", args)?.into();

let table_class = py.import("polars")?.getattr("DataFrame")?;
Expand Down
12 changes: 11 additions & 1 deletion src/session_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use datafusion::prelude::SessionContext;
use exon::ExonSessionExt;
use exon::{ExonRuntimeEnvExt, ExonSessionExt};

use pyo3::prelude::*;

Expand Down Expand Up @@ -41,12 +41,22 @@ impl ExonSessionContext {
Ok(Self::default())
}

/// Execute a SQL query and return the result as a [`PyExecutionResult`].
fn sql(&mut self, query: &str, py: Python) -> PyResult<PyExecutionResult> {
let result = self.ctx.sql(query);
let df = wait_for_future(py, result).map_err(error::BioBearError::from)?;

Ok(PyExecutionResult::new(df))
}

/// Register an object store with the given URI.
fn register_object_store_from_url(&mut self, url: &str, py: Python) -> PyResult<()> {
let runtime = self.ctx.runtime_env();
let registration = runtime.exon_register_object_store_uri(url);
wait_for_future(py, registration).map_err(error::BioBearError::from)?;

Ok(())
}
}

#[pyfunction]
Expand Down
Loading