From 3c471937b69760a44c7290de9eaea7f5012141de Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 14 Sep 2023 14:57:29 +0300 Subject: [PATCH] feat(electric): Add support for timestamp and timestamptz column types (#330) Closes VAX-818. --- .changeset/neat-laws-wonder.md | 5 + clients/typescript/src/satellite/client.ts | 2 + clients/typescript/src/util/common.ts | 10 +- .../lib/electric/satellite/serialization.ex | 154 ++++++++---- .../lib/satellite/protocol_helpers.ex | 7 +- .../test/electric/postgres/extension_test.exs | 8 +- .../electric/satellite/serialization_test.exs | 223 ++++++++++++------ .../electric/satellite/ws_server_test.exs | 4 +- .../satellite/ws_validations_test.exs | 72 +++++- e2e/satellite_client/src/client.ts | 32 ++- ....13_node_satellite_can_sync_timestamps.lux | 85 +++++++ e2e/tests/_satellite_macros.luxinc | 9 + 12 files changed, 461 insertions(+), 150 deletions(-) create mode 100644 .changeset/neat-laws-wonder.md create mode 100644 e2e/tests/03.13_node_satellite_can_sync_timestamps.lux diff --git a/.changeset/neat-laws-wonder.md b/.changeset/neat-laws-wonder.md new file mode 100644 index 0000000000..cbfc1a18cf --- /dev/null +++ b/.changeset/neat-laws-wonder.md @@ -0,0 +1,5 @@ +--- +"@core/electric": minor +--- + +Implement support for electrifying and syncing tables that have columns types timestamp and timestamptz diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 9c2186f5da..ceb87e711e 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -1047,6 +1047,8 @@ function deserializeColumnData( switch (columnType) { case 'CHAR': case 'TEXT': + case 'TIMESTAMP': + case 'TIMESTAMPTZ': case 'UUID': case 'VARCHAR': return typeDecoder.text(column) diff --git a/clients/typescript/src/util/common.ts b/clients/typescript/src/util/common.ts index f0d61dbc22..7a7add51b4 100644 --- a/clients/typescript/src/util/common.ts +++ b/clients/typescript/src/util/common.ts @@ -19,7 +19,7 @@ setGlobalUUID( export const typeDecoder = { number: bytesToNumber, - text: (bytes: Uint8Array) => new TextDecoder().decode(bytes), + text: bytesToString, } export const typeEncoder = { @@ -47,14 +47,18 @@ export function numberToBytes(i: number) { ) } -export function bytesToNumber(bs: Uint8Array) { +export function bytesToNumber(bytes: Uint8Array) { let n = 0 - for (const byte of bs.values()) { + for (const byte of bytes.values()) { n = (n << 8) | byte } return n } +export function bytesToString(bytes: Uint8Array) { + return new TextDecoder().decode(bytes) +} + export function uuid() { return (globalThis as any).uuid() } diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index 19d3e79531..d4e62cdd20 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -14,6 +14,8 @@ defmodule Electric.Satellite.Serialization do import Electric.Postgres.Extension, only: [is_migration_relation: 1, is_ddl_relation: 1, is_extension_relation: 1] + import Bitwise + require Logger @type relation_mapping() :: @@ -25,6 +27,7 @@ defmodule Electric.Satellite.Serialization do int2 int4 int8 float8 text + timestamp timestamptz varchar ]a end @@ -123,8 +126,7 @@ defmodule Electric.Satellite.Serialization do known_relations = Enum.reduce(add_relations, state.known_relations, fn relation, known -> - {_relation_id, _column_names, known} = load_new_relation(relation, known) - + {_relation_id, _columns, known} = load_new_relation(relation, known) known end) @@ -160,8 +162,8 @@ defmodule Electric.Satellite.Serialization do {rel_id, rel_cols, new_relations, known_relations} = case fetch_relation_id(relation, known_relations) do - {:new, {relation_id, columns, known}} -> - {relation_id, columns, [relation | new_relations], known} + {:new, {relation_id, columns, known_relations}} -> + {relation_id, columns, [relation | new_relations], known_relations} {:existing, {relation_id, columns}} -> {relation_id, columns, new_relations, known_relations} @@ -221,39 +223,70 @@ defmodule Electric.Satellite.Serialization do %SatTransOp{op: {:delete, op_delete}} end - @spec map_to_row(%{String.t() => binary()} | nil, [String.t()]) :: %SatOpRow{} - def map_to_row(nil, _), do: nil + @spec map_to_row(%{String.t() => binary()} | nil, [map], Keyword.t()) :: %SatOpRow{} + def map_to_row(data, cols, opts \\ []) - def map_to_row(data, rel_cols) when is_list(rel_cols) and is_map(data) do - bitmask = [] - values = [] + def map_to_row(data, cols, opts) when is_list(cols) and is_map(data) do + encode_value_fn = + if opts[:skip_value_encoding?] do + fn val, _type -> val end + else + &encode_column_value/2 + end - {num_columns, bitmask, values} = - Enum.reduce(rel_cols, {0, bitmask, values}, fn column_name, {num, bitmask0, values0} -> + {values, {bitmask, num_cols}} = + Enum.map_reduce(cols, {0, 0}, fn col, {bitmask, num_cols} -> # FIXME: This is inefficient, data should be stored in order, so that we # do not have to do lookup here, but filter columns based on the schema instead - case Map.get(data, column_name, nil) do + case Map.get(data, col.name) do nil -> - {num + 1, [1 | bitmask0], [<<>> | values0]} + {"", {bor(bitmask <<< 1, 1), num_cols + 1}} - value when is_binary(value) -> - {num + 1, [0 | bitmask0], [value | values0]} + val when is_binary(val) -> + {encode_value_fn.(val, col.type), {bitmask <<< 1, num_cols + 1}} end end) - bitmask = - case rem(num_columns, 8) do - 0 -> bitmask - n -> :lists.duplicate(8 - n, 0) ++ bitmask - end - - bitmask = for i <- Enum.reverse(bitmask), do: <>, into: <<>> - - %SatOpRow{nulls_bitmask: bitmask, values: Enum.reverse(values)} + %SatOpRow{nulls_bitmask: encode_nulls_bitmask(bitmask, num_cols), values: values} + end + + # Values of type `timestamp` are coming over Postgres' logical replication stream in the following form: + # + # 2023-08-14 14:01:28.848242 + # + # We don't need to do conversion on those values before passing them on to Satellite clients, so we let the catch-all + # function clause handle those. Values of type `timestamptz`, however, are encoded as follows: + # + # 2023-08-14 10:01:28.848242+00 + # + # This is not valid syntax for SQLite's builtin datetime functions and we would like to avoid letting the Satellite + # protocol propagate Postgres' data formatting quirks to clients. So a minor conversion step is done here to replace + # `+00` with `Z` so that the whole string becomes conformant with ISO-8601. + # + # NOTE: We're ensuring the time zone offset is always `+00` by setting the `timezone` parameter to `'UTC'` before + # starting the replication stream. + defp encode_column_value(val, :timestamptz) do + {:ok, dt, 0} = DateTime.from_iso8601(val) + DateTime.to_string(dt) + end + + # No-op encoding for the rest of supported types + defp encode_column_value(val, _type), do: val + + defp encode_nulls_bitmask(bitmask, num_cols) do + case rem(num_cols, 8) do + 0 -> + <> + + n -> + extra_bits = 8 - n + bitmask = bitmask <<< extra_bits + <> + end end def fetch_relation_id(relation, known_relations) do - case Map.get(known_relations, relation, nil) do + case Map.get(known_relations, relation) do nil -> {:new, load_new_relation(relation, known_relations)} @@ -264,9 +297,7 @@ defmodule Electric.Satellite.Serialization do defp load_new_relation(relation, known_relations) do %{oid: relation_id, columns: columns} = fetch_relation(relation) - column_names = for %{name: column_name} <- columns, do: column_name - - {relation_id, column_names, Map.put(known_relations, relation, {relation_id, column_names})} + {relation_id, columns, Map.put(known_relations, relation, {relation_id, columns})} end defp fetch_relation(relation) do @@ -368,7 +399,7 @@ defmodule Electric.Satellite.Serialization do end defp op_to_change(%SatOpInsert{row_data: row_data, tags: tags}, columns) do - %NewRecord{record: decode_record(row_data, columns), tags: tags} + %NewRecord{record: decode_record!(row_data, columns), tags: tags} end defp op_to_change( @@ -376,7 +407,7 @@ defmodule Electric.Satellite.Serialization do columns ) do %UpdatedRecord{ - record: decode_record(row_data, columns, :allow_nulls), + record: decode_record!(row_data, columns, :allow_nulls), old_record: nil, tags: tags } @@ -387,8 +418,8 @@ defmodule Electric.Satellite.Serialization do columns ) do %UpdatedRecord{ - record: decode_record(row_data, columns), - old_record: decode_record(old_row_data, columns), + record: decode_record!(row_data, columns), + old_record: decode_record!(old_row_data, columns), tags: tags } end @@ -399,22 +430,22 @@ defmodule Electric.Satellite.Serialization do defp op_to_change(%SatOpDelete{old_row_data: old_row_data, tags: tags}, columns) do %DeletedRecord{ - old_record: decode_record(old_row_data, columns), + old_record: decode_record!(old_row_data, columns), tags: tags } end - @spec decode_record(%SatOpRow{}, [String.t()], :allow_nulls | nil) :: + @spec decode_record!(%SatOpRow{}, [String.t()], :allow_nulls | nil) :: %{String.t() => nil | String.t()} | nil - def decode_record(row, columns) do - decode_record(row, columns, nil) + def decode_record!(row, columns) do + decode_record!(row, columns, nil) end - defp decode_record(nil, _columns, _opt) do + defp decode_record!(nil, _columns, _opt) do raise "protocol violation, empty row" end - defp decode_record(%SatOpRow{nulls_bitmask: bitmask, values: values}, columns, opt) do + defp decode_record!(%SatOpRow{nulls_bitmask: bitmask, values: values}, columns, opt) do decode_values(values, bitmask, columns, opt) |> Map.new() end @@ -424,7 +455,7 @@ defmodule Electric.Satellite.Serialization do defp decode_values([val | values], <<0::1, bitmask::bits>>, [col | columns], opt) when is_binary(val) do [ - {col.name, decode_column_value(val, col.type)} + {col.name, decode_column_value!(val, col.type)} | decode_values(values, bitmask, columns, opt) ] end @@ -442,13 +473,13 @@ defmodule Electric.Satellite.Serialization do Given a column value received from a Satellite client, transcode it into the format that can be fed into Postgres' logical replication stream (aka "server-native format"). """ - @spec decode_column_value(binary, atom) :: binary + @spec decode_column_value!(binary, atom) :: binary - def decode_column_value(val, type) when type in [:bytea, :text, :varchar] do + def decode_column_value!(val, type) when type in [:bytea, :text, :varchar] do val end - def decode_column_value(val, type) when type in [:int2, :int4, :int8] do + def decode_column_value!(val, type) when type in [:int2, :int4, :int8] do :ok = val |> String.to_integer() @@ -457,12 +488,33 @@ defmodule Electric.Satellite.Serialization do val end - def decode_column_value(val, :float8) do + def decode_column_value!(val, :float8) do _ = String.to_float(val) val end - def decode_column_value(val, :uuid) do + def decode_column_value!(val, :timestamp) do + # NaiveDateTime silently discards time zone offset if it is present in the string. But we want to reject such strings + # because values of type `timestamp` must not have an offset. + {:error, :missing_offset} = DateTime.from_iso8601(val) + + dt = NaiveDateTime.from_iso8601!(val) + :ok = assert_year_in_range(dt.year) + + val + end + + def decode_column_value!(val, :timestamptz) do + # The offset of datetimes coming over the Satellite protocol MUST be 0. + true = String.ends_with?(val, "Z") + + {:ok, dt, 0} = DateTime.from_iso8601(val) + :ok = assert_year_in_range(dt.year) + + val + end + + def decode_column_value!(val, :uuid) do {:ok, uuid} = Electric.Utils.validate_uuid(val) uuid end @@ -474,4 +526,18 @@ defmodule Electric.Satellite.Serialization do defp assert_integer_in_range!(int, :int2) when int in @int2_range, do: :ok defp assert_integer_in_range!(int, :int4) when int in @int4_range, do: :ok defp assert_integer_in_range!(int, :int8) when int in @int8_range, do: :ok + + # Postgres[1] uses BC/AD suffixes to indicate whether the date is in the Common Era or precedes it. Postgres assumes year + # 0 did not exist, so in its worldview '0001-12-31 BC' is immediately followed by '0001-01-01'. + # + # In SQLite[2], the builtin functions for working with dates and times only work for dates between '0001-01-01 00:00:00' + # and '9999-12-31 23:59:59'. + # + # To be conservative in our validations and not let invalid values slip through by accident, we're limiting the range + # of supported dates to start on '0001-01-01` and end on '9999-12-31'. This applies to :date, :timestamp, and + # :timestamptz types. + # + # [1]: https://www.postgresql.org/docs/current/datatype-datetime.html + # [2]: https://www.sqlite.org/lang_datefunc.html + defp assert_year_in_range(year) when year in 1..9999, do: :ok end diff --git a/components/electric/lib/satellite/protocol_helpers.ex b/components/electric/lib/satellite/protocol_helpers.ex index 0783da6aca..f860738b01 100644 --- a/components/electric/lib/satellite/protocol_helpers.ex +++ b/components/electric/lib/satellite/protocol_helpers.ex @@ -55,15 +55,14 @@ defmodule Satellite.ProtocolHelpers do def insert(table, data) when is_map(data) do schema = schema(table) - columns = Enum.map(schema.columns, & &1.name) - + columns = schema.columns %SatOpInsert{relation_id: schema.oid, row_data: Serialization.map_to_row(data, columns)} end def update(table, pk, old_data, new_data, tags \\ []) when is_list(tags) and is_map(pk) and is_map(old_data) and is_map(new_data) do schema = schema(table) - columns = Enum.map(schema.columns, & &1.name) + columns = schema.columns %SatOpUpdate{ relation_id: schema.oid, @@ -75,7 +74,7 @@ defmodule Satellite.ProtocolHelpers do def delete(table, old_data, tags \\ []) when is_list(tags) and is_map(old_data) do schema = schema(table) - columns = Enum.map(schema.columns, & &1.name) + columns = schema.columns %SatOpDelete{ relation_id: schema.oid, diff --git a/components/electric/test/electric/postgres/extension_test.exs b/components/electric/test/electric/postgres/extension_test.exs index f6ea4f302c..7518d286e3 100644 --- a/components/electric/test/electric/postgres/extension_test.exs +++ b/components/electric/test/electric/postgres/extension_test.exs @@ -437,7 +437,9 @@ defmodule Electric.Postgres.ExtensionTest do num8a INT8, num8b BIGINT, real8a FLOAT8, - real8b DOUBLE PRECISION + real8b DOUBLE PRECISION, + ts TIMESTAMP, + tstz TIMESTAMPTZ ); CALL electric.electrify('public.t1'); """) @@ -454,7 +456,7 @@ defmodule Electric.Postgres.ExtensionTest do c1 CHARACTER, c2 CHARACTER(11), c3 VARCHAR(11), - created_at TIMESTAMP + created_at TIMETZ ); CALL electric.electrify('public.t1'); """) @@ -466,7 +468,7 @@ defmodule Electric.Postgres.ExtensionTest do "c1" character(1) "c2" character(11) "c3" character varying(11) - "created_at" timestamp without time zone + "created_at" time with time zone """ |> String.trim() end diff --git a/components/electric/test/electric/satellite/serialization_test.exs b/components/electric/test/electric/satellite/serialization_test.exs index 19ea3564d8..76c02cc0e3 100644 --- a/components/electric/test/electric/satellite/serialization_test.exs +++ b/components/electric/test/electric/satellite/serialization_test.exs @@ -1,94 +1,167 @@ defmodule Electric.Satellite.SerializationTest do - alias Electric.Satellite.Serialization - - use Electric.Satellite.Protobuf use ExUnit.Case, async: true - alias Electric.Replication.Changes.Transaction + use Electric.Satellite.Protobuf alias Electric.Postgres.{Lsn, Schema, Extension.SchemaCache} + alias Electric.Replication.Changes.Transaction + alias Electric.Satellite.Serialization - test "test row serialization" do - data = %{"not_null" => <<"4">>, "null" => nil, "not_present" => <<"some other value">>} - columns = ["null", "this_columns_is_empty", "not_null"] + describe "map_to_row" do + test "encodes a map into a SatOpRow struct" do + uuid = Electric.Utils.uuid4() + + data = %{ + "not_null" => "4", + "null" => nil, + "not_present" => "some other value", + "int" => "13", + "var" => "...", + "real" => "-3.14", + "id" => uuid + } - serialized_data = Serialization.map_to_row(data, columns) + columns = [ + %{name: "null", type: :text}, + %{name: "this_columns_is_empty", type: :text}, + %{name: "not_null", type: :text}, + %{name: "id", type: :uuid}, + %{name: "int", type: :int4}, + %{name: "var", type: :varchar}, + %{name: "real", type: :float8} + ] + + assert %SatOpRow{ + values: ["", "", "4", uuid, "13", "...", "-3.14"], + nulls_bitmask: <<0b11000000>> + } == Serialization.map_to_row(data, columns) + end - expected = %SatOpRow{ - nulls_bitmask: <<1::1, 1::1, 0::1, 0::5>>, - values: [<<>>, <<>>, <<"4">>] - } + test "converts the +00 offset to Z in timestamptz values" do + data = %{ + "t1" => "2023-08-14 14:01:28.848242+00", + "t2" => "2023-08-14 10:01:28+00", + "t3" => "2023-08-13 18:30:00.123+00" + } - assert serialized_data == expected + columns = [ + %{name: "t1", type: :timestamptz}, + %{name: "t2", type: :timestamptz}, + %{name: "t3", type: :timestamptz} + ] + + assert %SatOpRow{ + values: [ + "2023-08-14 14:01:28.848242Z", + "2023-08-14 10:01:28Z", + "2023-08-13 18:30:00.123Z" + ], + nulls_bitmask: <<0>> + } == Serialization.map_to_row(data, columns) + end end - test "test row deserialization" do - deserialized_data = - Serialization.decode_record( - %SatOpRow{nulls_bitmask: <<1::1, 1::1, 0::1, 0::5>>, values: [<<>>, <<>>, <<"4">>]}, - [ - %{name: "null", type: :text}, - %{name: "this_columns_is_empty", type: :text}, - %{name: "not_null", type: :int4} + describe "decode_record!" do + test "decodes a SatOpRow struct into a map" do + row = %SatOpRow{ + nulls_bitmask: <<0b00100001>>, + values: [ + "256", + "hello", + "", + "5.4", + "-1.0e124", + "2023-08-15 17:20:31", + "2023-08-15 17:20:31Z", + "" ] - ) + } - expected = %{"not_null" => <<"4">>, "null" => nil, "this_columns_is_empty" => nil} + columns = [ + %{name: "int", type: :int2}, + %{name: "text", type: :text}, + %{name: "null", type: :bytea}, + %{name: "real1", type: :float8}, + %{name: "real2", type: :float8}, + %{name: "t", type: :timestamp}, + %{name: "tz", type: :timestamptz}, + %{name: "x", type: :float4, nullable?: true} + ] + + assert %{ + "int" => "256", + "text" => "hello", + "null" => nil, + "real1" => "5.4", + "real2" => "-1.0e124", + "t" => "2023-08-15 17:20:31", + "tz" => "2023-08-15 17:20:31Z", + "x" => nil + } == Serialization.decode_record!(row, columns) + end - assert deserialized_data == expected - end + test "raises when the row contains an invalid value for its type" do + test_data = [ + {"1.0", :int4}, + {"33", :float8}, + {"1000000", :int2}, + {"-1000000000000000", :int4}, + {"...", :uuid}, + {"00000000-0000-0000-0000-00000000000g", :uuid}, + {"00000000-0000-0000-0000_000000000001", :uuid}, + {"20230815", :timestamp}, + {"0000-08-15 23:00:00", :timestamp}, + {"-1000-08-15 23:00:00", :timestamp}, + {"2023-08-15 11:12:13+04:00", :timestamp}, + {"2023-08-15 11:12:13Z", :timestamp}, + {"2023-08-15 11:12:13+01", :timestamptz}, + {"2023-08-15 11:12:13+99:98", :timestamptz}, + {"2023-08-15 11:12:13+00", :timestamptz}, + {"0000-08-15 23:00:00Z", :timestamptz}, + {"-2000-08-15 23:00:00Z", :timestamptz} + ] + + Enum.each(test_data, fn {val, type} -> + row = %SatOpRow{nulls_bitmask: <<0>>, values: [val]} + columns = [%{name: "val", type: type}] + + try do + Serialization.decode_record!(row, columns) + rescue + _ -> :ok + else + val -> flunk("Expected decode_record!() to raise but it returned #{inspect(val)}") + end + end) + end - test "test row deserialization with long bitmask" do - mask = <<0b1101000010000000::16>> - - deserialized_data = - Serialization.decode_record( - %SatOpRow{nulls_bitmask: mask, values: Enum.map(0..8, fn _ -> "" end)}, - Enum.map(0..8, &%{name: "bit#{&1}", type: :text}) - ) - - expected = %{ - "bit0" => nil, - "bit1" => nil, - "bit2" => "", - "bit3" => nil, - "bit4" => "", - "bit5" => "", - "bit6" => "", - "bit7" => "", - "bit8" => nil - } - - assert deserialized_data == expected - end + test "raises when the row contains null values for non-null columns" do + row = %SatOpRow{nulls_bitmask: <<0b10000000>>, values: [""]} + columns = [%{name: "val", type: :timestamp, nullable?: false}] - test "test row serialization 2" do - data = %{ - "content" => "hello from pg_1", - "content_text_null" => nil, - "content_text_null_default" => "", - "id" => "f989b58b-980d-4d3c-b178-adb6ae8222f1", - "intvalue_null" => nil, - "intvalue_null_default" => "10" - } - - columns = [ - "id", - "content", - "content_text_null", - "content_text_null_default", - "intvalue_null", - "intvalue_null_default" - ] - - serialized_data = Serialization.map_to_row(data, columns) - - expected = %SatOpRow{ - nulls_bitmask: <<0::1, 0::1, 1::1, 0::1, 1::1, 0::3>>, - values: ["f989b58b-980d-4d3c-b178-adb6ae8222f1", "hello from pg_1", "", "", "", "10"] - } - - assert serialized_data == expected + assert_raise RuntimeError, "protocol violation, null value for a not null column", fn -> + Serialization.decode_record!(row, columns) + end + end + + # This is a regression test + test "decodes a SatOpRow struct with a long bitmask" do + bitmask = <<0b1101000010000000::16>> + row = %SatOpRow{nulls_bitmask: bitmask, values: Enum.map(0..8, fn _ -> "" end)} + columns = for i <- 0..8, do: %{name: "bit#{i}", type: :text} + + assert %{ + "bit0" => nil, + "bit1" => nil, + "bit2" => "", + "bit3" => nil, + "bit4" => "", + "bit5" => "", + "bit6" => "", + "bit7" => "", + "bit8" => nil + } == Serialization.decode_record!(row, columns) + end end describe "relations" do diff --git a/components/electric/test/electric/satellite/ws_server_test.exs b/components/electric/test/electric/satellite/ws_server_test.exs index ebefc1ff41..89a8ae462b 100644 --- a/components/electric/test/electric/satellite/ws_server_test.exs +++ b/components/electric/test/electric/satellite/ws_server_test.exs @@ -527,8 +527,8 @@ defmodule Electric.Satellite.WebsocketServerTest do map = %{"satellite-column-1" => a, "satellite-column-2" => b} Electric.Satellite.Serialization.map_to_row(map, [ - "satellite-column-1", - "satellite-column-2" + %{name: "satellite-column-1", type: :text}, + %{name: "satellite-column-2", type: :text} ]) end diff --git a/components/electric/test/electric/satellite/ws_validations_test.exs b/components/electric/test/electric/satellite/ws_validations_test.exs index 11423a5088..894172cbaf 100644 --- a/components/electric/test/electric/satellite/ws_validations_test.exs +++ b/components/electric/test/electric/satellite/ws_validations_test.exs @@ -10,7 +10,6 @@ defmodule Electric.Satellite.WsValidationsTest do alias Electric.Satellite.Auth alias Electric.Satellite.Serialization - alias Electric.Replication.Changes.{Transaction, NewRecord} @table_name "foo" @receive_timeout 500 @@ -110,7 +109,7 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} + refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout end) invalid_records = [ @@ -162,7 +161,7 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} + refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout end) invalid_records = [ @@ -207,7 +206,7 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} + refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout end) invalid_records = [ @@ -227,6 +226,52 @@ defmodule Electric.Satellite.WsValidationsTest do end) end + test "validates timestamp values", ctx do + vsn = "2023072505" + + :ok = + migrate( + ctx.db, + vsn, + "public.foo", + "CREATE TABLE public.foo (id TEXT PRIMARY KEY, t1 timestamp, t2 timestamptz)" + ) + + valid_records = [ + %{"id" => "1", "t1" => "2023-08-07 21:28:35.111", "t2" => "2023-08-07 21:28:35.421Z"}, + %{"id" => "2", "t2" => "2023-08-07 00:00:00Z"} + ] + + within_replication_context(ctx, vsn, fn conn -> + Enum.each(valid_records, fn record -> + tx_op_log = serialize_trans(record) + MockClient.send_data(conn, tx_op_log) + end) + + refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout + end) + + invalid_records = [ + %{"id" => "10", "t1" => "now"}, + %{"id" => "11", "t1" => "12345678901234567890"}, + %{"id" => "12", "t1" => "20230832T000000"}, + %{"id" => "13", "t1" => "2023-08-07 21:28:35+03:00"}, + %{"id" => "13", "t2" => "2023-08-07 21:28:35+03:00"}, + %{"id" => "14", "t2" => ""}, + %{"id" => "15", "t2" => "+"}, + %{"id" => "16", "t2" => "2023-08-07 24:28:35"}, + %{"id" => "16", "t2" => "2023-08-07 24:28:35+00"} + ] + + Enum.each(invalid_records, fn record -> + within_replication_context(ctx, vsn, fn conn -> + tx_op_log = serialize_trans(record) + MockClient.send_data(conn, tx_op_log) + assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout + end) + end) + end + defp within_replication_context(ctx, vsn, expectation_fn) do with_connect(ctx.conn_opts, fn conn -> # Replication start ceremony @@ -256,12 +301,19 @@ defmodule Electric.Satellite.WsValidationsTest do end defp serialize_trans(record) do - {[op_log], _relations, _relation_mappings} = - %Transaction{ - changes: [%NewRecord{relation: {"public", @table_name}, record: record, tags: []}], - commit_timestamp: DateTime.utc_now() - } - |> Serialization.serialize_trans(1, %{}) + %{oid: relation_id, columns: columns} = + Electric.Postgres.Extension.SchemaCache.Global.relation!({"public", @table_name}) + + row_data = Serialization.map_to_row(record, columns, skip_value_encoding?: true) + commit_timestamp = DateTime.to_unix(DateTime.utc_now(), :millisecond) + + op_log = %SatOpLog{ + ops: [ + %SatTransOp{op: {:begin, %SatOpBegin{lsn: "1", commit_timestamp: commit_timestamp}}}, + %SatTransOp{op: {:insert, %SatOpInsert{relation_id: relation_id, row_data: row_data}}}, + %SatTransOp{op: {:commit, %SatOpCommit{}}} + ] + } op_log end diff --git a/e2e/satellite_client/src/client.ts b/e2e/satellite_client/src/client.ts index fa5f5cf900..f2ae2f388f 100644 --- a/e2e/satellite_client/src/client.ts +++ b/e2e/satellite_client/src/client.ts @@ -10,7 +10,10 @@ import { globalRegistry } from 'electric-sql/satellite' setLogLevel('DEBUG') +let dbName: string + export const make_db = (name: string): any => { + dbName = name return new Database(name) } @@ -51,13 +54,14 @@ export const set_subscribers = (db: Electric) => { }) } -export const syncTable = async (electric: Electric, table: 'items' | 'other_items') => { - if (table === 'items') { - const { synced } = await electric.db.items.sync() - return await synced - } else if (table === 'other_items') { +export const syncTable = async (electric: Electric, table: string) => { + if (table === 'other_items') { const { synced } = await electric.db.other_items.sync({ include: { items: true } }) return await synced + } else { + const satellite = globalRegistry.satellites[dbName] + const { synced } = await satellite.subscribe([{selects: [{tablename: table}]}]) + return await synced } } @@ -69,6 +73,10 @@ export const get_columns = async (electric: Electric, table: string) => { return electric.db.raw({ sql: `SELECT * FROM pragma_table_info(?);`, args: [table] }) } +export const get_rows = async (electric: Electric, table: string) => { + return await electric.db.raw({sql: `SELECT * FROM ${table};`}) +} + export const get_items = async (electric: Electric) => { return await electric.db.items.findMany({}) } @@ -99,14 +107,20 @@ export const insert_item = async (electric: Electric, keys: [string]) => { } export const insert_extended_item = async (electric: Electric, values: { string: string }) => { - const fixedColumns = { 'id': uuidv4 } - const columns = Object.keys(fixedColumns).concat(Object.keys(values)) + await insert_extended_into(electric, "items", values) +} + +export const insert_extended_into = async (electric: Electric, table: string, values: { string: string }) => { + if (!values['id']) { + values['id'] = uuidv4() + } + const columns = Object.keys(values) const columnNames = columns.join(", ") const placeHolders = Array(columns.length).fill("?") - const args = Object.values(fixedColumns).map(f => f()).concat(Object.values(values)) + const args = Object.values(values) await electric.db.raw({ - sql: `INSERT INTO items (${columnNames}) VALUES (${placeHolders}) RETURNING *;`, + sql: `INSERT INTO ${table} (${columnNames}) VALUES (${placeHolders}) RETURNING *;`, args: args, }) } diff --git a/e2e/tests/03.13_node_satellite_can_sync_timestamps.lux b/e2e/tests/03.13_node_satellite_can_sync_timestamps.lux new file mode 100644 index 0000000000..a38ed2ff46 --- /dev/null +++ b/e2e/tests/03.13_node_satellite_can_sync_timestamps.lux @@ -0,0 +1,85 @@ +[doc NodeJS Satellite correctly syncs TIMESTAMP and TIMESTAMPTZ values from and to Electric] +[include _shared.luxinc] +[include _satellite_macros.luxinc] + +[invoke setup] + +[shell pg_1] + [local sql= + """ + CREATE TABLE public.timestamps ( + id TEXT PRIMARY KEY DEFAULT uuid_generate_v4(), + created_at TIMESTAMP NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + ); + CALL electric.electrify('public.timestamps'); + """] + [invoke migrate_pg 20230823 $sql] + +[invoke setup_client 1 electric_1 5133] + +[shell satellite_1] + [invoke node_await_table "timestamps"] + [invoke node_sync_table "timestamps"] + +[shell pg_1] + !INSERT INTO public.timestamps (id) VALUES ('00000000-0000-0000-0000-000000000001'); + ??INSERT 0 1 + +[shell satellite_1] + [invoke node_await_get_from_table "timestamps" "00000000-0000-0000-0000-000000000001"] + + !await client.get_rows(db, "timestamps") + ?created_at: '([0-9-]{10} [0-9:]{8}\.[0-9]+)' + [global created_at=$1] + ?updated_at: '([0-9-]{10} [0-9:]{8}\.[0-9]+)Z' + [global updated_at=$1] + +[shell pg_1] + !INSERT INTO public.timestamps (id, created_at, updated_at) VALUES (\ + '00000000-0000-0000-0000-000000000002',\ + '2023-08-23 09:10:11',\ + '2023-08-23 09:10:11.001'); + ??INSERT 0 1 + +[shell satellite_1] + [invoke node_await_get_from_table "timestamps" "00000000-0000-0000-0000-000000000002"] + + !await client.get_rows(db, "timestamps") + ??created_at: '2023-08-23 09:10:11' + ??updated_at: '2023-08-23 09:10:11.001Z' + + [invoke node_await_insert_extended_into "timestamps" "{id: '00000000-0000-0000-0000-000000000003', created_at: '1999-01-02 00:15:54.555', updated_at: '1999-02-01 23:59:59Z'}"] + +[shell pg_1] + [invoke wait-for "SELECT * FROM public.timestamps;" "00000000-0000-0000-0000-000000000003" 10 $psql] + + !SELECT * FROM public.timestamps; + ??00000000-0000-0000-0000-000000000001 | $created_at | $updated_at+00 + ?00000000-0000-0000-0000-000000000002 \| 2023-08-23 09:10:11\s+\| 2023-08-23 09:10:11\.001\+00 + ?00000000-0000-0000-0000-000000000003 \| 1999-01-02 00:15:54\.555\s+\| 1999-02-01 23:59:59\+00 + +# Start a new Satellite client and verify that it receives all timestamps +[invoke setup_client 2 electric_1 5133] + +[shell satellite_2] + [invoke node_await_table "timestamps"] + [invoke node_sync_table "timestamps"] + + [invoke node_await_get_from_table "timestamps" "00000000-0000-0000-0000-000000000003"] + + !await client.get_rows(db, "timestamps") + ??id: '00000000-0000-0000-0000-000000000001' + ??created_at: '$created_at' + ??updated_at: '${updated_at}Z' + + ??id: '00000000-0000-0000-0000-000000000002' + ??created_at: '2023-08-23 09:10:11' + ??updated_at: '2023-08-23 09:10:11.001Z' + + ??id: '00000000-0000-0000-0000-000000000003' + ??created_at: '1999-01-02 00:15:54.555' + ??updated_at: '1999-02-01 23:59:59Z' + +[cleanup] + [invoke teardown] diff --git a/e2e/tests/_satellite_macros.luxinc b/e2e/tests/_satellite_macros.luxinc index 11f0ac1a6f..a57b4fbee3 100644 --- a/e2e/tests/_satellite_macros.luxinc +++ b/e2e/tests/_satellite_macros.luxinc @@ -35,6 +35,10 @@ [invoke wait-for "await client.get_items(db)" "${match}" 10 $node] [endmacro] +[macro node_await_get_from_table table match] + [invoke wait-for "await client.get_rows(db, '${table}')" "${match}" 10 $node] +[endmacro] + [macro node_await_table match] [invoke wait-for "await client.get_tables(db)" "${match}" 10 $node] [endmacro] @@ -57,6 +61,11 @@ ?$node [endmacro] +[macro node_await_insert_extended_into table keys] + !await client.insert_extended_into(db, '${table}', ${keys}) + ?$node +[endmacro] + [macro node_await_get_other match] [invoke wait-for "client.get_other_items(db)" "${match}" 10 $node] [endmacro]