Support reading into pyarrow.Table (#31)
milesgranger authored Oct 9, 2022
1 parent 015e146 commit 0d8ef40
Showing 5 changed files with 82 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "flaco"
version = "0.6.0-rc3"
version = "0.6.0-rc4"
edition = "2018"
license = "Unlicense/MIT"

57 changes: 27 additions & 30 deletions README.md
@@ -8,47 +8,44 @@


The easiest and perhaps most memory-efficient way to get PostgreSQL data (more flavors to come?)
into a `pyarrow.Table`, a `pandas.DataFrame`, or Arrow (IPC/Feather) and Parquet files.

If you're trying to load data directly into pandas, you may find that even a 'real' 100MB can
bloat upwards of 1GB in memory, which can cause significant bottlenecks in processing data efficiently.

Since [Arrow](https://github.com/apache/arrow) supports efficient and even larger-than-memory processing,
as with [dask](https://github.com/dask/dask), [duckdb](https://duckdb.org/), or others,
just getting data onto disk is sometimes the hardest part; this aims to make that easier.

API:
- `flaco.read_sql_to_file`: Read a SQL query into a Feather or Parquet file.
- `flaco.read_sql_to_pyarrow`: Read a SQL query into a `pyarrow.Table`.

NOTE:
This is still a WIP, purpose-built for my needs so far. I intend to generalize it more to be
useful towards a wider audience. Issues and pull requests welcome!

---

### Example

```python
from flaco import read_sql_to_file, FileFormat


uri = "postgresql://postgres:postgres@localhost:5432/postgres"
stmt = "select * from my_big_table"

read_sql_to_file(uri, stmt, 'output.data', FileFormat.Parquet)

# Then with pandas...
import pandas as pd
df = pd.read_parquet('output.data')

# pyarrow... (memory mapped file, where potentially larger than memory)
# note: pa.ipc reads Feather/IPC files, so this path needs FileFormat.Feather output
import pyarrow as pa
with pa.memory_map('output.data', 'rb') as source:
    table = pa.ipc.open_file(source).read_all()  # mmap pyarrow.Table

# DuckDB...
import duckdb
cur = duckdb.connect()
cur.execute("select * from read_parquet('output.data')")

# Or anything else which works with Arrow and/or Parquet files
```
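
The file-based flow above goes through disk; the newer `read_sql_to_pyarrow` skips the intermediate file and hands back an in-memory table. A minimal sketch, assuming the same `uri` and `stmt` as above and that `pyarrow` (and `pandas`, for the last step) are installed:

```python
from flaco import read_sql_to_pyarrow

uri = "postgresql://postgres:postgres@localhost:5432/postgres"
stmt = "select * from my_big_table"

# Query straight into a pyarrow.Table (no intermediate file on disk)
table = read_sql_to_pyarrow(uri, stmt)
print(table.num_rows, table.schema)

# Convert to pandas only if/when you need it
df = table.to_pandas()
```

The `memory_profiler` output below, taken from `benchmarks/test_benchmarks.py`, compares the file-based path, the `pyarrow.Table` path, and a plain `pandas.read_sql` on the same query:
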
```bash
Line # Mem usage Increment Occurrences Line Contents
=============================================================
122 147.9 MiB 147.9 MiB 1 @profile
123 def memory_profile():
124 147.9 MiB 0.0 MiB 1 stmt = "select * from test_table"
125
126 # Read SQL to file
127 150.3 MiB 2.4 MiB 1 flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
128 150.3 MiB 0.0 MiB 1 with pa.memory_map('result.feather', 'rb') as source:
129 150.3 MiB 0.0 MiB 1 table1 = pa.ipc.open_file(source).read_all()
130 408.1 MiB 257.8 MiB 1 table1_df1 = table1.to_pandas()
131
132 # Read SQL to pyarrow.Table
133 504.3 MiB 96.2 MiB 1 table2 = flaco.read_sql_to_pyarrow(DB_URI, stmt)
134 644.1 MiB 139.8 MiB 1 table2_df = table2.to_pandas()
135
136 # Pandas
137 648.8 MiB 4.7 MiB 1 engine = create_engine(DB_URI)
138 1335.4 MiB 686.6 MiB 1 _pandas_df = pd.read_sql(stmt, engine)
```

---
26 changes: 16 additions & 10 deletions benchmarks/test_benchmarks.py
@@ -2,6 +2,11 @@
import numpy as np
import pandas as pd
import flaco
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as pf
import pyarrow.dataset as ds
from memory_profiler import profile
from sqlalchemy import create_engine

@@ -117,21 +122,22 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
@profile
def memory_profile():
    stmt = "select * from test_table"

    # Read SQL to file
    flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
    with pa.memory_map('result.feather', 'rb') as source:
        table1 = pa.ipc.open_file(source).read_all()
        table1_df1 = table1.to_pandas()

    # Read SQL to pyarrow.Table
    table2 = flaco.read_sql_to_pyarrow(DB_URI, stmt)
    table2_df = table2.to_pandas()

    # Pandas
    engine = create_engine(DB_URI)
    _pandas_df = pd.read_sql(stmt, engine)


if __name__ == "__main__":
    # _table_setup(n_rows=1_000_000, include_nulls=False)
    # _table_setup(n_rows=1_000_000, include_nulls=True)
    memory_profile()
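
For reference, the profile shown in the README appears to come from running this module with the table populated first. A rough sketch of that flow (the import path is hypothetical and assumes this is run from the `benchmarks/` directory, with `DB_URI` pointing at a reachable PostgreSQL instance):

```python
# Populate the test table once, then run the profiled function.
# memory_profiler's @profile decorator prints the per-line report to stdout.
from test_benchmarks import _table_setup, memory_profile

_table_setup(n_rows=1_000_000, include_nulls=False)
memory_profile()
```
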
38 changes: 37 additions & 1 deletion src/lib.rs
@@ -1,5 +1,6 @@
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Schema};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::ffi::{export_array_to_c, export_field_to_c};
use arrow2::io::{ipc, parquet};
use arrow2::{array, array::MutableArray};
use pyo3::create_exception;
@@ -18,6 +19,7 @@ create_exception!(flaco, FlacoException, PyException);
fn flaco(py: Python, m: &PyModule) -> PyResult<()> {
    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
    m.add_function(wrap_pyfunction!(read_sql_to_file, m)?)?;
    m.add_function(wrap_pyfunction!(read_sql_to_pyarrow, m)?)?;
    m.add_class::<FileFormat>()?;
    m.add("FlacoException", py.get_type::<FlacoException>())?;
    Ok(())
@@ -35,6 +37,40 @@ fn to_py_err(err: impl ToString) -> PyErr {
    PyErr::new::<FlacoException, _>(err.to_string())
}

/// Read SQL into a pyarrow.Table object. This assumes pyarrow is installed.
#[pyfunction]
pub fn read_sql_to_pyarrow<'py>(py: Python<'py>, uri: &str, stmt: &str) -> PyResult<PyObject> {
    let pyarrow = PyModule::import(py, "pyarrow")?;
    let pyarrow_array = pyarrow.getattr("Array")?;

    let mut client = postgres::Client::connect(uri, postgres::NoTls).map_err(to_py_err)?;
    let table = postgresql::read_sql(&mut client, stmt).map_err(to_py_err)?;

    // Hand each column to pyarrow via the Arrow C Data Interface: export the
    // arrow2 array/field to C structs, then let pyarrow import them from the
    // raw pointers.
    let mut pyarrow_array_mapping: BTreeMap<String, PyObject> = BTreeMap::new();
    for (key, ref mut value) in table.into_iter() {
        let field = Field::new(
            &key,
            value.dtype().clone(),
            value.array.validity().is_some(),
        );
        let field_box = Box::new(export_field_to_c(&field));
        let array_box = Box::new(export_array_to_c(value.array.as_box()));
        let result = pyarrow_array
            .call_method1(
                "_import_from_c",
                (
                    array_box.as_ref() as *const _ as *const i64 as i64,
                    field_box.as_ref() as *const _ as *const i64 as i64,
                ),
            )?
            .to_object(py);
        pyarrow_array_mapping.insert(key, result);
    }
    // Build the final pyarrow.Table from the {column name: Array} mapping.
    pyarrow
        .call_method1("table", (pyarrow_array_mapping.to_object(py),))
        .map(|v| v.to_object(py))
}
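
The final `call_method1("table", ...)` above is just the ordinary `pyarrow.table()` constructor applied to a `{column name: Array}` mapping. A rough pure-Python approximation of that last step, with made-up columns standing in for the FFI-imported arrays:

```python
import pyarrow as pa

# Stand-ins for the arrays the Rust code imports via Array._import_from_c;
# here they are built directly in Python purely for illustration.
columns = {
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "name": pa.array(["a", "b", None], type=pa.string()),
}

# Same call the extension makes: pyarrow.table(mapping) -> pyarrow.Table
table = pa.table(columns)
print(table.schema)
```
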

/// Read SQL to a file; Parquet or Feather/IPC format.
// TODO: Stream data into a file in chunks during query reading
#[pyfunction]
