Skip to content

Commit

Permalink
Datetime, Date, and Time support (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
milesgranger authored Oct 25, 2021
1 parent c173da5 commit c8ca46e
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 21 deletions.
49 changes: 35 additions & 14 deletions benchmarks/test_benchmarks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import numpy as np
import pandas as pd
import connectorx as cx
from memory_profiler import profile
from sqlalchemy import create_engine
from flaco.io import Database, read_sql
Expand All @@ -26,7 +27,7 @@
)


@pytest.mark.parametrize("loader", ("pandas", "flaco"))
@pytest.mark.parametrize("loader", ("pandas", "flaco", "connectorx"))
def test_basic(benchmark, loader: str):
def _read_all_tables(func, sql, con):
for table in DB_TABLES:
Expand All @@ -35,36 +36,37 @@ def _read_all_tables(func, sql, con):
# as numpy arrays default to no copy.
if loader == "pandas":
func(sql.format(table=table), con)
elif loader == "connectorx":
func(DB_URI, sql.format(table=table))
else:
pd.DataFrame(func(sql.format(table=table), con))
pd.DataFrame(func(sql.format(table=table), con), copy=False)

if loader == "pandas":
engine = create_engine(DB_URI)
benchmark(
_read_all_tables, pd.read_sql, "select * from {table}", engine,
)
elif loader == "connectorx":
benchmark(
_read_all_tables, cx.read_sql,"select * from {table}", DB_URI
)
else:
with Database(DB_URI) as con:
benchmark(_read_all_tables, read_sql, "select * from {table}", con)


@pytest.mark.parametrize("loader", ("pandas", "flaco"))
@pytest.mark.parametrize("loader", ("pandas", "flaco", "connectorx"))
@pytest.mark.parametrize(
"n_rows", np.arange(100_000, 1_000_000, 100_000), ids=lambda val: f"rows={val}"
)
def test_incremental_size(benchmark, loader: str, n_rows: int):
n_cols = 5
table = "test_table"
engine = create_engine(DB_URI)

data = np.random.randint(0, 100_000, size=n_rows * n_cols).reshape((-1, n_cols))
pd.DataFrame(data).to_sql(
table, index=False, con=engine, chunksize=10_000, if_exists="replace"
)
table = _table_setup(n_rows=n_rows, include_nulls=False)

if loader == "pandas":
engine = create_engine(DB_URI)
benchmark(lambda *args: pd.read_sql(*args), f"select * from {table}", engine)
elif loader == "connectorx":
benchmark(cx.read_sql, DB_URI, f"select * from {table}", return_type="pandas")
else:
with Database(DB_URI) as con:
benchmark(
Expand All @@ -79,7 +81,20 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
engine = create_engine(DB_URI)

engine.execute(f"drop table if exists {table}")
engine.execute(f"create table {table} (col1 int, col2 int8, col3 float8, col4 float4, col5 text, col6 bytea)")
engine.execute(f"""
create table {table} (
col1 int,
col2 int8,
col3 float8,
col4 float4,
col5 text,
col6 bytea,
col7 date,
col8 timestamp without time zone,
col9 timestamp with time zone,
col10 time
)
""")

df = pd.DataFrame()
df["col1"] = np.random.randint(0, 1000, size=n_rows).astype(np.int32)
Expand All @@ -88,18 +103,24 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
df["col4"] = df.col1.astype(np.float64)
df["col5"] = df.col1.astype(str) + "-hello"
df["col6"] = df.col1.astype(bytes)
df["col7"] = pd.date_range('2000-01-01', '2001-01-01', periods=len(df))
df["col8"] = pd.to_datetime(df.col7)
df["col9"] = pd.to_datetime(df.col7, utc=True)
df["col10"] = df.col9.dt.time
df.to_sql(table, index=False, con=engine, chunksize=10_000, if_exists="append")

if include_nulls:
df = df[:20]
df.loc[:, :] = None
df.to_sql(table, index=False, con=engine, if_exists="append")

return table

@profile
def memory_profile():
stmt = "select * from test_table"

_cx_df = cx.read_sql(DB_URI, stmt, return_type="pandas")

with Database(DB_URI) as con:
data = read_sql(stmt, con)
_flaco_df = pd.DataFrame(data, copy=False)
Expand All @@ -108,5 +129,5 @@ def memory_profile():
_pandas_df = pd.read_sql(stmt, engine)

if __name__ == "__main__":
_table_setup(n_rows=2_000_000, include_nulls=False)
_table_setup(n_rows=500_000, include_nulls=False)
memory_profile()
55 changes: 55 additions & 0 deletions flaco/flaco.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,48 @@ typedef struct {
uint32_t len;
} BytesPtr;

/*
 * Calendar date split into year / month / day components.
 * The Cython reader passes these three fields, in this order, to
 * cpython.datetime.date_new to build a Python date.
 */
typedef struct {
int32_t year;
uint8_t month;
uint8_t day;
} DateInfo;

/*
 * Time of day split into hour / minute / second plus a sub-second part.
 * `usecond` is handed to the microsecond argument of time_new/datetime_new
 * by the Cython reader (presumably microseconds within the second -- confirm
 * against the producer).
 */
typedef struct {
uint8_t hour;
uint8_t minute;
uint8_t second;
uint32_t usecond;
} TimeInfo;

/*
 * Date + time pair with no timezone information.
 * The Cython reader builds a datetime from it with tzinfo set to None.
 */
typedef struct {
DateInfo date;
TimeInfo time;
} DateTimeInfo;

/*
 * Timezone offset expressed as hours / minutes / seconds plus a sign flag.
 * The Cython reader builds a timedelta from the three components and negates
 * it when `is_positive` is false before wrapping it in a timezone.
 *
 * NOTE(review): the reader passes (hours, minutes, seconds) positionally to
 * cpython.datetime.timedelta_new, whose parameters are (days, seconds,
 * useconds) -- confirm the offset is constructed as intended.
 */
typedef struct {
int8_t hours;
int8_t minutes;
int8_t seconds;
bool is_positive;
} TzInfo;

/*
 * Date + time pair together with the timezone offset it is expressed in.
 * The Cython reader turns `tz` into a tzinfo object and passes it as the
 * last argument of datetime_new.
 */
typedef struct {
DateInfo date;
TimeInfo time;
TzInfo tz;
} DateTimeTzInfo;

/*
 * Pointer to character data plus its length.
 * The Cython reader calls free() on `ptr` after converting the value to a
 * Python string.
 *
 * NOTE(review): one reader path calls PyUnicode_InternFromString(ptr), which
 * ignores `len` and reads up to the first NUL byte -- confirm the producer
 * NUL-terminates the buffer and that values never contain embedded NULs.
 */
typedef struct {
char *ptr;
uint32_t len;
} StringPtr;

typedef enum {
Bytes,
Date,
DateTime,
DateTimeTz,
Time,
Boolean,
Decimal,
Int8,
Expand All @@ -38,6 +73,22 @@ typedef struct {
BytesPtr _0;
} Bytes_Body;

/* Payload wrapper stored in the `date` member of the tagged union. */
typedef struct {
DateInfo _0;
} Date_Body;

/* Payload wrapper stored in the `date_time` member of the tagged union. */
typedef struct {
DateTimeInfo _0;
} DateTime_Body;

/* Payload wrapper stored in the `date_time_tz` member of the tagged union. */
typedef struct {
DateTimeTzInfo _0;
} DateTimeTz_Body;

/* Payload wrapper stored in the `time` member of the tagged union. */
typedef struct {
TimeInfo _0;
} Time_Body;

/* Payload wrapper stored in the `boolean` member of the tagged union. */
typedef struct {
bool _0;
} Boolean_Body;
Expand Down Expand Up @@ -82,6 +133,10 @@ typedef struct {
Data_Tag tag;
union {
Bytes_Body bytes;
Date_Body date;
DateTime_Body date_time;
DateTimeTz_Body date_time_tz;
Time_Body time;
Boolean_Body boolean;
Decimal_Body decimal;
Int8_Body int8;
Expand Down
46 changes: 46 additions & 0 deletions flaco/includes.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,39 @@ cdef extern from "./flaco.h":
char *ptr
np.uint32_t len

# Cython-side declaration of the DateInfo struct from flaco.h.
# Fields are declared as plain `int` here; flaco.h declares `year` as int32_t
# and `month` / `day` as uint8_t.
ctypedef struct DateInfo:
int year
int month
int day

# Cython-side declaration of the TimeInfo struct from flaco.h.
# Fields are declared as plain `int` here; flaco.h declares hour / minute /
# second as uint8_t and `usecond` as uint32_t.
ctypedef struct TimeInfo:
int hour
int minute
int second
int usecond

# Cython-side declaration of DateTimeInfo: a date plus a time, no timezone.
ctypedef struct DateTimeInfo:
DateInfo date
TimeInfo time

# Cython-side declaration of the TzInfo struct from flaco.h: offset components
# plus a sign flag. flaco.h declares hours / minutes / seconds as int8_t.
# NOTE(review): io.pyx passes these three fields positionally to
# timedelta_new, whose parameters are (days, seconds, useconds) -- confirm.
ctypedef struct TzInfo:
int hours
int minutes
int seconds
bool is_positive

# Cython-side declaration of DateTimeTzInfo: date, time and timezone offset.
ctypedef struct DateTimeTzInfo:
DateInfo date
TimeInfo time
TzInfo tz

ctypedef enum Data_Tag:
Bytes
Boolean
Date
DateTime
DateTimeTz
Time
Decimal
Int8
Int16
Expand All @@ -37,6 +67,18 @@ cdef extern from "./flaco.h":
# Payload wrapper for the `boolean` union member; the value lives in `_0`.
ctypedef struct Boolean_Body:
bool _0

# Payload wrapper for the `date` union member; the value lives in `_0`.
ctypedef struct Date_Body:
DateInfo _0

# Payload wrapper for the `date_time` union member; the value lives in `_0`.
ctypedef struct DateTime_Body:
DateTimeInfo _0

# Payload wrapper for the `date_time_tz` union member; the value lives in `_0`.
ctypedef struct DateTimeTz_Body:
DateTimeTzInfo _0

# Payload wrapper for the `time` union member; the value lives in `_0`.
ctypedef struct Time_Body:
TimeInfo _0

# Payload wrapper for the `decimal` union member; the value is carried as a
# 64-bit float in `_0`.
ctypedef struct Decimal_Body:
np.float64_t _0

Expand Down Expand Up @@ -69,6 +111,10 @@ cdef extern from "./flaco.h":

Bytes_Body bytes
Boolean_Body boolean
Date_Body date
DateTime_Body date_time
DateTimeTz_Body date_time_tz
Time_Body time
Decimal_Body decimal
Int8_Body int8
Int16_Body int16
Expand Down
68 changes: 66 additions & 2 deletions flaco/io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@

cimport numpy as np
import numpy as np
import datetime as dt
from libc.stdlib cimport malloc, free
cimport cpython.datetime as dt
from cython.operator cimport dereference as deref
from flaco cimport includes as lib


np.import_array()
dt.import_datetime()

cdef extern from "Python.h":
object PyUnicode_InternFromString(char *v)

cpdef dict read_sql(str stmt, Database db, int n_rows=-1):
cdef bytes stmt_bytes = stmt.encode("utf-8")
Expand Down Expand Up @@ -120,11 +125,19 @@ cdef np.ndarray array_init(lib.Data data, int len):
elif data.tag == lib.Data_Tag.Boolean:
array = np.empty(shape=len, dtype=bool)
elif data.tag == lib.Data_Tag.Bytes:
array = np.empty(shape=len, dtype=bytearray)
array = np.empty(shape=len, dtype=bytes)
elif data.tag == lib.Data_Tag.Decimal:
array = np.empty(shape=len, dtype=np.float64)
elif data.tag == lib.Data_Tag.Null:
array = np.empty(shape=len, dtype=object)
elif data.tag == lib.Data_Tag.Date:
array = np.empty(shape=len, dtype=dt.date)
elif data.tag == lib.Data_Tag.DateTime:
array = np.empty(shape=len, dtype=dt.datetime)
elif data.tag == lib.Data_Tag.DateTimeTz:
array = np.empty(shape=len, dtype=dt.datetime)
elif data.tag == lib.Data_Tag.Time:
array = np.empty(shape=len, dtype=dt.time)
else:
raise ValueError(f"Unsupported tag: {data.tag}")
return array
Expand All @@ -133,6 +146,8 @@ cdef np.ndarray array_init(lib.Data data, int len):
cdef np.ndarray insert_data_into_array(lib.Data data, np.ndarray arr, int idx):
cdef np.ndarray[np.uint8_t, ndim=1] arr_bytes
cdef np.npy_intp intp
cdef dt.timedelta delta
cdef object tzinfo

if data.tag == lib.Data_Tag.Boolean:
arr[idx] = data.boolean._0
Expand Down Expand Up @@ -163,9 +178,58 @@ cdef np.ndarray insert_data_into_array(lib.Data data, np.ndarray arr, int idx):
arr[idx] = data.float32._0

elif data.tag == lib.Data_Tag.String:
arr[idx] = data.string._0.ptr[:data.string._0.len].decode()
arr[idx] = PyUnicode_InternFromString(<char*>data.string._0.ptr)
free(data.string._0.ptr)

elif data.tag == lib.Data_Tag.Date:
arr[idx] = dt.date_new(
data.date._0.year,
data.date._0.month,
data.date._0.day
)

elif data.tag == lib.Data_Tag.DateTime:
arr[idx] = dt.datetime_new(
data.date_time._0.date.year,
data.date_time._0.date.month,
data.date_time._0.date.day,
data.date_time._0.time.hour,
data.date_time._0.time.minute,
data.date_time._0.time.second,
data.date_time._0.time.usecond,
None
)

elif data.tag == lib.Data_Tag.DateTimeTz:
delta = dt.timedelta_new(
data.date_time_tz._0.tz.hours,
data.date_time_tz._0.tz.minutes,
data.date_time_tz._0.tz.seconds
)
if data.date_time_tz._0.tz.is_positive:
tzinfo = dt.timezone(delta)
else:
tzinfo = dt.timezone(-delta)
arr[idx] = dt.datetime_new(
data.date_time_tz._0.date.year,
data.date_time_tz._0.date.month,
data.date_time_tz._0.date.day,
data.date_time_tz._0.time.hour,
data.date_time_tz._0.time.minute,
data.date_time_tz._0.time.second,
data.date_time_tz._0.time.usecond,
tzinfo
)

elif data.tag == lib.Data_Tag.Time:
arr[idx] = dt.time_new(
data.time._0.hour,
data.time._0.minute,
data.time._0.second,
data.time._0.usecond,
None
)

elif data.tag == lib.Data_Tag.Decimal:
arr[idx] = data.decimal._0

Expand Down
Loading

0 comments on commit c8ca46e

Please sign in to comment.