From 0d8ef40346a5d79e4b83d335250ad068c061b496 Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Sun, 9 Oct 2022 14:26:04 +0200
Subject: [PATCH] Support reading into pyarrow.Table (#31)

---
 Cargo.lock                    |  2 +-
 Cargo.toml                    |  2 +-
 README.md                     | 57 +++++++++++++++++------------------
 benchmarks/test_benchmarks.py | 26 ++++++++++------
 src/lib.rs                    | 38 ++++++++++++++++++++++-
 5 files changed, 82 insertions(+), 43 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2731086..b1c3d1b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -236,7 +236,7 @@ checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
 
 [[package]]
 name = "flaco"
-version = "0.6.0-rc2"
+version = "0.6.0-rc3"
 dependencies = [
  "arrow2",
  "numpy",
diff --git a/Cargo.toml b/Cargo.toml
index d1789ab..35ec80f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "flaco"
-version = "0.6.0-rc3"
+version = "0.6.0-rc4"
 edition = "2018"
 license = "Unlicense/MIT"
 
diff --git a/README.md b/README.md
index 997abea..4c73c98 100644
--- a/README.md
+++ b/README.md
@@ -8,47 +8,44 @@
 
 The easiest and perhaps most memory efficient way to get PostgreSQL data (more flavors to come?)
-into Arrow (IPC/Feather) or Parquet files.
-
-If you're trying to load data directly into Pandas then you may find that evan a 'real' 100MB can cause
-bloat upwards of 1GB. Expanding this can cause significant bottle necks in processing data efficiently.
+into `pyarrow.Table`, `pandas.DataFrame`, or Arrow (IPC/Feather) and Parquet files.
 
 Since [Arrow](https://github.com/apache/arrow) supports efficient and even larger-than-memory processing,
 as with [dask](https://github.com/dask/dask), [duckdb](https://duckdb.org/), or others.
-Just getting data onto disk is sometimes the hardest part; this aims to make that easier.
+Just getting data onto disk is sometimes the hardest part; this aims to make that easier.
+
+API:
+`flaco.read_sql_to_file`: Read SQL query into a Feather or Parquet file.
+`flaco.read_sql_to_pyarrow`: Read SQL query into a `pyarrow.Table`.
 
 NOTE:
-This is still a WIP, and is purpose built for my needs. I intend to generalize it more to be
+This is still a WIP. I intend to generalize it more to be
 useful towards a wider audience. Issues and pull requests welcome!
 
 ---
 
 ### Example
 
-```python
-from flaco import read_sql_to_file, FileFormat
-
-
-uri = "postgresql://postgres:postgres@localhost:5432/postgres"
-stmt = "select * from my_big_table"
-
-read_sql_to_file(uri, stmt, 'output.data', FileFormat.Parquet)
-
-# Then with pandas...
-import pandas as pd
-df = pd.read_parquet('output.data')
-
-# pyarrow... (memory mapped file, where potentially larger than memory)
-import pyarrow as pa
-with pa.memory_map('output.data', 'rb') as source:
-    table = pa.ipc.open_file(source).read_all()  # mmap pyarrow.Table
-
-# DuckDB...
-import duckdb
-cur = duckdb.connect()
-cur.execute("select * from read_parquet('output.data')")
-
-# Or anything else which works with Arrow and/or Parquet files
+```bash
+Line #    Mem usage    Increment  Occurrences   Line Contents
+=============================================================
+   122    147.9 MiB    147.9 MiB           1   @profile
+   123                                         def memory_profile():
+   124    147.9 MiB      0.0 MiB           1       stmt = "select * from test_table"
+   125
+   126                                             # Read SQL to file
+   127    150.3 MiB      2.4 MiB           1       flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
+   128    150.3 MiB      0.0 MiB           1       with pa.memory_map('result.feather', 'rb') as source:
+   129    150.3 MiB      0.0 MiB           1           table1 = pa.ipc.open_file(source).read_all()
+   130    408.1 MiB    257.8 MiB           1       table1_df1 = table1.to_pandas()
+   131
+   132                                             # Read SQL to pyarrow.Table
+   133    504.3 MiB     96.2 MiB           1       table2 = flaco.read_sql_to_pyarrow(DB_URI, stmt)
+   134    644.1 MiB    139.8 MiB           1       table2_df = table2.to_pandas()
+   135
+   136                                             # Pandas
+   137    648.8 MiB      4.7 MiB           1       engine = create_engine(DB_URI)
+   138   1335.4 MiB    686.6 MiB           1       _pandas_df = pd.read_sql(stmt, engine)
 ```
 
 ---
diff --git a/benchmarks/test_benchmarks.py b/benchmarks/test_benchmarks.py
index 4daf3b1..e603310 100644
--- a/benchmarks/test_benchmarks.py
+++ b/benchmarks/test_benchmarks.py
@@ -2,6 +2,11 @@
 import numpy as np
 import pandas as pd
 import flaco
+import duckdb
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pyarrow.feather as pf
+import pyarrow.dataset as ds
 from memory_profiler import profile
 from sqlalchemy import create_engine
 
@@ -117,21 +122,22 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
 @profile
 def memory_profile():
     stmt = "select * from test_table"
+
+    # Read SQL to file
     flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
-    import duckdb
-    import pyarrow as pa
-    import pyarrow.parquet as pq
-    import pyarrow.feather as pf
-    import pyarrow.dataset as ds
     with pa.memory_map('result.feather', 'rb') as source:
-        mytable = pa.ipc.open_file(source).read_all()
-    table_df = mytable.to_pandas()
-    #print(v)
-    print(pa.total_allocated_bytes() >> 20)
+        table1 = pa.ipc.open_file(source).read_all()
+    table1_df1 = table1.to_pandas()
+
+    # Read SQL to pyarrow.Table
+    table2 = flaco.read_sql_to_pyarrow(DB_URI, stmt)
+    table2_df = table2.to_pandas()
+
+    # Pandas
     engine = create_engine(DB_URI)
     _pandas_df = pd.read_sql(stmt, engine)
 
 
 if __name__ == "__main__":
-    #_table_setup(n_rows=1_000_000, include_nulls=False)
+    #_table_setup(n_rows=1_000_000, include_nulls=True)
     memory_profile()
diff --git a/src/lib.rs b/src/lib.rs
index 203ead1..d8b4cee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,6 @@
 use arrow2::chunk::Chunk;
-use arrow2::datatypes::{DataType, Schema};
+use arrow2::datatypes::{DataType, Field, Schema};
+use arrow2::ffi::{export_array_to_c, export_field_to_c};
 use arrow2::io::{ipc, parquet};
 use arrow2::{array, array::MutableArray};
 use pyo3::create_exception;
@@ -18,6 +19,7 @@ fn flaco(py: Python, m: &PyModule) -> PyResult<()> {
     m.add("__version__", env!("CARGO_PKG_VERSION"))?;
     m.add_function(wrap_pyfunction!(read_sql_to_file, m)?)?;
+    m.add_function(wrap_pyfunction!(read_sql_to_pyarrow, m)?)?;
     m.add_class::<FileFormat>()?;
     m.add("FlacoException", py.get_type::<FlacoException>())?;
     Ok(())
 }
@@ -35,6 +37,40 @@ fn to_py_err(err: impl ToString) -> PyErr {
     PyErr::new::<FlacoException, _>(err.to_string())
 }
 
+/// Read SQL into a pyarrow.Table object. This assumes pyarrow is installed.
+#[pyfunction]
+pub fn read_sql_to_pyarrow<'py>(py: Python<'py>, uri: &str, stmt: &str) -> PyResult<PyObject> {
+    let pyarrow = PyModule::import(py, "pyarrow")?;
+    let pyarrow_array = pyarrow.getattr("Array")?;
+
+    let mut client = postgres::Client::connect(uri, postgres::NoTls).map_err(to_py_err)?;
+    let table = postgresql::read_sql(&mut client, stmt).map_err(to_py_err)?;
+
+    let mut pyarrow_array_mapping: BTreeMap<String, PyObject> = BTreeMap::new();
+    for (key, ref mut value) in table.into_iter() {
+        let field = Field::new(
+            &key,
+            value.dtype().clone(),
+            value.array.validity().is_some(),
+        );
+        let field_box = Box::new(export_field_to_c(&field));
+        let array_box = Box::new(export_array_to_c(value.array.as_box()));
+        let result = pyarrow_array
+            .call_method1(
+                "_import_from_c",
+                (
+                    array_box.as_ref() as *const _ as *const i64 as i64,
+                    field_box.as_ref() as *const _ as *const i64 as i64,
+                ),
+            )?
+            .to_object(py);
+        pyarrow_array_mapping.insert(key, result);
+    }
+    pyarrow
+        .call_method1("table", (pyarrow_array_mapping.to_object(py),))
+        .map(|v| v.to_object(py))
+}
+
 /// Read SQL to a file; Parquet or Feather/IPC format.
 // TODO: Stream data into a file in chunks during query reading
 #[pyfunction]
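
For reference, a minimal usage sketch of the new `read_sql_to_pyarrow` function added above. The connection URI and query are placeholders borrowed from the README and benchmark code in this patch; only the `(uri, stmt)` call signature shown in the Rust function is assumed.

```python
import flaco

# Placeholder connection string and query; adjust for your own database.
uri = "postgresql://postgres:postgres@localhost:5432/postgres"
stmt = "select * from test_table"

# Read the query result directly into a pyarrow.Table, with no intermediate file.
table = flaco.read_sql_to_pyarrow(uri, stmt)

# Hand the table to pandas, or anything else that speaks Arrow.
df = table.to_pandas()
```

On the Rust side, each column is exported through the Arrow C data interface (`export_array_to_c` / `export_field_to_c`) and imported with `pyarrow.Array._import_from_c`, so the table is assembled from pointers rather than by copying values through Python objects.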