Skip to content

Commit

Permalink
chore(electric): Rewrite the splitting algorithm in Proxy.Injector to…
Browse files Browse the repository at this point in the history
… be more concise (#588)

This is a follow-up to
#587 (comment).
  • Loading branch information
alco authored Oct 30, 2023
1 parent c2d0428 commit 690ddc5
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions components/electric/lib/electric/postgres/proxy/injector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,24 @@ defmodule Electric.Postgres.Proxy.Injector do

@spec client_messages_in([M.t()], State.t()) :: {[M.t()], State.t()}
defp client_messages_in(msgs, state) do
case state do
%{pending_messages: []} -> {msgs, state}
%{pending_messages: pending} -> {pending ++ msgs, %{state | pending_messages: []}}
end
|> sync_messages()
{out, still_pending} =
case state do
%{pending_messages: []} -> msgs
%{pending_messages: pending} -> Stream.concat(pending, msgs)
end
|> sync_messages()

{out, %{state | pending_messages: still_pending}}
end

defp sync_messages({msgs, state}) do
{out, pending} =
Enum.flat_map_reduce(msgs, [], fn %type{} = msg, buf ->
if type in [M.Sync, M.Flush, M.Query] do
{[Enum.reverse([msg | buf])], []}
else
{[], [msg | buf]}
end
end)
# Split the list of messages at the last occurrence of any of M.Sync, M.Flush or M.Query.
defp sync_messages(msgs) do
{rpending, rout} =
msgs
|> Enum.reverse()
|> Enum.split_while(fn %type{} -> type not in [M.Sync, M.Flush, M.Query] end)

{List.flatten(out), %{state | pending_messages: Enum.reverse(pending)}}
{Enum.reverse(rout), Enum.reverse(rpending)}
end

def recv_client({stack, state}, msgs) do
Expand Down

0 comments on commit 690ddc5

Please sign in to comment.