Skip to content

Commit

Permalink
Add a new module that implement Pubsub behavior
Browse files Browse the repository at this point in the history
- Purpose is an experiment to decouple from Phoenix.Endpoint as
  described in absinthe-graphql#46
  • Loading branch information
treble37 committed Aug 29, 2021
1 parent fdcf5e3 commit ac0340c
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.10.0-otp-22
erlang 22.0.7
elixir 1.12.2-otp-24
erlang 24.0.3
2 changes: 2 additions & 0 deletions lib/absinthe/phoenix/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule Absinthe.Phoenix.Channel do
def join("__absinthe__:control", _, socket) do
schema = socket.assigns[:__absinthe_schema__]
pipeline = socket.assigns[:__absinthe_pipeline__]
endpoint = socket.assigns[:__absinthe_endpoint__] || socket.endpoint
socket = Map.put(socket, :endpoint, endpoint)

absinthe_config = Map.get(socket.assigns, :absinthe, %{})

Expand Down
5 changes: 4 additions & 1 deletion lib/absinthe/phoenix/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ defmodule Absinthe.Phoenix.Socket do
defmacro __using__(opts) do
schema = Keyword.get(opts, :schema)
pipeline = Keyword.get(opts, :pipeline)
# custom endpoint to decouple from Phoenix.Endpoint
endpoint = Keyword.get(opts, :endpoint)

quote do
channel(
"__absinthe__:*",
Absinthe.Phoenix.Channel,
assigns: %{
__absinthe_schema__: unquote(schema),
__absinthe_pipeline__: unquote(pipeline)
__absinthe_pipeline__: unquote(pipeline),
__absinthe_endpoint__: unquote(endpoint)
}
)
end
Expand Down
102 changes: 102 additions & 0 deletions lib/absinthe/phoenix/subscription/pubsub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule Absinthe.Phoenix.Subscription.Pubsub do
@moduledoc false
# Experimental module adding just enough functionality to help decouple from Phoenix.Endpoint

defmacro __using__(opts) do
quote do
@behaviour Absinthe.Subscription.Pubsub
@otp_app unquote(opts)[:otp_app] || raise("endpoint expects :otp_app to be given")
@before_compile unquote(__MODULE__)
end
end

defmacro __before_compile__(_) do
quote do
def node_name() do
Absinthe.Phoenix.Subscription.Pubsub.node_name(@otp_app, __MODULE__)
end

def publish_mutation(topic, mutation_result, subscribed_fields) do
Absinthe.Phoenix.Subscription.Pubsub.publish_mutation(
@otp_app,
__MODULE__,
topic,
mutation_result,
subscribed_fields
)
end

def publish_subscription(topic, data) do
Absinthe.Phoenix.Subscription.Pubsub.publish_subscription(
@otp_app,
__MODULE__,
topic,
data
)
end

def subscribe(topic, opts \\ []) when is_binary(topic) do
Absinthe.Phoenix.Subscription.Pubsub.subscribe(
Absinthe.Phoenix.Subscription.Pubsub.pubsub(@otp_app, __MODULE__),
topic,
opts
)
end
end
end

@doc false
def node_name(otp_app, endpoint) do
pubsub = pubsub(otp_app, endpoint)

Phoenix.PubSub.node_name(pubsub)
end

@doc false
# when publishing subscription results we only care about
# publishing to the local node. Each node manages and runs documents separately
# so there's no point in pushing out the results to other nodes.
def publish_subscription(otp_app, endpoint, topic, result) do
pubsub = pubsub(otp_app, endpoint)

broadcast = %Phoenix.Socket.Broadcast{
topic: topic,
event: "subscription:data",
payload: %{result: result, subscriptionId: topic}
}

Phoenix.PubSub.local_broadcast(pubsub, topic, broadcast, Phoenix.Channel.Server)
end

@doc false
def publish_mutation(otp_app, endpoint, proxy_topic, mutation_result, subscribed_fields) do
pubsub = pubsub(otp_app, endpoint)

# we need to include the current node as part of the broadcast.
# This is because this broadcast will also be picked up by proxies within the
# current node, and they need to be able to ignore this message.
message = %{
node: node_name(otp_app, endpoint),
subscribed_fields: subscribed_fields,
mutation_result: mutation_result
}

Phoenix.PubSub.broadcast(pubsub, proxy_topic, message, Phoenix.Channel.Server)
end

@doc false
def subscribe(pubsub_server, topic, opts \\ []) when is_binary(topic) do
Phoenix.PubSub.subscribe(pubsub_server, topic, opts)
end

def pubsub(otp_app, endpoint) do
pubsub =
Application.get_env(otp_app, endpoint)[:pubsub][:name] ||
Application.get_env(otp_app, endpoint)[:pubsub_server]

pubsub ||
raise """
Pubsub needs to be configured for #{inspect(otp_app)} #{inspect(endpoint)}!
"""
end
end

0 comments on commit ac0340c

Please sign in to comment.