Improve temporal types (#28)
Remove all use of Utf8Array when handling temporal types, in favor of proper DataType::Timestamp and the like.
milesgranger authored Sep 26, 2022
1 parent a27ff8f commit 61a96ca
Showing 3 changed files with 76 additions and 100 deletions.
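The core of the change described in the commit message: temporal columns are now accumulated as primitive 64-bit integers tagged with a proper Arrow temporal logical type, instead of being rendered to strings and pushed into a MutableUtf8Array. Below is a minimal standalone sketch of the new approach, assuming arrow2 0.13's mutable-array API; the values are illustrative and not taken from the diff.

// Illustrative sketch only; not part of the commit.
use arrow2::array::{MutableArray, MutablePrimitiveArray};
use arrow2::datatypes::{DataType, TimeUnit};

fn main() {
    // Before: timestamps were formatted to strings and pushed into MutableUtf8Array::<i32>.
    // After: they are pushed as microseconds since the Unix epoch into an i64 array
    // whose logical type is DataType::Timestamp.
    let mut timestamps = MutablePrimitiveArray::<i64>::new()
        .to(DataType::Timestamp(TimeUnit::Microsecond, None));
    timestamps.push(Some(1_664_150_400_000_000i64)); // 2022-09-26T00:00:00Z
    timestamps.push(None); // nulls become validity bits rather than text
    assert_eq!(timestamps.len(), 2);
}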
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "flaco"
version = "0.6.0-rc2"
version = "0.6.0-rc3"
edition = "2018"
license = "Unlicense/MIT"

@@ -18,7 +18,7 @@ serde_json = "^1"
numpy = "0.17"
arrow2 = { version = "^0.13", features = ["io_ipc", "io_parquet"] }
rust_decimal = { version = "1.16.0", features = ["db-postgres"] }
time = { version = "0.3.3", features = ["formatting"] }
time = { version = "0.3.3", features = ["formatting", "parsing"] }
postgres = { version = "0.19.1", features = ["with-time-0_3", "with-serde_json-1", "with-uuid-0_8"] }
postgres-protocol = "0.6.2"
pyo3 = { version = "0.17", default-features = false, features = ["macros"] }
12 changes: 3 additions & 9 deletions benchmarks/test_benchmarks.py
@@ -106,7 +106,7 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
df["col8"] = pd.to_datetime(df.col7)
df["col9"] = pd.to_datetime(df.col7, utc=True)
df["col10"] = df.col9.dt.time
df.to_sql(table, index=False, con=engine, chunksize=10_000, if_exists="append")
df.to_sql(table, index=False, con=engine, chunksize=50_000, if_exists="replace")

if include_nulls:
df = df[:20]
@@ -118,26 +118,20 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
def memory_profile():
stmt = "select * from test_table"
flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
data = flaco.read_sql_to_numpy(DB_URI, stmt)
df = pd.DataFrame(data, copy=False).convert_dtypes()
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as pf
import pyarrow.dataset as ds
with pa.memory_map('result.feather', 'rb') as source:
mytable = pa.ipc.open_file(source).read_all()
table = mytable.rename_columns([f"col_{i}" for i in range(10)])
table_df = table.to_pandas()
table_df = mytable.to_pandas()
#print(v)
print(type(mytable), len(mytable))
print(type(table), len(table))
print(pa.total_allocated_bytes() >> 20)
breakpoint()
engine = create_engine(DB_URI)
_pandas_df = pd.read_sql(stmt, engine)


if __name__ == "__main__":
_table_setup(n_rows=10_000, include_nulls=False)
#_table_setup(n_rows=1_000_000, include_nulls=False)
memory_profile()
160 changes: 71 additions & 89 deletions src/lib.rs
@@ -1,13 +1,7 @@
use arrow2::array::{
BinaryArray, BooleanArray, FixedSizeBinaryArray, MutableBinaryArray, MutableBooleanArray,
MutableFixedSizeBinaryArray, MutablePrimitiveArray, MutableUtf8Array, PrimitiveArray,
Utf8Array,
};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Schema};
use arrow2::io::{ipc, parquet};
use arrow2::{array, array::MutableArray};
use numpy::IntoPyArray;
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
@@ -24,7 +18,6 @@ create_exception!(flaco, FlacoException, PyException);
fn flaco(py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(wrap_pyfunction!(read_sql_to_file, m)?)?;
m.add_function(wrap_pyfunction!(read_sql_to_numpy, m)?)?;
m.add_class::<FileFormat>()?;
m.add("FlacoException", py.get_type::<FlacoException>())?;
Ok(())
@@ -55,23 +48,6 @@ pub fn read_sql_to_file(uri: &str, stmt: &str, path: &str, format: FileFormat) -
Ok(())
}

/// Read SQL to a dict of numpy arrays, where keys are column names.
/// NOTE: This is not very efficient currently, likely should not use it.
#[pyfunction]
pub fn read_sql_to_numpy<'py>(
py: Python<'py>,
uri: &str,
stmt: &str,
) -> PyResult<BTreeMap<String, PyObject>> {
let mut client = postgres::Client::connect(uri, postgres::NoTls).map_err(to_py_err)?;
let table = postgresql::read_sql(&mut client, stmt).map_err(to_py_err)?;
let mut result = BTreeMap::new();
for (name, column) in table {
result.insert(name, column.into_pyarray(py));
}
Ok(result)
}

pub type Table = BTreeMap<String, Column>;

pub struct Column {
@@ -99,43 +75,6 @@ impl Column {
self.inner_mut::<T>().try_push(value)?;
Ok(())
}
pub fn into_pyarray(mut self, py: Python) -> PyObject {
macro_rules! to_pyarray {
($mut_arr:ty, $arr:ty) => {{
self.inner_mut::<$mut_arr>()
.as_arc()
.as_ref()
.as_any()
.downcast_ref::<$arr>()
.unwrap()
.iter()
.map(|v| v.to_object(py))
.collect::<Vec<_>>()
.into_pyarray(py)
.to_object(py)
}};
}
match self.dtype {
DataType::Boolean => to_pyarray!(MutableBooleanArray, BooleanArray),
DataType::Binary => to_pyarray!(MutableBinaryArray<i32>, BinaryArray<i32>),
DataType::Utf8 => to_pyarray!(MutableUtf8Array<i32>, Utf8Array<i32>),
DataType::Int8 => to_pyarray!(MutablePrimitiveArray<i8>, PrimitiveArray<i8>),
DataType::Int16 => to_pyarray!(MutablePrimitiveArray<i16>, PrimitiveArray<i16>),
DataType::Int32 => to_pyarray!(MutablePrimitiveArray<i32>, PrimitiveArray<i32>),
DataType::UInt32 => to_pyarray!(MutablePrimitiveArray<u32>, PrimitiveArray<u32>),
DataType::Int64 => to_pyarray!(MutablePrimitiveArray<i64>, PrimitiveArray<i64>),
DataType::UInt64 => to_pyarray!(MutablePrimitiveArray<u64>, PrimitiveArray<u64>),
DataType::Float32 => to_pyarray!(MutablePrimitiveArray<f32>, PrimitiveArray<f32>),
DataType::Float64 => to_pyarray!(MutablePrimitiveArray<f64>, PrimitiveArray<f64>),
DataType::FixedSizeBinary(_) => {
to_pyarray!(MutableFixedSizeBinaryArray, FixedSizeBinaryArray)
}
_ => unimplemented!(
"Dtype: {:?} not implemented for conversion to numpy",
&self.dtype
),
}
}
}

fn write_table_to_parquet(table: Table, path: &str) -> Result<()> {
@@ -198,14 +137,17 @@ pub mod postgresql {
MutableBinaryArray, MutableBooleanArray, MutableFixedSizeBinaryArray,
MutablePrimitiveArray, MutableUtf8Array,
};
use arrow2::datatypes::{DataType, TimeUnit};

use postgres as pg;
use postgres::fallible_iterator::FallibleIterator;
use postgres::types::Type;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use std::collections::BTreeMap;
use std::{iter::Iterator, net::IpAddr};
use time;
use time::{self, format_description};

const UNIX_EPOCH: time::OffsetDateTime = time::OffsetDateTime::UNIX_EPOCH;

pub fn read_sql(client: &mut pg::Client, sql: &str) -> Result<Table> {
let mut row_iter = client.query_raw::<_, &i32, _>(sql, &[])?;
@@ -305,49 +247,89 @@ pub mod postgresql {
&Type::TIMESTAMP => {
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
.push::<_, MutableUtf8Array<i32>>(
row.get::<_, Option<time::PrimitiveDateTime>>(idx)
.map(|v| v.to_string()),
.or_insert_with(|| {
Column::new(
MutablePrimitiveArray::<i64>::new()
.to(DataType::Timestamp(TimeUnit::Microsecond, None)),
)
})
.push::<_, MutablePrimitiveArray<i64>>(
row.get::<_, Option<time::PrimitiveDateTime>>(idx).map(|v| {
let diff =
(UNIX_EPOCH - v.assume_utc()).whole_microseconds() as i64;
if v.assume_utc() > UNIX_EPOCH {
diff.abs()
} else {
diff
}
}),
)?;
}
&Type::TIMESTAMPTZ => {
let format = time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]:[offset_minute]").unwrap();
let value = row.get::<_, Option<time::OffsetDateTime>>(idx);
let format =
format_description::parse("[offset_hour sign:mandatory]:[offset_minute]")
.unwrap();
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
.push::<_, MutableUtf8Array<i32>>(
row.get::<_, Option<time::OffsetDateTime>>(idx)
.map(|v| v.format(&format).unwrap()),
)?;
.or_insert_with(|| {
if value.is_none() {
unimplemented!(
"Handle case where first row of TZ aware timestamp is null."
)
}
Column::new(MutablePrimitiveArray::<i64>::new().to(
DataType::Timestamp(
TimeUnit::Microsecond,
value.map(|v| v.offset().format(&format).unwrap()),
),
))
})
.push::<_, MutablePrimitiveArray<i64>>(value.map(|v| {
let diff = (UNIX_EPOCH - v).whole_microseconds() as i64;
if v > UNIX_EPOCH {
diff.abs()
} else {
diff
}
}))?;
}
&Type::DATE => {
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
.push::<_, MutableUtf8Array<i32>>(
row.get::<_, Option<time::Date>>(idx).map(|v| v.to_string()),
.or_insert_with(|| {
Column::new(MutablePrimitiveArray::<i32>::new().to(DataType::Date32))
})
.push::<_, MutablePrimitiveArray<i32>>(
row.get::<_, Option<time::Date>>(idx).map(|v| {
let days = (UNIX_EPOCH.date() - v).whole_days() as i32;
if v > UNIX_EPOCH.date() {
days.abs()
} else {
days
}
}),
)?;
}
&Type::TIME => {
&Type::TIME | &Type::TIMETZ => {
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
.push::<_, MutableUtf8Array<i32>>(
row.get::<_, Option<time::Time>>(idx).map(|v| v.to_string()),
)?;
}
&Type::TIMETZ => {
// TIMETZ is 12 bytes; Fixed size binary array then since no DataType matches
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
.push::<_, MutableUtf8Array<i32>>(
row.get::<_, Option<time::Time>>(idx).map(|v| v.to_string()),
.or_insert_with(|| {
Column::new(
MutablePrimitiveArray::<i64>::new()
.to(DataType::Time64(TimeUnit::Microsecond)),
)
})
.push::<_, MutablePrimitiveArray<i64>>(
row.get::<_, Option<time::Time>>(idx).map(|v| {
let (h, m, s, micro) = v.as_hms_micro();
let seconds = (h as i64 * 60 * 60) + (m as i64 * 60) + s as i64;
micro as i64 + seconds * 1_000_000
}),
)?;
}
&Type::INTERVAL => {
// INTERVAL is 16 bytes; Fixed size binary array then sinece i128 not impl FromSql
// INTERVAL is 16 bytes; Fixed size binary array then since i128 not impl FromSql
table
.entry(column_name)
.or_insert_with(|| Column::new(MutableFixedSizeBinaryArray::new(16)))
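For reference, the unit conversions performed by the new TIMESTAMP, DATE, and TIME branches can be written as small standalone helpers. This is a sketch assuming the time 0.3 API used above, with hypothetical helper names not taken from the commit; it computes value minus epoch directly, which matches the subtract-then-abs result in the diff for values at or after the Unix epoch.

// Hypothetical helpers mirroring the conversions in the diff; not the committed code.
use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time};

const UNIX_EPOCH: OffsetDateTime = OffsetDateTime::UNIX_EPOCH;

// TIMESTAMP -> microseconds since the Unix epoch (Arrow Timestamp, Microsecond).
fn timestamp_to_us(v: PrimitiveDateTime) -> i64 {
    (v.assume_utc() - UNIX_EPOCH).whole_microseconds() as i64
}

// DATE -> days since the Unix epoch (Arrow Date32).
fn date_to_days(v: Date) -> i32 {
    (v - UNIX_EPOCH.date()).whole_days() as i32
}

// TIME / TIMETZ -> microseconds since midnight (Arrow Time64, Microsecond).
fn time_to_us(v: Time) -> i64 {
    let (h, m, s, micro) = v.as_hms_micro();
    ((h as i64 * 60 + m as i64) * 60 + s as i64) * 1_000_000 + micro as i64
}

fn main() {
    let d = Date::from_calendar_date(2022, Month::September, 26).unwrap();
    let t = Time::from_hms_micro(1, 2, 3, 4).unwrap();
    assert_eq!(date_to_days(d), 19_261);
    assert_eq!(time_to_us(t), 3_723_000_004);
    assert_eq!(
        timestamp_to_us(PrimitiveDateTime::new(d, t)),
        19_261i64 * 86_400_000_000 + 3_723_000_004
    );
}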
