diff --git a/lib/logflare/pubsub_rates.ex b/lib/logflare/pubsub_rates.ex index 71e2d1ad7..e71793369 100644 --- a/lib/logflare/pubsub_rates.ex +++ b/lib/logflare/pubsub_rates.ex @@ -4,6 +4,7 @@ defmodule Logflare.PubSubRates do alias Logflare.PubSubRates alias Phoenix.PubSub + @topics ["buffers", "rates", "inserts"] @partitions Application.compile_env(:logflare, Logflare.PubSub)[:pool_size] @@ -47,28 +48,31 @@ defmodule Logflare.PubSubRates do ### Examples iex> subscribe(:all) - iex> subscribe("buffers") - iex> subscribe("inserts") - iex> subscribe("rates") + iex> subscribe(["buffers", "inserts", "rates]) """ @spec subscribe(:all | binary() | maybe_improper_list()) :: :ok | list() | {:error, {:already_registered, pid()}} def subscribe(:all), do: subscribe(@topics) - def subscribe("rates" <> _partition = topic), - do: PubSub.subscribe(Logflare.PubSub, topic) - - def subscribe("buffers" <> _partition = topic), - do: PubSub.subscribe(Logflare.PubSub, topic) + def subscribe(topics) when is_list(topics) do + for topic <- topics, partition <- 0..partitions() do + part = Integer.to_string(partition) + subscribe(topic, part) + end + end - def subscribe("inserts" <> _partition = topic), - do: PubSub.subscribe(Logflare.PubSub, topic) + @doc """ + Subscribes to a topic for a partition. - def subscribe(topics) when is_list(topics), do: Enum.map(topics, &subscribe/1) + ### Examples - def subscribe(topic) when topic in @topics do - PubSub.subscribe(Logflare.PubSub, topic) + iex> subscribe("buffers", "0") + iex> subscribe("inserts", "56") + """ + @spec subscribe(binary(), binary()) :: :ok | {:error, {:already_registered, pid()}} + def subscribe(topic, partition) when topic in @topics and is_binary(partition) do + PubSub.subscribe(Logflare.PubSub, topic <> partition) end @doc """ @@ -79,43 +83,20 @@ defmodule Logflare.PubSubRates do | {binary(), integer(), any(), any()} ) :: :ok | {:error, any()} - def global_broadcast_rate({"buffers" = topic, source_id, backend_id, _payload} = data) + def global_broadcast_rate({topic, source_id, backend_id, _payload} = data) when topic in @topics do partitioned_topic = partitioned_topic(topic, {source_id, backend_id}) Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data) end - def global_broadcast_rate({topic, source_id, _backend_id, _payload} = data) - when topic in @topics and is_integer(source_id) do - Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data) - end - - def global_broadcast_rate({"rates" = topic, source_token, _payload} = data) - when topic in @topics do - partitioned_topic = partitioned_topic(topic, source_token) - - Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data) - end - - def global_broadcast_rate({"buffers" = topic, source_token, _payload} = data) - when topic in @topics do - partitioned_topic = partitioned_topic(topic, source_token) - - Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data) - end - - def global_broadcast_rate({"inserts" = topic, source_token, _payload} = data) + def global_broadcast_rate({topic, source_token, _payload} = data) when topic in @topics do partitioned_topic = partitioned_topic(topic, source_token) Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data) end - def global_broadcast_rate({topic, _source_token, _payload} = data) when topic in @topics do - Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data) - end - @doc """ The number of partitions for a paritioned child. """ @@ -123,9 +104,18 @@ defmodule Logflare.PubSubRates do def partitions(), do: @partitions @doc """ - Partitions a topic for a source_token. + Partitions a topic for a key. + """ + @spec partitioned_topic(binary(), any()) :: binary() + def partitioned_topic(topic, key) when is_binary(topic) do + topic <> make_partition(key) + end + + @doc """ + Makes a string of a partition integer from a key. """ - def partitioned_topic(topic, source_token) do - topic <> (:erlang.phash2(source_token, partitions()) |> Integer.to_string()) + @spec make_partition(any()) :: binary() + def make_partition(key) do + :erlang.phash2(key, partitions()) |> Integer.to_string() end end diff --git a/lib/logflare/pubsub_rates/buffers.ex b/lib/logflare/pubsub_rates/buffers.ex index 45a822791..e25a2ecb3 100644 --- a/lib/logflare/pubsub_rates/buffers.ex +++ b/lib/logflare/pubsub_rates/buffers.ex @@ -30,8 +30,8 @@ defmodule Logflare.PubSubRates.Buffers do def init(args) do partition = get_partition_opt(args) - topic = @topic <> partition - PubSubRates.subscribe(topic) + + PubSubRates.subscribe(@topic, partition) {:ok, args} end diff --git a/lib/logflare/pubsub_rates/inserts.ex b/lib/logflare/pubsub_rates/inserts.ex index 3991da1c5..142bec4f4 100644 --- a/lib/logflare/pubsub_rates/inserts.ex +++ b/lib/logflare/pubsub_rates/inserts.ex @@ -28,8 +28,8 @@ defmodule Logflare.PubSubRates.Inserts do def init(args) do partition = get_partition_opt(args) - topic = @topic <> partition - PubSubRates.subscribe(topic) + + PubSubRates.subscribe(@topic, partition) {:ok, args} end diff --git a/lib/logflare/pubsub_rates/rates.ex b/lib/logflare/pubsub_rates/rates.ex index db90abcba..57542be5d 100644 --- a/lib/logflare/pubsub_rates/rates.ex +++ b/lib/logflare/pubsub_rates/rates.ex @@ -28,8 +28,8 @@ defmodule Logflare.PubSubRates.Rates do def init(args) do partition = get_partition_opt(args) - topic = @topic <> partition - PubSubRates.subscribe(topic) + + PubSubRates.subscribe(@topic, partition) {:ok, args} end diff --git a/test/logflare/cluster_pubsub_test.exs b/test/logflare/cluster_pubsub_test.exs index 03e00c0ff..ec177cb3f 100644 --- a/test/logflare/cluster_pubsub_test.exs +++ b/test/logflare/cluster_pubsub_test.exs @@ -20,8 +20,7 @@ defmodule Logflare.ClusterPubSubTest do end test "subscribe/1 inserts", %{source: %{token: source_token}} do - PubSubRates.partitioned_topic("inserts", source_token) - |> PubSubRates.subscribe() + PubSubRates.subscribe("inserts", PubSubRates.make_partition(source_token)) TestUtils.retry_assert(fn -> PubSubRates.global_broadcast_rate({"inserts", source_token, %{data: "some val"}}) @@ -30,8 +29,7 @@ defmodule Logflare.ClusterPubSubTest do end test "subscribe/1 rates", %{source: %{token: source_token}} do - PubSubRates.partitioned_topic("rates", source_token) - |> PubSubRates.subscribe() + PubSubRates.subscribe("rates", PubSubRates.make_partition(source_token)) TestUtils.retry_assert(fn -> PubSubRates.global_broadcast_rate({"rates", source_token, %{data: "some val"}}) @@ -42,8 +40,7 @@ defmodule Logflare.ClusterPubSubTest do test "subscribe/1 buffers", %{source: %{id: source_id}} do backend_id = 1 - PubSubRates.partitioned_topic("buffers", {source_id, backend_id}) - |> PubSubRates.subscribe() + PubSubRates.subscribe("buffers", PubSubRates.make_partition({source_id, backend_id})) TestUtils.retry_assert(fn -> PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, %{data: "some val"}})