Commit 3c47193
feat(electric): Add support for timestamp and timestamptz column types (#330)

Closes VAX-818.
alco authored Sep 14, 2023
1 parent a53395d commit 3c47193
Showing 12 changed files with 461 additions and 150 deletions.
5 changes: 5 additions & 0 deletions .changeset/neat-laws-wonder.md
@@ -0,0 +1,5 @@
---
"@core/electric": minor
---

Implement support for electrifying and syncing tables that have columns of type `timestamp` and `timestamptz`
2 changes: 2 additions & 0 deletions clients/typescript/src/satellite/client.ts
@@ -1047,6 +1047,8 @@ function deserializeColumnData(
switch (columnType) {
case 'CHAR':
case 'TEXT':
case 'TIMESTAMP':
case 'TIMESTAMPTZ':
case 'UUID':
case 'VARCHAR':
return typeDecoder.text(column)
10 changes: 7 additions & 3 deletions clients/typescript/src/util/common.ts
@@ -19,7 +19,7 @@ setGlobalUUID(

export const typeDecoder = {
number: bytesToNumber,
text: (bytes: Uint8Array) => new TextDecoder().decode(bytes),
text: bytesToString,
}

export const typeEncoder = {
@@ -47,14 +47,18 @@ export function numberToBytes(i: number) {
)
}

export function bytesToNumber(bs: Uint8Array) {
export function bytesToNumber(bytes: Uint8Array) {
let n = 0
for (const byte of bs.values()) {
for (const byte of bytes.values()) {
n = (n << 8) | byte
}
return n
}

export function bytesToString(bytes: Uint8Array) {
return new TextDecoder().decode(bytes)
}

export function uuid() {
return (globalThis as any).uuid()
}
154 changes: 110 additions & 44 deletions components/electric/lib/electric/satellite/serialization.ex
@@ -14,6 +14,8 @@ defmodule Electric.Satellite.Serialization do
import Electric.Postgres.Extension,
only: [is_migration_relation: 1, is_ddl_relation: 1, is_extension_relation: 1]

import Bitwise

require Logger

@type relation_mapping() ::
@@ -25,6 +27,7 @@ defmodule Electric.Satellite.Serialization do
int2 int4 int8
float8
text
timestamp timestamptz
varchar
]a
end
@@ -123,8 +126,7 @@ defmodule Electric.Satellite.Serialization do

known_relations =
Enum.reduce(add_relations, state.known_relations, fn relation, known ->
{_relation_id, _column_names, known} = load_new_relation(relation, known)

{_relation_id, _columns, known} = load_new_relation(relation, known)
known
end)

@@ -160,8 +162,8 @@

{rel_id, rel_cols, new_relations, known_relations} =
case fetch_relation_id(relation, known_relations) do
{:new, {relation_id, columns, known}} ->
{relation_id, columns, [relation | new_relations], known}
{:new, {relation_id, columns, known_relations}} ->
{relation_id, columns, [relation | new_relations], known_relations}

{:existing, {relation_id, columns}} ->
{relation_id, columns, new_relations, known_relations}
@@ -221,39 +223,70 @@
%SatTransOp{op: {:delete, op_delete}}
end

@spec map_to_row(%{String.t() => binary()} | nil, [String.t()]) :: %SatOpRow{}
def map_to_row(nil, _), do: nil
@spec map_to_row(%{String.t() => binary()} | nil, [map], Keyword.t()) :: %SatOpRow{}
def map_to_row(data, cols, opts \\ [])

def map_to_row(data, rel_cols) when is_list(rel_cols) and is_map(data) do
bitmask = []
values = []
def map_to_row(data, cols, opts) when is_list(cols) and is_map(data) do
encode_value_fn =
if opts[:skip_value_encoding?] do
fn val, _type -> val end
else
&encode_column_value/2
end

{num_columns, bitmask, values} =
Enum.reduce(rel_cols, {0, bitmask, values}, fn column_name, {num, bitmask0, values0} ->
{values, {bitmask, num_cols}} =
Enum.map_reduce(cols, {0, 0}, fn col, {bitmask, num_cols} ->
# FIXME: This is inefficient; the data should be stored in order so that we
# don't have to do a lookup here, and can instead filter columns based on the schema
case Map.get(data, column_name, nil) do
case Map.get(data, col.name) do
nil ->
{num + 1, [1 | bitmask0], [<<>> | values0]}
{"", {bor(bitmask <<< 1, 1), num_cols + 1}}

value when is_binary(value) ->
{num + 1, [0 | bitmask0], [value | values0]}
val when is_binary(val) ->
{encode_value_fn.(val, col.type), {bitmask <<< 1, num_cols + 1}}
end
end)

bitmask =
case rem(num_columns, 8) do
0 -> bitmask
n -> :lists.duplicate(8 - n, 0) ++ bitmask
end

bitmask = for i <- Enum.reverse(bitmask), do: <<i::1>>, into: <<>>

%SatOpRow{nulls_bitmask: bitmask, values: Enum.reverse(values)}
%SatOpRow{nulls_bitmask: encode_nulls_bitmask(bitmask, num_cols), values: values}
end

# Values of type `timestamp` come over Postgres' logical replication stream in the following form:
#
# 2023-08-14 14:01:28.848242
#
# We don't need to convert these values before passing them on to Satellite clients, so we let the catch-all
# function clause handle them. Values of type `timestamptz`, however, are encoded as follows:
#
# 2023-08-14 10:01:28.848242+00
#
# This is not valid syntax for SQLite's builtin datetime functions, and we would like to avoid letting the Satellite
# protocol propagate Postgres' data formatting quirks to clients. So a minor conversion step is done here to replace
# `+00` with `Z`, making the whole string conform to ISO 8601.
#
# NOTE: We're ensuring the time zone offset is always `+00` by setting the `timezone` parameter to `'UTC'` before
# starting the replication stream.
defp encode_column_value(val, :timestamptz) do
{:ok, dt, 0} = DateTime.from_iso8601(val)
DateTime.to_string(dt)
end

# No-op encoding for the rest of supported types
defp encode_column_value(val, _type), do: val
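
# To make the conversion above concrete, a quick sketch using only the Elixir standard
# library (the input value is hypothetical, in the shape Postgres emits):
#
#     iex> {:ok, dt, 0} = DateTime.from_iso8601("2023-08-14 10:01:28.848242+00")
#     iex> DateTime.to_string(dt)
#     "2023-08-14 10:01:28.848242Z"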

defp encode_nulls_bitmask(bitmask, num_cols) do
case rem(num_cols, 8) do
0 ->
<<bitmask::size(num_cols)>>

n ->
extra_bits = 8 - n
bitmask = bitmask <<< extra_bits
<<bitmask::size(num_cols + extra_bits)>>
end
end
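
# As a worked example of the padding logic above: a three-column row whose second value
# is NULL accumulates the bits 0b010 (first column in the most significant position) and
# is then left-shifted so the byte is zero-padded on the right. A sketch of the same
# arithmetic with hypothetical values, mirroring the reduce step in map_to_row/3:
#
#     iex> import Bitwise
#     iex> bits = Enum.reduce(["a", nil, "c"], 0, fn
#     ...>   nil, acc -> bor(acc <<< 1, 1)
#     ...>   _val, acc -> acc <<< 1
#     ...> end)
#     iex> padded = bits <<< (8 - rem(3, 8))
#     iex> <<padded::size(8)>>
#     <<64>>
#
# 64 is 0b01000000, i.e. the second-highest bit marks the single NULL column.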

def fetch_relation_id(relation, known_relations) do
case Map.get(known_relations, relation, nil) do
case Map.get(known_relations, relation) do
nil ->
{:new, load_new_relation(relation, known_relations)}

@@ -264,9 +297,7 @@

defp load_new_relation(relation, known_relations) do
%{oid: relation_id, columns: columns} = fetch_relation(relation)
column_names = for %{name: column_name} <- columns, do: column_name

{relation_id, column_names, Map.put(known_relations, relation, {relation_id, column_names})}
{relation_id, columns, Map.put(known_relations, relation, {relation_id, columns})}
end

defp fetch_relation(relation) do
@@ -368,15 +399,15 @@
end

defp op_to_change(%SatOpInsert{row_data: row_data, tags: tags}, columns) do
%NewRecord{record: decode_record(row_data, columns), tags: tags}
%NewRecord{record: decode_record!(row_data, columns), tags: tags}
end

defp op_to_change(
%SatOpUpdate{row_data: row_data, old_row_data: nil, tags: tags},
columns
) do
%UpdatedRecord{
record: decode_record(row_data, columns, :allow_nulls),
record: decode_record!(row_data, columns, :allow_nulls),
old_record: nil,
tags: tags
}
@@ -387,8 +418,8 @@
columns
) do
%UpdatedRecord{
record: decode_record(row_data, columns),
old_record: decode_record(old_row_data, columns),
record: decode_record!(row_data, columns),
old_record: decode_record!(old_row_data, columns),
tags: tags
}
end
@@ -399,22 +430,22 @@

defp op_to_change(%SatOpDelete{old_row_data: old_row_data, tags: tags}, columns) do
%DeletedRecord{
old_record: decode_record(old_row_data, columns),
old_record: decode_record!(old_row_data, columns),
tags: tags
}
end

@spec decode_record(%SatOpRow{}, [String.t()], :allow_nulls | nil) ::
@spec decode_record!(%SatOpRow{}, [String.t()], :allow_nulls | nil) ::
%{String.t() => nil | String.t()} | nil
def decode_record(row, columns) do
decode_record(row, columns, nil)
def decode_record!(row, columns) do
decode_record!(row, columns, nil)
end

defp decode_record(nil, _columns, _opt) do
defp decode_record!(nil, _columns, _opt) do
raise "protocol violation, empty row"
end

defp decode_record(%SatOpRow{nulls_bitmask: bitmask, values: values}, columns, opt) do
defp decode_record!(%SatOpRow{nulls_bitmask: bitmask, values: values}, columns, opt) do
decode_values(values, bitmask, columns, opt)
|> Map.new()
end
@@ -424,7 +455,7 @@
defp decode_values([val | values], <<0::1, bitmask::bits>>, [col | columns], opt)
when is_binary(val) do
[
{col.name, decode_column_value(val, col.type)}
{col.name, decode_column_value!(val, col.type)}
| decode_values(values, bitmask, columns, opt)
]
end
@@ -442,13 +473,13 @@
Given a column value received from a Satellite client, transcode it into the format that can be fed into Postgres'
logical replication stream (aka "server-native format").
"""
@spec decode_column_value(binary, atom) :: binary
@spec decode_column_value!(binary, atom) :: binary

def decode_column_value(val, type) when type in [:bytea, :text, :varchar] do
def decode_column_value!(val, type) when type in [:bytea, :text, :varchar] do
val
end

def decode_column_value(val, type) when type in [:int2, :int4, :int8] do
def decode_column_value!(val, type) when type in [:int2, :int4, :int8] do
:ok =
val
|> String.to_integer()
@@ -457,12 +488,33 @@
val
end

def decode_column_value(val, :float8) do
def decode_column_value!(val, :float8) do
_ = String.to_float(val)
val
end

def decode_column_value(val, :uuid) do
def decode_column_value!(val, :timestamp) do
# NaiveDateTime silently discards the time zone offset if one is present in the string. But we want to reject such
# strings because values of type `timestamp` must not have an offset.
{:error, :missing_offset} = DateTime.from_iso8601(val)

dt = NaiveDateTime.from_iso8601!(val)
:ok = assert_year_in_range(dt.year)

val
end

def decode_column_value!(val, :timestamptz) do
# The offset of datetimes coming over the Satellite protocol MUST be 0.
true = String.ends_with?(val, "Z")

{:ok, dt, 0} = DateTime.from_iso8601(val)
:ok = assert_year_in_range(dt.year)

val
end
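
# To make the two validation clauses above concrete, a sketch with hypothetical inputs
# (values are illustrative, not taken from a real client):
#
#     decode_column_value!("2023-08-14 14:01:28.848242", :timestamp)
#     #=> "2023-08-14 14:01:28.848242"
#
#     decode_column_value!("2023-08-14T10:01:28.848242Z", :timestamptz)
#     #=> "2023-08-14T10:01:28.848242Z"
#
#     # Raises MatchError: a `timestamp` value must not carry an offset...
#     decode_column_value!("2023-08-14 14:01:28+00", :timestamp)
#
#     # ...and a `timestamptz` value must spell its zero offset as a trailing "Z".
#     decode_column_value!("2023-08-14 10:01:28+00", :timestamptz)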

def decode_column_value!(val, :uuid) do
{:ok, uuid} = Electric.Utils.validate_uuid(val)
uuid
end
@@ -474,4 +526,18 @@
defp assert_integer_in_range!(int, :int2) when int in @int2_range, do: :ok
defp assert_integer_in_range!(int, :int4) when int in @int4_range, do: :ok
defp assert_integer_in_range!(int, :int8) when int in @int8_range, do: :ok

# Postgres[1] uses BC/AD suffixes to indicate whether the date is in the Common Era or precedes it. Postgres assumes year
# 0 did not exist, so in its worldview '0001-12-31 BC' is immediately followed by '0001-01-01'.
#
# In SQLite[2], the builtin functions for working with dates and times only work for dates between '0001-01-01 00:00:00'
# and '9999-12-31 23:59:59'.
#
# To be conservative in our validations and not let invalid values slip through by accident, we're limiting the range
# of supported dates to start on '0001-01-01' and end on '9999-12-31'. This applies to :date, :timestamp, and
# :timestamptz types.
#
# [1]: https://www.postgresql.org/docs/current/datatype-datetime.html
# [2]: https://www.sqlite.org/lang_datefunc.html
defp assert_year_in_range(year) when year in 1..9999, do: :ok
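
# A quick sketch of the boundary behavior of the guard above: years 1 through 9999
# match, anything outside raises FunctionClauseError.
#
#     assert_year_in_range(1)     #=> :ok
#     assert_year_in_range(9999)  #=> :ok
#     assert_year_in_range(0)     #=> ** (FunctionClauseError)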
end
7 changes: 3 additions & 4 deletions components/electric/lib/satellite/protocol_helpers.ex
@@ -55,15 +55,14 @@ defmodule Satellite.ProtocolHelpers do

def insert(table, data) when is_map(data) do
schema = schema(table)
columns = Enum.map(schema.columns, & &1.name)

columns = schema.columns
%SatOpInsert{relation_id: schema.oid, row_data: Serialization.map_to_row(data, columns)}
end

def update(table, pk, old_data, new_data, tags \\ [])
when is_list(tags) and is_map(pk) and is_map(old_data) and is_map(new_data) do
schema = schema(table)
columns = Enum.map(schema.columns, & &1.name)
columns = schema.columns

%SatOpUpdate{
relation_id: schema.oid,
@@ -75,7 +74,7 @@

def delete(table, old_data, tags \\ []) when is_list(tags) and is_map(old_data) do
schema = schema(table)
columns = Enum.map(schema.columns, & &1.name)
columns = schema.columns

%SatOpDelete{
relation_id: schema.oid,
8 changes: 5 additions & 3 deletions components/electric/test/electric/postgres/extension_test.exs
@@ -437,7 +437,9 @@ defmodule Electric.Postgres.ExtensionTest do
num8a INT8,
num8b BIGINT,
real8a FLOAT8,
real8b DOUBLE PRECISION
real8b DOUBLE PRECISION,
ts TIMESTAMP,
tstz TIMESTAMPTZ
);
CALL electric.electrify('public.t1');
""")
@@ -454,7 +456,7 @@
c1 CHARACTER,
c2 CHARACTER(11),
c3 VARCHAR(11),
created_at TIMESTAMP
created_at TIMETZ
);
CALL electric.electrify('public.t1');
""")
@@ -466,7 +468,7 @@
"c1" character(1)
"c2" character(11)
"c3" character varying(11)
"created_at" timestamp without time zone
"created_at" time with time zone
"""
|> String.trim()
end