From 9be572044a23bb63af88a0e13a8b40811254a09d Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 11:20:21 -0700 Subject: [PATCH 1/6] fix: partitions PubSubRates.Rates --- .../ingest_event_queue/broadcast_worker.ex | 2 +- lib/logflare/pubsub_rates.ex | 64 +++++++++++++++---- lib/logflare/pubsub_rates/buffers.ex | 4 +- lib/logflare/pubsub_rates/inserts.ex | 4 +- lib/logflare/pubsub_rates/rates.ex | 21 +++++- lib/logflare/source/rate_counter_server.ex | 6 +- lib/logflare/source/recent_logs_server.ex | 6 +- test/logflare/cluster_pubsub_test.exs | 23 +++---- 8 files changed, 87 insertions(+), 43 deletions(-) diff --git a/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex b/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex index a594e017e..45a0e5ba5 100644 --- a/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex +++ b/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex @@ -65,7 +65,7 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do len = Backends.get_and_cache_local_pending_buffer_len(source_id, backend_id) local_buffer = %{Node.self() => %{len: len}} - PubSubRates.global_broadcast_rate({:buffers, source_id, backend_id, local_buffer}) + PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, local_buffer}) end defp local_broadcast_cluster_length({source_id, backend_id}) do diff --git a/lib/logflare/pubsub_rates.ex b/lib/logflare/pubsub_rates.ex index df193c745..5859fb752 100644 --- a/lib/logflare/pubsub_rates.ex +++ b/lib/logflare/pubsub_rates.ex @@ -4,7 +4,8 @@ defmodule Logflare.PubSubRates do alias Logflare.PubSubRates alias Phoenix.PubSub - @topics [:buffers, :rates, :inserts] + @topics ["buffers", "rates", "inserts"] + @partitions 8 def start_link(init_arg) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) @@ -14,7 +15,13 @@ defmodule Logflare.PubSubRates do def init(_init_arg) do children = [ PubSubRates.Cache, - PubSubRates.Rates, + {PartitionSupervisor, + child_spec: PubSubRates.Rates, + name: PubSubRates.Supervisors, + partitions: partitions(), + with_arguments: fn [opts], partition -> + [Keyword.put(opts, :partition, partition)] + end}, PubSubRates.Buffers, PubSubRates.Inserts ] @@ -28,29 +35,58 @@ defmodule Logflare.PubSubRates do ### Examples iex> subscribe(:all) - iex> subscribe(:buffers) - iex> subscribe(:inserts) - iex> subscribe(:rates) + iex> subscribe("buffers") + iex> subscribe("inserts") + iex> subscribe("rates") """ - @spec subscribe(atom()) :: :ok + @spec subscribe(:all | binary() | maybe_improper_list()) :: + :ok | list() | {:error, {:already_registered, pid()}} + def subscribe(:all), do: subscribe(@topics) + + def subscribe("rates" <> _partition = topic), + do: PubSub.subscribe(Logflare.PubSub, topic) + def subscribe(topics) when is_list(topics), do: Enum.map(topics, &subscribe/1) def subscribe(topic) when topic in @topics do - PubSub.subscribe(Logflare.PubSub, "#{topic}") + PubSub.subscribe(Logflare.PubSub, topic) end @doc """ Global sharded broadcast for a rate-specific message. """ - @spec global_broadcast_rate({atom(), non_neg_integer(), nil | non_neg_integer(), term()}) :: :ok - @spec global_broadcast_rate({atom(), atom(), term()}) :: :ok - def global_broadcast_rate({msg, source_id, _backend_id, _payload} = data) - when msg in @topics and is_integer(source_id) do - Phoenix.PubSub.broadcast(Logflare.PubSub, "#{msg}", data) + @spec global_broadcast_rate( + {binary(), any(), any()} + | {binary(), integer(), any(), any()} + ) :: :ok | {:error, any()} + + def global_broadcast_rate({topic, source_id, _backend_id, _payload} = data) + when topic in @topics and is_integer(source_id) do + Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data) + end + + def global_broadcast_rate({"rates" = topic, source_token, _payload} = data) + when topic in @topics do + partitioned_topic = partitioned_topic(topic, source_token) + + Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data) end - def global_broadcast_rate({msg, _source_token, _payload} = data) when msg in @topics do - Phoenix.PubSub.broadcast(Logflare.PubSub, "#{msg}", data) + def global_broadcast_rate({topic, _source_token, _payload} = data) when topic in @topics do + Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data) + end + + @doc """ + The number of partitions for a paritioned child. + """ + @spec partitions() :: integer() + def partitions(), do: @partitions + + @doc """ + Partitions a topic for a source_token. + """ + def partitioned_topic(topic, source_token) do + topic <> (:erlang.phash2(source_token, partitions()) |> Integer.to_string()) end end diff --git a/lib/logflare/pubsub_rates/buffers.ex b/lib/logflare/pubsub_rates/buffers.ex index a8f67118f..a33d58c33 100644 --- a/lib/logflare/pubsub_rates/buffers.ex +++ b/lib/logflare/pubsub_rates/buffers.ex @@ -15,12 +15,12 @@ defmodule Logflare.PubSubRates.Buffers do @impl GenServer def init(_state) do - PubSubRates.subscribe(:buffers) + PubSubRates.subscribe("buffers") {:ok, %{}} end @impl GenServer - def handle_info({:buffers, source_id, backend_id, buffers}, state) + def handle_info({"buffers", source_id, backend_id, buffers}, state) when is_integer(source_id) and is_map(buffers) do Cache.cache_buffers(source_id, backend_id, buffers) {:noreply, state} diff --git a/lib/logflare/pubsub_rates/inserts.ex b/lib/logflare/pubsub_rates/inserts.ex index 03f8b931e..40598c19e 100644 --- a/lib/logflare/pubsub_rates/inserts.ex +++ b/lib/logflare/pubsub_rates/inserts.ex @@ -16,11 +16,11 @@ defmodule Logflare.PubSubRates.Inserts do end def init(state) do - PubSubRates.subscribe(:inserts) + PubSubRates.subscribe("inserts") {:ok, state} end - def handle_info({:inserts, source_token, inserts}, state) do + def handle_info({"inserts", source_token, inserts}, state) do Cache.cache_inserts(source_token, inserts) {:noreply, state} end diff --git a/lib/logflare/pubsub_rates/rates.ex b/lib/logflare/pubsub_rates/rates.ex index 3091eee11..872f3c831 100644 --- a/lib/logflare/pubsub_rates/rates.ex +++ b/lib/logflare/pubsub_rates/rates.ex @@ -7,16 +7,31 @@ defmodule Logflare.PubSubRates.Rates do use GenServer + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + type: :worker, + restart: :permanent, + shutdown: 500 + } + end + def start_link(args \\ []) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) + partition = Keyword.get(args, :partition, 0) + name = :"#{__MODULE__}#{partition}" + + GenServer.start_link(__MODULE__, args, name: name) end def init(state) do - PubSubRates.subscribe(:rates) + partition = Keyword.get(state, :partition, 0) + topic = "rates#{partition}" + PubSubRates.subscribe(topic) {:ok, state} end - def handle_info({:rates, source_token, rates}, state) do + def handle_info({"rates", source_token, rates}, state) do Cache.cache_rates(source_token, rates) {:noreply, state} end diff --git a/lib/logflare/source/rate_counter_server.ex b/lib/logflare/source/rate_counter_server.ex index fff8a836b..45f6e4dea 100644 --- a/lib/logflare/source/rate_counter_server.ex +++ b/lib/logflare/source/rate_counter_server.ex @@ -247,11 +247,7 @@ defmodule Logflare.Source.RateCounterServer do def broadcast(%RateCounterServer{} = state) do local_rates = %{Node.self() => state_to_external(state)} - Phoenix.PubSub.broadcast( - Logflare.PubSub, - "rates", - {:rates, state.source_id, local_rates} - ) + PubSubRates.global_broadcast_rate({"rates", state.source_id, local_rates}) cluster_rates = PubSubRates.Cache.get_cluster_rates(state.source_id) diff --git a/lib/logflare/source/recent_logs_server.ex b/lib/logflare/source/recent_logs_server.ex index fca6c2ffc..3bccce55d 100644 --- a/lib/logflare/source/recent_logs_server.ex +++ b/lib/logflare/source/recent_logs_server.ex @@ -137,11 +137,7 @@ defmodule Logflare.Source.RecentLogsServer do bq_inserts = Source.Data.get_bq_inserts(source_token) inserts_payload = %{Node.self() => %{node_inserts: current_inserts, bq_inserts: bq_inserts}} - Phoenix.PubSub.broadcast( - Logflare.PubSub, - "inserts", - {:inserts, source_token, inserts_payload} - ) + PubSubRates.global_broadcast_rate({"inserts", source_token, inserts_payload}) end current_cluster_inserts = PubSubRates.Cache.get_cluster_inserts(state.source_token) diff --git a/test/logflare/cluster_pubsub_test.exs b/test/logflare/cluster_pubsub_test.exs index 4348f721d..d4b9848d9 100644 --- a/test/logflare/cluster_pubsub_test.exs +++ b/test/logflare/cluster_pubsub_test.exs @@ -20,29 +20,30 @@ defmodule Logflare.ClusterPubSubTest do end test "subscribe/1 inserts", %{source: %{token: source_token}} do - PubSubRates.subscribe(:inserts) + PubSubRates.subscribe("inserts") TestUtils.retry_assert(fn -> - PubSubRates.global_broadcast_rate({:inserts, source_token, %{data: "some val"}}) - assert_received {:inserts, ^source_token, %{data: "some val"}} + PubSubRates.global_broadcast_rate({"inserts", source_token, %{data: "some val"}}) + assert_received {"inserts", ^source_token, %{data: "some val"}} end) end test "subscribe/1 rates", %{source: %{token: source_token}} do - PubSubRates.subscribe(:rates) + PubSubRates.partitioned_topic("rates", source_token) + |> PubSubRates.subscribe() TestUtils.retry_assert(fn -> - PubSubRates.global_broadcast_rate({:rates, source_token, %{data: "some val"}}) - assert_received {:rates, ^source_token, %{data: "some val"}} + PubSubRates.global_broadcast_rate({"rates", source_token, %{data: "some val"}}) + assert_received {"rates", ^source_token, %{data: "some val"}} end) end test "subscribe/1 buffers", %{source: %{id: source_id}} do - PubSubRates.subscribe(:buffers) + PubSubRates.subscribe("buffers") TestUtils.retry_assert(fn -> - PubSubRates.global_broadcast_rate({:buffers, source_id, nil, %{data: "some val"}}) - assert_received {:buffers, ^source_id, nil, %{data: "some val"}} + PubSubRates.global_broadcast_rate({"buffers", source_id, nil, %{data: "some val"}}) + assert_received {"buffers", ^source_id, nil, %{data: "some val"}} end) end @@ -50,12 +51,12 @@ defmodule Logflare.ClusterPubSubTest do Phoenix.PubSub.broadcast( Logflare.PubSub, "buffers", - {:buffers, source.token, %{Node.self() => %{len: 5}}} + {"buffers", source.token, %{Node.self() => %{len: 5}}} ) :timer.sleep(100) assert PubSubRates.Cache.get_cluster_buffers(source.id, nil) == 0 - PubSubRates.global_broadcast_rate({:buffers, source.id, nil, %{Node.self() => %{len: 5}}}) + PubSubRates.global_broadcast_rate({"buffers", source.id, nil, %{Node.self() => %{len: 5}}}) :timer.sleep(100) assert PubSubRates.Cache.get_cluster_buffers(source.id, nil) == 5 end From 516c81f9c6db51b5fc5e9954e9c16138537a01e7 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 11:27:56 -0700 Subject: [PATCH 2/6] fix: bump version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index c15ef4be5..dd4f0bd50 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.8.12 \ No newline at end of file +1.8.13 \ No newline at end of file From e2f152d580cce2ae4a2822fe9cd67eca29e6c345 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 14:57:43 -0700 Subject: [PATCH 3/6] fix: cleanup --- lib/logflare/pubsub_rates.ex | 2 +- lib/logflare/pubsub_rates/rates.ex | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/lib/logflare/pubsub_rates.ex b/lib/logflare/pubsub_rates.ex index 5859fb752..c4c7add1e 100644 --- a/lib/logflare/pubsub_rates.ex +++ b/lib/logflare/pubsub_rates.ex @@ -20,7 +20,7 @@ defmodule Logflare.PubSubRates do name: PubSubRates.Supervisors, partitions: partitions(), with_arguments: fn [opts], partition -> - [Keyword.put(opts, :partition, partition)] + [Keyword.put(opts, :partition, Integer.to_string(partition))] end}, PubSubRates.Buffers, PubSubRates.Inserts diff --git a/lib/logflare/pubsub_rates/rates.ex b/lib/logflare/pubsub_rates/rates.ex index 872f3c831..db90abcba 100644 --- a/lib/logflare/pubsub_rates/rates.ex +++ b/lib/logflare/pubsub_rates/rates.ex @@ -7,6 +7,8 @@ defmodule Logflare.PubSubRates.Rates do use GenServer + @topic "rates" + def child_spec(args) do %{ id: __MODULE__, @@ -18,21 +20,25 @@ defmodule Logflare.PubSubRates.Rates do end def start_link(args \\ []) do - partition = Keyword.get(args, :partition, 0) + partition = get_partition_opt(args) name = :"#{__MODULE__}#{partition}" GenServer.start_link(__MODULE__, args, name: name) end - def init(state) do - partition = Keyword.get(state, :partition, 0) - topic = "rates#{partition}" + def init(args) do + partition = get_partition_opt(args) + topic = @topic <> partition PubSubRates.subscribe(topic) - {:ok, state} + {:ok, args} end - def handle_info({"rates", source_token, rates}, state) do + def handle_info({@topic, source_token, rates}, state) do Cache.cache_rates(source_token, rates) {:noreply, state} end + + defp get_partition_opt(args) do + Keyword.get(args, :partition, "0") + end end From 4d589fef085ff05977529c94433d5043a25c7d71 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 15:05:00 -0700 Subject: [PATCH 4/6] fix: use same partition count as PubSub --- lib/logflare/pubsub_rates.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logflare/pubsub_rates.ex b/lib/logflare/pubsub_rates.ex index c4c7add1e..a121b3300 100644 --- a/lib/logflare/pubsub_rates.ex +++ b/lib/logflare/pubsub_rates.ex @@ -5,7 +5,7 @@ defmodule Logflare.PubSubRates do alias Logflare.PubSubRates alias Phoenix.PubSub @topics ["buffers", "rates", "inserts"] - @partitions 8 + @partitions Application.get_env(:logflare, Logflare.PubSub)[:pool_size] def start_link(init_arg) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) From d50a0d7ff5cd0c4cf8ae5b82d1340b3aba363e82 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 15:05:15 -0700 Subject: [PATCH 5/6] fix: use all cores for PubSub --- config/config.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.exs b/config/config.exs index 45eee9ee1..8f895f839 100644 --- a/config/config.exs +++ b/config/config.exs @@ -34,7 +34,7 @@ config :logflare, LogflareWeb.Endpoint, pubsub_server: Logflare.PubSub, live_view: [signing_salt: "Fvo_-oQi4bjPfQLh"] -config :logflare, Logflare.PubSub, pool_size: 10 +config :logflare, Logflare.PubSub, pool_size: 56 # Configures Elixir's Logger config :logger, From 3b8a49e20373a6a62d82cc71cb54ebf4d602b6f3 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Tue, 22 Oct 2024 15:06:23 -0700 Subject: [PATCH 6/6] fix: use compile_env instead --- lib/logflare/pubsub_rates.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logflare/pubsub_rates.ex b/lib/logflare/pubsub_rates.ex index a121b3300..68bf4c61a 100644 --- a/lib/logflare/pubsub_rates.ex +++ b/lib/logflare/pubsub_rates.ex @@ -5,7 +5,7 @@ defmodule Logflare.PubSubRates do alias Logflare.PubSubRates alias Phoenix.PubSub @topics ["buffers", "rates", "inserts"] - @partitions Application.get_env(:logflare, Logflare.PubSub)[:pool_size] + @partitions Application.compile_env(:logflare, Logflare.PubSub)[:pool_size] def start_link(init_arg) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)