Skip to content

Commit

Permalink
chore: fix flaky topic sharded broadcasting tests (#2066)
Browse files Browse the repository at this point in the history
* chore: fix flaky topic sharded broadcasting tests

* chore: fix failing test
  • Loading branch information
Ziinc authored May 9, 2024
1 parent c6ebb8a commit 9939a34
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 31 deletions.
5 changes: 1 addition & 4 deletions lib/logflare/backends/buffer_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,9 @@ defmodule Logflare.Backends.BufferProducer do
pid = self()

Task.start_link(fn ->
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]
len = GenStage.estimate_buffered_count(pid)
local_buffer = %{Node.self() => %{len: len}}

shard = :erlang.phash2(state.source_token, pool_size)

cluster_buffer = PubSubRates.Cache.get_cluster_buffers(state.source_token)

# maybe broadcast
Expand All @@ -78,7 +75,7 @@ defmodule Logflare.Backends.BufferProducer do

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"buffers:shard-#{shard}",
"buffers",
cluster_broadcast_payload
)

Expand Down
19 changes: 3 additions & 16 deletions lib/logflare/pubsub_rates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,15 @@ defmodule Logflare.PubSubRates do
def subscribe(topics) when is_list(topics), do: Enum.map(topics, &subscribe/1)

def subscribe(topic) when topic in @topics do
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]

for shard <- 1..pool_size do
PubSub.subscribe(Logflare.PubSub, "#{topic}:shard-#{shard}")
end

:ok
PubSub.subscribe(Logflare.PubSub, "#{topic}")
end

@doc """
Global sharded broadcast for a rate-specific message.
"""
@spec global_broadcast_rate({atom(), atom(), term()}) :: :ok
def global_broadcast_rate({msg, source_token, _payload} = data) when msg in @topics do
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]
shard = :erlang.phash2(source_token, pool_size)

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"#{msg}:shard-#{shard}",
data
)
def global_broadcast_rate({msg, _source_token, _payload} = data) when msg in @topics do
Phoenix.PubSub.broadcast(Logflare.PubSub, "#{msg}", data)
end
end
4 changes: 1 addition & 3 deletions lib/logflare/source/rate_counter_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,11 @@ defmodule Logflare.Source.RateCounterServer do
end

def broadcast(%RateCounterServer{} = state) do
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]
shard = :erlang.phash2(state.source_id, pool_size)
local_rates = %{Node.self() => state_to_external(state)}

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"rates:shard-#{shard}",
"rates",
{:rates, state.source_id, local_rates}
)

Expand Down
6 changes: 1 addition & 5 deletions lib/logflare/source/recent_logs_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,15 @@ defmodule Logflare.Source.RecentLogsServer do
defp broadcast_count(
%{source_token: source_token, inserts_since_boot: inserts_since_boot} = state
) do
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]
current_inserts = Source.Data.get_node_inserts(source_token)

if current_inserts > inserts_since_boot do
bq_inserts = Source.Data.get_bq_inserts(source_token)

inserts_payload = %{Node.self() => %{node_inserts: current_inserts, bq_inserts: bq_inserts}}

shard = :erlang.phash2(source_token, pool_size)

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"inserts:shard-#{shard}",
"inserts",
{:inserts, source_token, inserts_payload}
)
end
Expand Down
6 changes: 3 additions & 3 deletions test/logflare/cluster_pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ defmodule Logflare.ClusterPubSubTest do

test "subscribe/1 inserts", %{source: %{token: source_token}} do
PubSubRates.subscribe(:inserts)
PubSubRates.global_broadcast_rate({:inserts, source_token, %{data: "some val"}})

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({:inserts, source_token, %{data: "some val"}})
assert_received {:inserts, ^source_token, %{data: "some val"}}
end)
end

test "subscribe/1 rates", %{source: %{token: source_token}} do
PubSubRates.subscribe(:rates)
PubSubRates.global_broadcast_rate({:rates, source_token, %{data: "some val"}})

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({:rates, source_token, %{data: "some val"}})
assert_received {:rates, ^source_token, %{data: "some val"}}
end)
end

test "subscribe/1 buffers", %{source: %{token: source_token}} do
PubSubRates.subscribe(:buffers)
PubSubRates.global_broadcast_rate({:buffers, source_token, %{data: "some val"}})

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({:buffers, source_token, %{data: "some val"}})
assert_received {:buffers, ^source_token, %{data: "some val"}}
end)
end
Expand Down

0 comments on commit 9939a34

Please sign in to comment.