Skip to content

Commit

Permalink
Fix Date32 encoding and remove supoprt for Date64 (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb authored Oct 18, 2023
1 parent 51dc755 commit f9749f8
Show file tree
Hide file tree
Showing 49 changed files with 157 additions and 131 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ There's no reason we can't support struct data types as well.
| Timestamp(Millisecond) | TIMESTAMP |
| Timestamp(Second) | TIMESTAMP |
| Date32 | DATE |
| Date64 | DATE |
| Date64 | Not supported |
| Time32(Millisecond) | TIME |
| Time32(Second) | TIME |
| Time64(Nanosecond) | Not supported |
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgpq"
version = "0.8.0"
version = "0.9.0"
edition = "2021"
description = "Encode Apache Arrow `RecordBatch`es to Postgres' native binary format"
license = "MIT"
Expand Down
186 changes: 108 additions & 78 deletions core/src/encoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub enum Encoder<'a> {
TimestampMillisecond(TimestampMillisecondEncoder<'a>),
TimestampSecond(TimestampSecondEncoder<'a>),
Date32(Date32Encoder<'a>),
Date64(Date64Encoder<'a>),
Time32Millisecond(Time32MillisecondEncoder<'a>),
Time32Second(Time32SecondEncoder<'a>),
Time64Microsecond(Time64MicrosecondEncoder<'a>),
Expand Down Expand Up @@ -235,23 +234,54 @@ impl_encode!(
BufMut::put_f64
);

const ONE_S_TO_MS: i64 = 1_000;
const ONE_S_TO_US: i64 = 1_000_000;

// Postgres starts counting on Jan 1st 2000
// This is Jan 1st 2000 relative to the UNIX Epoch in us
const POSTGRES_BASE_TIMESTAMP_S: i64 = 946_684_800;
const POSTGRES_BASE_TIMESTAMP_MS: i64 = POSTGRES_BASE_TIMESTAMP_S * ONE_S_TO_MS;
const POSTGRES_BASE_TIMESTAMP_US: i64 = POSTGRES_BASE_TIMESTAMP_S * ONE_S_TO_US;
const PG_BASE_TIMESTAMP_OFFSET_US: i64 = 946_684_800_000_000; // microseconds between 2000-01-01 at midnight (Postgres's epoch) and 1970-01-01 (Arrow's / UNIX epoch)
const PG_BASE_TIMESTAMP_OFFSET_MS: i64 = 946_684_800_000; // milliseconds between 2000-01-01 at midnight (Postgres's epoch) and 1970-01-01 (Arrow's / UNIX epoch)
const PG_BASE_TIMESTAMP_OFFSET_S: i64 = 946_684_800; // seconds between 2000-01-01 at midnight (Postgres's epoch) and 1970-01-01 (Arrow's / UNIX epoch)

#[inline(always)]
fn convert_arrow_timestamp_microseconds_to_pg_timestamp(
_field: &str,
timestamp_us: i64,
) -> Result<i64, ErrorKind> {
// adjust the timestamp from microseconds since 1970-01-01 to microseconds since 2000-01-01 checking for overflows and underflow
timestamp_us
.checked_sub(PG_BASE_TIMESTAMP_OFFSET_US)
.ok_or_else(|| ErrorKind::Encode {
reason: "Underflow converting microseconds since 1970-01-01 (Arrow) to microseconds since 2000-01-01 (Postgres)".to_string(),
})
}

const NUM_US_PER_MS: i64 = 1_000;
const NUM_US_PER_S: i64 = 1_000_000;
/// Convert from Arrow timestamps (milliseconds since 1970-01-01) to Postgres timestamps (microseconds since 2000-01-01)
#[inline(always)]
fn convert_arrow_timestamp_milliseconds_to_pg_timestamp(
_field: &str,
timestamp_ms: i64,
) -> Result<i64, ErrorKind> {
let timestamp_ms = timestamp_ms.checked_sub(PG_BASE_TIMESTAMP_OFFSET_MS).ok_or_else(|| ErrorKind::Encode {
reason: "Underflow converting milliseconds since 1970-01-01 (Arrow) to microseconds since 2000-01-01 (Postgres)".to_string(),
})?;
// convert to microseconds, checking for overflows
timestamp_ms
.checked_mul(1_000)
.ok_or_else(|| ErrorKind::Encode {
reason: "Overflow converting milliseconds to microseconds".to_string(),
})
}

#[inline]
fn adjust_timestamp(val: i64, offset: i64) -> Result<i64, ErrorKind> {
val.sub_checked(offset).map_err(|_| ErrorKind::Encode {
reason: "Value too large to transmit".to_string(),
})
#[inline(always)]
fn convert_arrow_timestamp_seconds_to_pg_timestamp(
_field: &str,
timestamp_s: i64,
) -> Result<i64, ErrorKind> {
let timestamp_s = timestamp_s.checked_sub(PG_BASE_TIMESTAMP_OFFSET_S).ok_or_else(|| ErrorKind::Encode {
reason: "Underflow converting seconds since 1970-01-01 (Arrow) to microseconds since 2000-01-01 (Postgres)".to_string(),
})?;
// convert to microseconds, checking for overflows
timestamp_s
.checked_mul(1_000_000)
.ok_or_else(|| ErrorKind::Encode {
reason: "Overflow converting seconds to microseconds".to_string(),
})
}

#[derive(Debug)]
Expand All @@ -262,7 +292,7 @@ pub struct TimestampMicrosecondEncoder<'a> {
impl_encode_fallible!(
TimestampMicrosecondEncoder,
type_size_fixed(PostgresType::Timestamp.size()),
|_: &str, v: i64| adjust_timestamp(v, POSTGRES_BASE_TIMESTAMP_US),
convert_arrow_timestamp_microseconds_to_pg_timestamp,
BufMut::put_i64
);

Expand All @@ -274,15 +304,7 @@ pub struct TimestampMillisecondEncoder<'a> {
impl_encode_fallible!(
TimestampMillisecondEncoder,
type_size_fixed(PostgresType::Timestamp.size()),
|_: &str, v: i64| {
let v = adjust_timestamp(v, POSTGRES_BASE_TIMESTAMP_MS)?;
match v.mul_checked(NUM_US_PER_MS) {
Ok(v) => Ok(v),
Err(_) => Err(ErrorKind::Encode {
reason: "Overflow encoding millisecond timestamp as microseconds".to_string(),
}),
}
},
convert_arrow_timestamp_milliseconds_to_pg_timestamp,
BufMut::put_i64
);

Expand All @@ -294,69 +316,80 @@ pub struct TimestampSecondEncoder<'a> {
impl_encode_fallible!(
TimestampSecondEncoder,
type_size_fixed(PostgresType::Timestamp.size()),
|_: &str, v: i64| {
let v = adjust_timestamp(v, POSTGRES_BASE_TIMESTAMP_S)?;
match v.mul_checked(NUM_US_PER_S) {
Ok(v) => Ok(v),
Err(_) => Err(ErrorKind::Encode {
reason: "Overflow encoding seconds timestamp as microseconds".to_string(),
}),
}
},
convert_arrow_timestamp_seconds_to_pg_timestamp,
BufMut::put_i64
);

#[derive(Debug)]
pub struct Date32Encoder<'a> {
arr: &'a arrow_array::Date32Array,
const PG_BASE_DATE_OFFSET: i32 = 10_957; // Number of days between PostgreSQL's epoch (2000-01-01) and Arrow's / UNIX epoch (1970-01-01)

#[inline(always)]
fn convert_arrow_date32_to_postgres_date(_field: &str, date: i32) -> Result<i32, ErrorKind> {
// adjust the date from days since 1970-01-01 to days since 2000-01-01 checking for overflows and underflow
date.checked_sub(PG_BASE_DATE_OFFSET).ok_or_else(|| ErrorKind::Encode {
reason: "Underflow converting days since 1970-01-01 (Arrow) to days since 2000-01-01 (Postgres)".to_string(),
})
}
impl_encode!(Date32Encoder, 4, identity, BufMut::put_i32);

#[derive(Debug)]
pub struct Date64Encoder<'a> {
arr: &'a arrow_array::Date64Array,
pub struct Date32Encoder<'a> {
arr: &'a arrow_array::Date32Array,
field: String,
}
impl_encode_fallible!(
Date64Encoder,
type_size_fixed(PostgresType::Date.size()),
|_field: &str, v: i64| {
i32::try_from(v).map_err(|_| ErrorKind::Encode {
reason: "overflow converting 64 bit date to 32 bit date".to_string(),
})
},
Date32Encoder,
4,
convert_arrow_date32_to_postgres_date,
BufMut::put_i32
);

fn convert_arrow_time_seconds_to_postgres_time(
_field: &str,
time_s: i32,
) -> Result<i64, ErrorKind> {
// convert to microseconds, checking for overflows
let time_s = time_s as i64;
time_s
.checked_mul(1_000_000)
.ok_or_else(|| ErrorKind::Encode {
reason: "Overflow converting seconds to microseconds".to_string(),
})
}

fn convert_arrow_time_milliseconds_to_postgres_time(
_field: &str,
time_ms: i32,
) -> Result<i64, ErrorKind> {
// convert to microseconds, checking for overflows
let time_ms = time_ms as i64;
time_ms.checked_mul(1_000).ok_or_else(|| ErrorKind::Encode {
reason: "Overflow converting milliseconds to microseconds".to_string(),
})
}

#[derive(Debug)]
pub struct Time32MillisecondEncoder<'a> {
arr: &'a arrow_array::Time32MillisecondArray,
field: String,
}
impl_encode!(
impl_encode_fallible!(
Time32MillisecondEncoder,
type_size_fixed(PostgresType::Time.size()),
|v| (v as i64) * NUM_US_PER_MS,
convert_arrow_time_milliseconds_to_postgres_time,
BufMut::put_i64
);

#[derive(Debug)]
pub struct Time32SecondEncoder<'a> {
arr: &'a arrow_array::Time32SecondArray,
field: String,
}
impl_encode!(
impl_encode_fallible!(
Time32SecondEncoder,
type_size_fixed(PostgresType::Time.size()),
|v| (v as i64) * NUM_US_PER_S,
convert_arrow_time_seconds_to_postgres_time,
BufMut::put_i64
);

#[inline]
fn write_duration(buf: &mut BytesMut, duration_us: i64) {
buf.put_i64(duration_us);
buf.put_i32(0); // days
buf.put_i32(0); // months
}

#[derive(Debug)]
pub struct Time64MicrosecondEncoder<'a> {
arr: &'a arrow_array::Time64MicrosecondArray,
Expand All @@ -369,6 +402,16 @@ pub struct DurationMicrosecondEncoder<'a> {
}
impl_encode!(DurationMicrosecondEncoder, 16, identity, write_duration);

const NUM_US_PER_MS: i64 = 1_000;
const NUM_US_PER_S: i64 = 1_000_000;

#[inline]
fn write_duration(buf: &mut BytesMut, duration_us: i64) {
buf.put_i64(duration_us);
buf.put_i32(0); // days
buf.put_i32(0); // months
}

#[derive(Debug)]
pub struct DurationMillisecondEncoder<'a> {
arr: &'a arrow_array::DurationMillisecondArray,
Expand All @@ -379,7 +422,7 @@ impl_encode_fallible!(
type_size_fixed(PostgresType::Interval.size()),
|_: &str, v: i64| v.mul_checked(NUM_US_PER_MS).map_err(|_| {
ErrorKind::Encode {
reason: "Overflow encoding millisecond Duration as microseconds".to_string(),
reason: "Overflow encoding millisecond duration as microseconds".to_string(),
}
}),
write_duration
Expand All @@ -390,12 +433,13 @@ pub struct DurationSecondEncoder<'a> {
arr: &'a arrow_array::DurationSecondArray,
field: String,
}

impl_encode_fallible!(
DurationSecondEncoder,
type_size_fixed(PostgresType::Interval.size()),
|_: &str, v: i64| v.mul_checked(NUM_US_PER_S).map_err(|_| {
ErrorKind::Encode {
reason: "Overflow encoding second Duration as microseconds".to_string(),
reason: "Overflow encoding seconds duration as microseconds".to_string(),
}
}),
write_duration
Expand Down Expand Up @@ -832,31 +876,19 @@ impl_encoder_builder_stateless_with_field!(
pub struct Date32EncoderBuilder {
field: Arc<Field>,
}
impl_encoder_builder_stateless!(
impl_encoder_builder_stateless_with_field!(
Date32EncoderBuilder,
Encoder::Date32,
Date32Encoder,
PostgresType::Date,
|dt: &DataType| matches!(dt, DataType::Date32)
);

#[derive(Debug, Clone, PartialEq)]
pub struct Date64EncoderBuilder {
field: Arc<Field>,
}
impl_encoder_builder_stateless_with_field!(
Date64EncoderBuilder,
Encoder::Date64,
Date64Encoder,
PostgresType::Date,
|dt: &DataType| matches!(dt, DataType::Date64)
);

#[derive(Debug, Clone, PartialEq)]
pub struct Time32MillisecondEncoderBuilder {
field: Arc<Field>,
}
impl_encoder_builder_stateless!(
impl_encoder_builder_stateless_with_field!(
Time32MillisecondEncoderBuilder,
Encoder::Time32Millisecond,
Time32MillisecondEncoder,
Expand All @@ -868,7 +900,7 @@ impl_encoder_builder_stateless!(
pub struct Time32SecondEncoderBuilder {
field: Arc<Field>,
}
impl_encoder_builder_stateless!(
impl_encoder_builder_stateless_with_field!(
Time32SecondEncoderBuilder,
Encoder::Time32Second,
Time32SecondEncoder,
Expand Down Expand Up @@ -1147,7 +1179,6 @@ pub enum EncoderBuilder {
TimestampMillisecond(TimestampMillisecondEncoderBuilder),
TimestampSecond(TimestampSecondEncoderBuilder),
Date32(Date32EncoderBuilder),
Date64(Date64EncoderBuilder),
Time32Millisecond(Time32MillisecondEncoderBuilder),
Time32Second(Time32SecondEncoderBuilder),
Time64Microsecond(Time64MicrosecondEncoderBuilder),
Expand Down Expand Up @@ -1198,7 +1229,6 @@ impl EncoderBuilder {
TimeUnit::Second => Self::TimestampSecond(TimestampSecondEncoderBuilder { field }),
},
DataType::Date32 => Self::Date32(Date32EncoderBuilder { field }),
DataType::Date64 => Self::Date64(Date64EncoderBuilder { field }),
DataType::Time32(unit) => match unit {
TimeUnit::Millisecond => {
Self::Time32Millisecond(Time32MillisecondEncoderBuilder { field })
Expand Down
7 changes: 3 additions & 4 deletions core/tests/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class Col:
time_ms = floor(time_s * 1e3)
time_us = floor(time_s * 1e6)

date32 = 2**16 - 1

duration_s = 60
duration_ms = floor(duration_s * 1e3)
duration_us = floor(duration_s * 1e6)
Expand All @@ -44,10 +46,6 @@ class Col:
(pa.field("int16", pa.int16()), [-1, 0, 1]),
(pa.field("int32", pa.int32()), [-1, 0, 1]),
(pa.field("int64", pa.int64()), [-1, 0, 1]),
# (
# pa.field("float16", pa.float16()),
# [np.float16(v) for v in [-1, 0, 1, float("inf")]],
# ),
(pa.field("float32", pa.float32()), [-1, 0, 1, float("inf")]),
(pa.field("float64", pa.float64()), [-1, 0, 1, float("inf")]),
(pa.field("timestamp_us_notz", pa.timestamp("us", None)), [0, 1, timestamp_us]),
Expand All @@ -68,6 +66,7 @@ class Col:
(pa.field("time_s", pa.time32("s")), [0, 1, time_s]),
(pa.field("time_ms", pa.time32("ms")), [0, 1, time_ms]),
(pa.field("time_us", pa.time64("us")), [0, 1, time_us]),
(pa.field("date32", pa.date32()), [0, -date32, date32]),
(pa.field("duration_us", pa.duration("us")), [0, 1, duration_us]),
(pa.field("duration_ms", pa.duration("ms")), [0, 1, duration_ms]),
(pa.field("duration_s", pa.duration("s")), [0, 1, duration_s]),
Expand Down
Loading

0 comments on commit f9749f8

Please sign in to comment.