diff --git a/README.md b/README.md index 6573499..f06c7b3 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Weddell is an Elixir client for [Google Pub/Sub](https://cloud.google.com/pubsub ```elixir def deps do [ - {:weddell, "~> 0.3"}, + {:weddell, "~> 0.4"}, {:goth, "~> 0.11"}, ] end @@ -29,6 +29,10 @@ config :goth, json: {:system, "GCP_CREDENTIALS_JSON"} ``` +By default Weddell will start a client and connect on application start. +This can be disabled by setting `:no_connect_on_start` in the application config. +Clients can then be started with `Weddell.Client.start_link/3`. + ## Getting Started ### Creating a topic and subscription diff --git a/lib/weddell.ex b/lib/weddell.ex index 5cc5e0d..4b48d42 100644 --- a/lib/weddell.ex +++ b/lib/weddell.ex @@ -5,20 +5,23 @@ defmodule Weddell do use Application alias GRPC.RPCError - alias Weddell.{Message, - Client, - Client.Publisher, - SubscriptionDetails} + alias Weddell.{Message, Client, Client.Publisher, SubscriptionDetails} @typedoc "An RPC error" - @type error :: {:error, RPCError.t} + @type error :: {:error, RPCError.t()} @doc """ Start Weddell and connect to the Pub/Sub server. """ def start(_type, _args) do import Supervisor.Spec - children = [worker(Client, [])] + + children = + case Application.get_env(:weddell, :no_connect_on_start, false) do + true -> [] + false -> [worker(Client, [])] + end + opts = [strategy: :one_for_one, name: __MODULE__] Supervisor.start_link(children, opts) end @@ -26,9 +29,9 @@ defmodule Weddell do @doc """ Return the client currently connected to Pub/Sub. """ - @spec client(timeout :: integer()) :: Client.t + @spec client(timeout :: integer()) :: Client.t() def client(timeout \\ 5000) do - GenServer.call(Weddell.Client, {:client}, timeout) + Weddell.Client.client(Weddell.Client, timeout) end @doc """ @@ -38,9 +41,9 @@ defmodule Weddell do Weddell.create_topic("foo") #=> :ok """ - @spec create_topic(topic_name :: String.t, timeout :: integer()) :: :ok | error + @spec create_topic(topic_name :: String.t(), timeout :: integer()) :: :ok | error def create_topic(name, timeout \\ 5000) do - GenServer.call(Weddell.Client, {:create_topic, name}, timeout) + Weddell.Client.create_topic(Weddell.Client, name, timeout) end @doc """ @@ -50,9 +53,9 @@ defmodule Weddell do Weddell.delete_topic("foo") #=> :ok """ - @spec delete_topic(topic_name :: String.t, timeout :: integer()) :: :ok | error + @spec delete_topic(topic_name :: String.t(), timeout :: integer()) :: :ok | error def delete_topic(name, timeout \\ 5000) do - GenServer.call(Weddell.Client, {:delete_topic, name}, timeout) + Weddell.Client.delete_topic(Weddell.Client, name, timeout) end @doc """ @@ -76,12 +79,12 @@ defmodule Weddell do * `:cursor` - List topics starting at a cursor returned by an earlier call. _(default: nil)_ """ - @spec topics(opts :: Client.list_options, timeout :: integer()) :: - {:ok, topic_names :: [String.t]} | - {:ok, topic_names :: [String.t], Client.cursor} | - error + @spec topics(opts :: Client.list_options(), timeout :: integer()) :: + {:ok, topic_names :: [String.t()]} + | {:ok, topic_names :: [String.t()], Client.cursor()} + | error def topics(opts \\ [], timeout \\ 5000) do - GenServer.call(Weddell.Client, {:topics, opts}, timeout) + Weddell.Client.topics(Weddell.Client, opts, timeout) end @doc """ @@ -104,13 +107,15 @@ defmodule Weddell do to the specified URL. For example, a Webhook endpoint might use "https://example.com/push". _(default: nil)_ """ - @spec create_subscription(subscription_name :: String.t, - topic_name :: String.t, - Client.subscription_options, - timeout :: integer()) :: - :ok | error + @spec create_subscription( + subscription_name :: String.t(), + topic_name :: String.t(), + Client.subscription_options(), + timeout :: integer() + ) :: + :ok | error def create_subscription(name, topic, opts \\ [], timeout \\ 5000) do - GenServer.call(Weddell.Client, {:create_subscription, name, topic, opts}, timeout) + Weddell.Client.create_subscription(Weddell.Client, name, topic, opts, timeout) end @doc """ @@ -120,10 +125,10 @@ defmodule Weddell do Weddell.delete_subscription("foo") #=> :ok """ - @spec delete_subscription(subscription_name :: String.t, timeout :: integer()) :: - :ok | error + @spec delete_subscription(subscription_name :: String.t(), timeout :: integer()) :: + :ok | error def delete_subscription(name, timeout \\ 5000) do - GenServer.call(Weddell.Client, {:delete_subscription, name}, timeout) + Weddell.Client.delete_subscription(Weddell.Client, name, timeout) end @doc """ @@ -147,12 +152,12 @@ defmodule Weddell do * `:cursor` - List subscriptions starting at a cursor returned by an earlier call. _(default: nil)_ """ - @spec subscriptions(opts :: Client.list_options, timeout :: integer()) :: - {:ok, subscriptions :: [SubscriptionDetails.t]} | - {:ok, subscriptions :: [SubscriptionDetails.t], Client.cursor} | - error + @spec subscriptions(opts :: Client.list_options(), timeout :: integer()) :: + {:ok, subscriptions :: [SubscriptionDetails.t()]} + | {:ok, subscriptions :: [SubscriptionDetails.t()], Client.cursor()} + | error def subscriptions(opts \\ [], timeout \\ 5000) do - GenServer.call(Weddell.Client, {:subscriptions, opts}, timeout) + Weddell.Client.subscriptions(Weddell.Client, opts, timeout) end @doc """ @@ -176,14 +181,16 @@ defmodule Weddell do * `:cursor` - List subscriptions starting at a cursor returned by an earlier call. _(default: nil)_ """ - @spec topic_subscriptions(topic :: String.t, - opts :: Client.list_options, - timeout :: integer()) :: - {:ok, subscriptions :: [String.t]} | - {:ok, subscriptions :: [String.t], Client.cursor} | - error + @spec topic_subscriptions( + topic :: String.t(), + opts :: Client.list_options(), + timeout :: integer() + ) :: + {:ok, subscriptions :: [String.t()]} + | {:ok, subscriptions :: [String.t()], Client.cursor()} + | error def topic_subscriptions(topic, opts \\ [], timeout \\ 5000) do - GenServer.call(Weddell.Client, {:topic_subscriptions, topic, opts}, timeout) + Weddell.Client.topic_subscriptions(Weddell.Client, topic, opts, timeout) end @doc """ @@ -216,12 +223,14 @@ defmodule Weddell do |> Weddell.publish("foo-topic") """ - @spec publish(Publisher.new_message | [Publisher.new_message], - topic_name :: String.t, - timeout :: integer()) :: - :ok | error + @spec publish( + Publisher.new_message() | [Publisher.new_message()], + topic_name :: String.t(), + timeout :: integer() + ) :: + :ok | error def publish(messages, topic, timeout \\ 5000) do - GenServer.call(Weddell.Client, {:publish, messages, topic}, timeout) + Weddell.Client.publish(Weddell.Client, messages, topic, timeout) end @doc """ @@ -241,11 +250,10 @@ defmodule Weddell do * `:max_messages` - The maximum number of messages to be returned, it may be fewer. _(default: 10)_ """ - @spec pull(subscription_name :: String.t, Client.pull_options, - timeout :: integer()) :: - {:ok, messages :: [Message.t]} | error + @spec pull(subscription_name :: String.t(), Client.pull_options(), timeout :: integer()) :: + {:ok, messages :: [Message.t()]} | error def pull(subscription, opts \\ [], timeout \\ 5000) do - GenServer.call(Weddell.Client, {:pull, subscription, opts}, timeout) + Weddell.Client.pull(Weddell.Client, subscription, opts, timeout) end @doc """ @@ -257,11 +265,13 @@ defmodule Weddell do Weddell.acknowledge(messages, "foo-subscription") #=> :ok """ - @spec acknowledge(messages :: [Message.t] | Message.t, - subscription_name :: String.t, - timeout :: integer()) :: - :ok | error + @spec acknowledge( + messages :: [Message.t()] | Message.t(), + subscription_name :: String.t(), + timeout :: integer() + ) :: + :ok | error def acknowledge(messages, subscription, timeout \\ 5000) do - GenServer.call(Weddell.Client, {:acknowledge, messages, subscription}, timeout) + Weddell.Client.acknowledge(Weddell.Client, messages, subscription, timeout) end end diff --git a/lib/weddell/client.ex b/lib/weddell/client.ex index cbbd7f1..311ea41 100644 --- a/lib/weddell/client.ex +++ b/lib/weddell/client.ex @@ -6,61 +6,62 @@ defmodule Weddell.Client do """ use GenServer - alias GRPC.{Stub, - RPCError} - alias Weddell.Client.{Publisher, - Subscriber} + alias GRPC.{Stub, RPCError} + alias Weddell.Client.{Publisher, Subscriber} @default_host "pubsub.googleapis.com" @default_port 443 @typedoc "A Weddell client" - @type t :: %__MODULE__{channel: GRPC.Channel.t, - project: String.t} + @type t :: %__MODULE__{channel: GRPC.Channel.t(), project: String.t()} defstruct [:channel, :project] @typedoc "An RPC error" - @type error :: {:error, RPCError.t} + @type error :: {:error, RPCError.t()} @typedoc "Option value used when connecting a client" - @type connect_option :: {:host, String.t} | - {:port, pos_integer} | - {:scheme, :http | :https} | - {:ssl, [:ssl.ssl_option]} + @type connect_option :: + {:host, String.t()} + | {:port, pos_integer} + | {:scheme, :http | :https} + | {:ssl, [:ssl.ssl_option()]} @typedoc "Option values used when connecting clients" @type connect_options :: [connect_option] @typedoc "Option values used when creating a subscription" - @type subscription_option :: {:ack_deadline, pos_integer} | - {:push_endpoint, String.t} + @type subscription_option :: + {:ack_deadline, pos_integer} + | {:push_endpoint, String.t()} @typedoc "Options used when creating a subscription" @type subscription_options :: [subscription_option] @typedoc "Option value used when retrieving lists" - @type list_option :: {:max, pos_integer} | - {:cursor, cursor} + @type list_option :: + {:max, pos_integer} + | {:cursor, cursor} @typedoc "Option values used when retrieving lists" @type list_options :: [list_option] @typedoc "Option values used when pulling messages" - @type pull_option :: {:return_immediately, boolean} | - {:max_messages, pos_integer} + @type pull_option :: + {:return_immediately, boolean} + | {:max_messages, pos_integer} @typedoc "Options used when pulling messages" @type pull_options :: [pull_option] @typedoc "Option values used when publishing messages" - @type publish_option :: {:attributes, %{optional(String.t) => String.t}} + @type publish_option :: {:attributes, %{optional(String.t()) => String.t()}} @typedoc "Options used when publishing messages" @type publish_options :: [publish_option] @typedoc "A cursor used for pagination of lists" - @type cursor :: String.t + @type cursor :: String.t() @doc """ Start the client process and connect to Pub/Sub using settings in the application config. @@ -85,14 +86,126 @@ defmodule Weddell.Client do * `ssl` - SSL settings to be used when connecting with the `:https` scheme. See `ssl_option()` in the [ssl documentation] (http://erlang.org/doc/man/ssl.html). _(default: [:cacerts: :certifi.cacerts()])_ + * `no_connect_on_start` - By default Weddell will start a client and connect on application start. + When `true` a client will not be started. Clients can then be started with `Weddell.Client.start_link/3`. _(default: false)_ """ def start_link do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + project = Application.get_env(:weddell, :project) + options = Application.get_all_env(:weddell) + start_link(project, options, name: __MODULE__) end - def init(:ok) do - connect(Application.get_env(:weddell, :project), - Application.get_all_env(:weddell)) + def start_link(project, options, gen_server_options \\ []) do + GenServer.start_link(__MODULE__, [project, options], gen_server_options) + end + + @spec client(server :: GenServer.server(), timeout :: integer()) :: Client.t() + def client(server, timeout \\ 5000) do + GenServer.call(server, {:client}, timeout) + end + + @spec create_topic(server :: GenServer.server(), topic_name :: String.t(), timeout :: integer()) :: + :ok | error + def create_topic(server, name, timeout \\ 5000) do + GenServer.call(server, {:create_topic, name}, timeout) + end + + @spec delete_topic(server :: GenServer.server(), topic_name :: String.t(), timeout :: integer()) :: + :ok | error + def delete_topic(server, name, timeout \\ 5000) do + GenServer.call(server, {:delete_topic, name}, timeout) + end + + @spec topics(server :: GenServer.server(), opts :: Client.list_options(), timeout :: integer()) :: + {:ok, topic_names :: [String.t()]} + | {:ok, topic_names :: [String.t()], Client.cursor()} + | error + def topics(server, opts \\ [], timeout \\ 5000) do + GenServer.call(server, {:topics, opts}, timeout) + end + + @spec create_subscription( + server :: GenServer.server(), + subscription_name :: String.t(), + topic_name :: String.t(), + Client.subscription_options(), + timeout :: integer() + ) :: + :ok | error + def create_subscription(server, name, topic, opts \\ [], timeout \\ 5000) do + GenServer.call(server, {:create_subscription, name, topic, opts}, timeout) + end + + @spec delete_subscription( + server :: GenServer.server(), + subscription_name :: String.t(), + timeout :: integer() + ) :: + :ok | error + def delete_subscription(server, name, timeout \\ 5000) do + GenServer.call(server, {:delete_subscription, name}, timeout) + end + + @spec subscriptions( + server :: GenServer.server(), + opts :: Client.list_options(), + timeout :: integer() + ) :: + {:ok, subscriptions :: [SubscriptionDetails.t()]} + | {:ok, subscriptions :: [SubscriptionDetails.t()], Client.cursor()} + | error + def subscriptions(server, opts \\ [], timeout \\ 5000) do + GenServer.call(server, {:subscriptions, opts}, timeout) + end + + @spec topic_subscriptions( + server :: GenServer.server(), + topic :: String.t(), + opts :: Client.list_options(), + timeout :: integer() + ) :: + {:ok, subscriptions :: [String.t()]} + | {:ok, subscriptions :: [String.t()], Client.cursor()} + | error + def topic_subscriptions(server, topic, opts \\ [], timeout \\ 5000) do + GenServer.call(server, {:topic_subscriptions, topic, opts}, timeout) + end + + @spec publish( + server :: GenServer.server(), + Publisher.new_message() | [Publisher.new_message()], + topic_name :: String.t(), + timeout :: integer() + ) :: + :ok | error + def publish(server, messages, topic, timeout \\ 5000) do + GenServer.call(server, {:publish, messages, topic}, timeout) + end + + @spec pull( + server :: GenServer.server(), + subscription_name :: String.t(), + Client.pull_options(), + timeout :: integer() + ) :: + {:ok, messages :: [Message.t()]} | error + def pull(server, subscription, opts \\ [], timeout \\ 5000) do + GenServer.call(server, {:pull, subscription, opts}, timeout) + end + + @spec acknowledge( + server :: GenServer.server(), + messages :: [Message.t()] | Message.t(), + subscription_name :: String.t(), + timeout :: integer() + ) :: + :ok | error + def acknowledge(server, messages, subscription, timeout \\ 5000) do + GenServer.call(server, {:acknowledge, messages, subscription}, timeout) + end + + def init([project, options]) do + connect(project, options) end @doc """ @@ -117,23 +230,27 @@ defmodule Weddell.Client do in the [ssl documentation](http://erlang.org/doc/man/ssl.html). _(default: [cacerts: :certifi.cacerts()])_ """ - @spec connect(project :: String.t, opts :: connect_options) :: {:ok, t} + @spec connect(project :: String.t(), opts :: connect_options) :: {:ok, t} def connect(project, opts \\ []) do scheme = Keyword.get(opts, :scheme, :https) ssl = ssl_opts(opts) cred = if scheme == :https, do: GRPC.Credential.new(ssl: ssl), else: nil host = Keyword.get(opts, :host, @default_host) port = Keyword.get(opts, :port, @default_port) + {:ok, channel} = - Stub.connect("#{host}:#{port}", cred: cred, adapter_opts: %{ - http2_opts: %{keepalive: :infinity} - }) - {:ok, %__MODULE__{channel: channel, - project: project}} + Stub.connect("#{host}:#{port}", + cred: cred, + adapter_opts: %{ + http2_opts: %{keepalive: :infinity} + } + ) + + {:ok, %__MODULE__{channel: channel, project: project}} end @doc false - @spec request_opts() :: Keyword.t + @spec request_opts() :: Keyword.t() def request_opts(extra_opts \\ []) do [metadata: auth_header(), content_type: "application/grpc"] |> Enum.concat(extra_opts) @@ -144,24 +261,34 @@ defmodule Weddell.Client do case request do {:create_topic, name} -> {:reply, Publisher.create_topic(client, name), client} + {:delete_topic, name} -> {:reply, Publisher.delete_topic(client, name), client} + {:topics, opts} -> {:reply, Publisher.topics(client, opts), client} + {:publish, messages, topic} -> {:reply, Publisher.publish(client, messages, topic), client} + {:topic_subscriptions, topic, opts} -> {:reply, Publisher.topic_subscriptions(client, topic, opts), client} + {:create_subscription, name, topic, opts} -> {:reply, Subscriber.create_subscription(client, name, topic, opts), client} + {:delete_subscription, name} -> {:reply, Subscriber.delete_subscription(client, name), client} + {:subscriptions, opts} -> {:reply, Subscriber.subscriptions(client, opts), client} + {:pull, subscription, opts} -> {:reply, Subscriber.pull(client, subscription, opts), client} + {:acknowledge, messages, subscription} -> {:reply, Subscriber.acknowledge(client, messages, subscription), client} + {:client} -> {:reply, client, client} end @@ -177,6 +304,7 @@ defmodule Weddell.Client do case Goth.Token.for_scope("https://www.googleapis.com/auth/pubsub") do {:ok, %{token: token, type: token_type}} -> %{"authorization" => "#{token_type} #{token}"} + _ -> %{} end @@ -193,6 +321,7 @@ defmodule Weddell.Client do defp ssl_opts(opts) do default_opts = [cacerts: :certifi.cacerts()] ssl_opts = Keyword.get(opts, :ssl, []) + default_opts |> Keyword.merge(ssl_opts) end diff --git a/mix.exs b/mix.exs index 3f740f9..d0c3d56 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Weddell.Mixfile do use Mix.Project - @version "0.3.0" + @version "0.4.0" def project do [ diff --git a/test/weddell/client/client_test.exs b/test/weddell/client/client_test.exs index 74d2a4e..b8a2933 100644 --- a/test/weddell/client/client_test.exs +++ b/test/weddell/client/client_test.exs @@ -1,2 +1,65 @@ defmodule Weddell.ClientTest do + use ExUnit.Case + + alias Weddell.{PublisherStubMock, SubscriberStubMock} + + setup do + Application.stop(:weddell) + Application.put_env(:weddell, :publisher_stub, PublisherStubMock) + Application.put_env(:weddell, :subscriber_stub, SubscriberStubMock) + Application.put_env(:weddell, :project, "weddell") + Application.delete_env(:weddell, :no_connect_on_start) + end + + describe "Weddell application" do + test "starts with a client by default" do + nil = GenServer.whereis(Weddell.Client) + Application.start(:weddell) + pid = wait_for_server(Weddell.Client, 100) + true = is_pid(pid) + %Weddell.Client{} = Weddell.client() + end + + test "starts without a client if :no_connect_on_start is set" do + Application.put_env(:weddell, :no_connect_on_start, true) + nil = GenServer.whereis(Weddell.Client) + Application.start(:weddell) + # assert_raise + nil = wait_for_server(Weddell.Client, 100) + assert {:noproc, _} = catch_exit(Weddell.client()) + nil = GenServer.whereis(Weddell.Client) + end + + test "client can be started separately" do + Application.put_env(:weddell, :no_connect_on_start, true) + nil = GenServer.whereis(Weddell.Client) + Application.start(:weddell) + + {:ok, pid} = + Weddell.Client.start_link( + "myproject", + Application.get_all_env(:weddell), + name: :my_client + ) + + true = is_pid(pid) + + %Weddell.Client{} = GenServer.call(:my_client, {:client}) + end + end + + defp wait_for_server(server, 0) do + nil + end + + defp wait_for_server(server, timeout) do + case GenServer.whereis(server) do + nil -> + :timer.sleep(10) + wait_for_server(server, timeout - 10) + + pid -> + pid + end + end end