Skip to content

Commit

Permalink
chore: cleanup PubSubRates mod (#2236)
Browse files Browse the repository at this point in the history
* chore: cleanup PubSubRates mod

* chore: docs and specs

* fix: partition should be a string

* fix: nit
  • Loading branch information
chasers authored Oct 23, 2024
1 parent 65c5423 commit a16c182
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 53 deletions.
72 changes: 31 additions & 41 deletions lib/logflare/pubsub_rates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Logflare.PubSubRates do

alias Logflare.PubSubRates
alias Phoenix.PubSub

@topics ["buffers", "rates", "inserts"]
@partitions Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

Expand Down Expand Up @@ -47,28 +48,31 @@ defmodule Logflare.PubSubRates do
### Examples
iex> subscribe(:all)
iex> subscribe("buffers")
iex> subscribe("inserts")
iex> subscribe("rates")
iex> subscribe(["buffers", "inserts", "rates])
"""
@spec subscribe(:all | binary() | maybe_improper_list()) ::
:ok | list() | {:error, {:already_registered, pid()}}

def subscribe(:all), do: subscribe(@topics)

def subscribe("rates" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)

def subscribe("buffers" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)
def subscribe(topics) when is_list(topics) do
for topic <- topics, partition <- 0..partitions() do
part = Integer.to_string(partition)
subscribe(topic, part)
end
end

def subscribe("inserts" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)
@doc """
Subscribes to a topic for a partition.
def subscribe(topics) when is_list(topics), do: Enum.map(topics, &subscribe/1)
### Examples
def subscribe(topic) when topic in @topics do
PubSub.subscribe(Logflare.PubSub, topic)
iex> subscribe("buffers", "0")
iex> subscribe("inserts", "56")
"""
@spec subscribe(binary(), binary()) :: :ok | {:error, {:already_registered, pid()}}
def subscribe(topic, partition) when topic in @topics and is_binary(partition) do
PubSub.subscribe(Logflare.PubSub, topic <> partition)
end

@doc """
Expand All @@ -79,53 +83,39 @@ defmodule Logflare.PubSubRates do
| {binary(), integer(), any(), any()}
) :: :ok | {:error, any()}

def global_broadcast_rate({"buffers" = topic, source_id, backend_id, _payload} = data)
def global_broadcast_rate({topic, source_id, backend_id, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, {source_id, backend_id})

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({topic, source_id, _backend_id, _payload} = data)
when topic in @topics and is_integer(source_id) do
Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data)
end

def global_broadcast_rate({"rates" = topic, source_token, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, source_token)

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({"buffers" = topic, source_token, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, source_token)

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({"inserts" = topic, source_token, _payload} = data)
def global_broadcast_rate({topic, source_token, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, source_token)

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({topic, _source_token, _payload} = data) when topic in @topics do
Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data)
end

@doc """
The number of partitions for a paritioned child.
"""
@spec partitions() :: integer()
def partitions(), do: @partitions

@doc """
Partitions a topic for a source_token.
Partitions a topic for a key.
"""
@spec partitioned_topic(binary(), any()) :: binary()
def partitioned_topic(topic, key) when is_binary(topic) do
topic <> make_partition(key)
end

@doc """
Makes a string of a partition integer from a key.
"""
def partitioned_topic(topic, source_token) do
topic <> (:erlang.phash2(source_token, partitions()) |> Integer.to_string())
@spec make_partition(any()) :: binary()
def make_partition(key) do
:erlang.phash2(key, partitions()) |> Integer.to_string()
end
end
4 changes: 2 additions & 2 deletions lib/logflare/pubsub_rates/buffers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ defmodule Logflare.PubSubRates.Buffers do

def init(args) do
partition = get_partition_opt(args)
topic = @topic <> partition
PubSubRates.subscribe(topic)

PubSubRates.subscribe(@topic, partition)
{:ok, args}
end

Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/pubsub_rates/inserts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ defmodule Logflare.PubSubRates.Inserts do

def init(args) do
partition = get_partition_opt(args)
topic = @topic <> partition
PubSubRates.subscribe(topic)

PubSubRates.subscribe(@topic, partition)
{:ok, args}
end

Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/pubsub_rates/rates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ defmodule Logflare.PubSubRates.Rates do

def init(args) do
partition = get_partition_opt(args)
topic = @topic <> partition
PubSubRates.subscribe(topic)

PubSubRates.subscribe(@topic, partition)
{:ok, args}
end

Expand Down
9 changes: 3 additions & 6 deletions test/logflare/cluster_pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ defmodule Logflare.ClusterPubSubTest do
end

test "subscribe/1 inserts", %{source: %{token: source_token}} do
PubSubRates.partitioned_topic("inserts", source_token)
|> PubSubRates.subscribe()
PubSubRates.subscribe("inserts", PubSubRates.make_partition(source_token))

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({"inserts", source_token, %{data: "some val"}})
Expand All @@ -30,8 +29,7 @@ defmodule Logflare.ClusterPubSubTest do
end

test "subscribe/1 rates", %{source: %{token: source_token}} do
PubSubRates.partitioned_topic("rates", source_token)
|> PubSubRates.subscribe()
PubSubRates.subscribe("rates", PubSubRates.make_partition(source_token))

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({"rates", source_token, %{data: "some val"}})
Expand All @@ -42,8 +40,7 @@ defmodule Logflare.ClusterPubSubTest do
test "subscribe/1 buffers", %{source: %{id: source_id}} do
backend_id = 1

PubSubRates.partitioned_topic("buffers", {source_id, backend_id})
|> PubSubRates.subscribe()
PubSubRates.subscribe("buffers", PubSubRates.make_partition({source_id, backend_id}))

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, %{data: "some val"}})
Expand Down

0 comments on commit a16c182

Please sign in to comment.