diff --git a/config/config.exs b/config/config.exs index 53b772203..6b22f02ed 100644 --- a/config/config.exs +++ b/config/config.exs @@ -57,6 +57,7 @@ config :ueberauth, Ueberauth, config :phoenix, :json_library, Jason config :postgrex, :json_library, Jason +config :syn, scopes: [:endpoints] oauth_common = [ repo: Logflare.Repo, diff --git a/lib/logflare/backends/recent_logs.ex b/lib/logflare/backends/recent_logs.ex deleted file mode 100644 index 764e2f7fa..000000000 --- a/lib/logflare/backends/recent_logs.ex +++ /dev/null @@ -1,76 +0,0 @@ -defmodule Logflare.Backends.RecentLogs do - @moduledoc false - @doc """ - A global cache of latest 100 logs. One per cluster. - - Listens to the cluster-wide recent_logs channel topic. - - Periodically does a cluster sync to ensure all local caches are syncronised. - """ - alias Logflare.{Source, Backends, LogEvent} - use TypedStruct - use GenServer - - typedstruct enforce: true do - field :source, Source.t() - field :data, list(LogEvent.t()) - end - - def start_link(%Source{} = source) do - GenServer.start_link(__MODULE__, source, name: get_global_name(source)) - end - - def init(source) do - {:ok, %__MODULE__{source: source, data: []}} - end - - # Convenience function to retrieve the globally registered name of a source's RecentLogs process. - defp get_global_name(%Source{} = source) do - {:global, Backends.via_source(source, __MODULE__)} - end - - @doc """ - Convenience function to retrieve the globally registered pid of a source's RecentLogs process. - """ - @spec get_pid(Source.t()) :: pid() | nil - def get_pid(%Source{} = source) do - Backends.via_source(source, __MODULE__) - |> :global.whereis_name() - |> case do - :undefined -> nil - pid -> pid - end - end - - @doc """ - Pushes events into the cache, and removes older events. - """ - @spec push(pid(), list(LogEvent.t())) :: :ok - def push(pid, events) when is_list(events) do - GenServer.cast(pid, {:push, events}) - end - - @doc """ - Returns the list of cached log events, sorted. - """ - @spec list(pid()) :: list(LogEvent.t()) - def list(pid) do - GenServer.call(pid, :list) - end - - # GENSERVER CALLBACKs - - def handle_cast({:push, log_events}, state) when is_list(log_events) do - sorted = Enum.sort_by(log_events, & &1.body["timestamp"]) - - data = - (sorted ++ state.data) - |> Enum.take(100) - - {:noreply, %{state | data: data}} - end - - def handle_call(:list, _from, state) do - {:reply, state.data, state} - end -end diff --git a/lib/logflare/endpoints.ex b/lib/logflare/endpoints.ex index 0155fe45d..e1a4b581c 100644 --- a/lib/logflare/endpoints.ex +++ b/lib/logflare/endpoints.ex @@ -103,7 +103,7 @@ defmodule Logflare.Endpoints do if should_kill_caches? do # kill all caches - for pid <- Resolver.resolve(endpoint) do + for pid <- Resolver.list_caches(endpoint) do Utils.Tasks.async(fn -> Cache.invalidate(pid) end) @@ -356,7 +356,7 @@ defmodule Logflare.Endpoints do end def calculate_endpoint_metrics(%Query{} = endpoint) do - cache_count = endpoint |> Resolver.resolve() |> length() + cache_count = endpoint |> Resolver.list_caches() |> length() %{ endpoint diff --git a/lib/logflare/endpoints/cache.ex b/lib/logflare/endpoints/cache.ex index fb249ca00..0e9ff1a21 100644 --- a/lib/logflare/endpoints/cache.ex +++ b/lib/logflare/endpoints/cache.ex @@ -26,9 +26,7 @@ defmodule Logflare.Endpoints.Cache do } def start_link({query, params}) do - GenServer.start_link(__MODULE__, {query, params}, - name: {:global, {__MODULE__, query.id, params}} - ) + GenServer.start_link(__MODULE__, {query, params}) end @doc """ @@ -102,6 +100,11 @@ defmodule Logflare.Endpoints.Cache do unless state.disable_cache, do: refresh(proactive_querying_ms(state)) + # register on syn + pid = self() + :syn.register(:endpoints, {query.id, params}, pid) + :syn.join(:endpoints, query.id, pid) + {:ok, state} end diff --git a/lib/logflare/endpoints/resolver.ex b/lib/logflare/endpoints/resolver.ex index cb20d4192..3d4a296d7 100644 --- a/lib/logflare/endpoints/resolver.ex +++ b/lib/logflare/endpoints/resolver.ex @@ -4,35 +4,35 @@ defmodule Logflare.Endpoints.Resolver do """ alias Logflare.Endpoints.Cache - def resolve(%Logflare.Endpoints.Query{id: id}) do - :global.registered_names() - |> Enum.filter(fn - {Cache, ^id, _} -> true - _ -> false - end) - |> Enum.map(&:global.whereis_name/1) + require Logger + + @doc """ + Lists all caches for an endpoint + """ + def list_caches(%Logflare.Endpoints.Query{id: id}) do + :syn.members(:endpoints, id) + |> Enum.map(fn {pid, _} -> pid end) end + @doc """ + Starts up or performs a lookup for an Endpoint.Cache process. + Returns the resolved pid. + """ def resolve(%Logflare.Endpoints.Query{id: id} = query, params) do - :global.set_lock({Cache, {id, params}}) - - result = - case :global.whereis_name({Cache, id, params}) do - :undefined -> - spec = {Cache, {query, params}} - - case DynamicSupervisor.start_child(Cache, spec) do - {:ok, pid} -> pid - {:error, {:already_started, pid}} -> pid - end - - pid -> - Cache.touch(pid) - pid - end + :syn.lookup(:endpoints, {id, params}) + |> case do + {pid, _} when is_pid(pid) -> + Cache.touch(pid) + pid - :global.del_lock({Cache, {id, params}}) + _ -> + spec = {Cache, {query, params}} + Logger.debug("Starting up Endpoint.Cache for Endpoint.Query id=#{id}", endpoint_id: id) - result + case DynamicSupervisor.start_child(Cache, spec) do + {:ok, pid} -> pid + {:error, {:already_started, pid}} -> pid + end + end end end diff --git a/mix.exs b/mix.exs index c35256e18..82c5ebe45 100644 --- a/mix.exs +++ b/mix.exs @@ -123,6 +123,7 @@ defmodule Logflare.Mixfile do # Concurrency and pipelines {:broadway, "~> 1.0.6"}, + {:syn, "~> 3.3"}, # Test {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 8732544d0..942edd817 100644 --- a/mix.lock +++ b/mix.lock @@ -123,12 +123,13 @@ "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "stripity_stripe": {:hex, :stripity_stripe, "2.9.0", "241f49bb653a2bf1b94744264c885d4e29331fbffa3334db35500cac3af05ca9", [:mix], [{:hackney, "~> 1.15", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:uri_query, "~> 0.1.2", [hex: :uri_query, repo: "hexpm", optional: false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"}, "swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"}, + "syn": {:hex, :syn, "3.3.0", "4684a909efdfea35ce75a9662fc523e4a8a4e8169a3df275e4de4fa63f99c486", [:rebar3], [], "hexpm", "e58ee447bc1094bdd21bf0acc102b1fbf99541a508cd48060bf783c245eaf7d6"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, "tesla": {:hex, :tesla, "1.9.0", "8c22db6a826e56a087eeb8cdef56889731287f53feeb3f361dec5d4c8efb6f14", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "7c240c67e855f7e63e795bf16d6b3f5115a81d1f44b7fe4eadbf656bae0fef8a"}, - "thousand_island": {:hex, :thousand_island, "1.2.0", "4f548ae771ab5f96bc7e199f9824c0c2ce6d365f8c93f5f64dbbb33988e484bf", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "521671fea179672addb6af46455fc2a77be1edda4c0ed351633e0ef37a4b3584"}, "test_server": {:hex, :test_server, "0.1.16", "403d6cebaa7ad1d08c0ca9475af48836c45ae0f64cda9f4e88b0f17310c5c452", [:mix], [{:bandit, ">= 1.4.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 2.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:x509, "~> 0.6", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "f27cfc858d494e4df413a962fbe4d7aa3daf1a3824307f0b046bb8cec6ac8e42"}, + "thousand_island": {:hex, :thousand_island, "1.2.0", "4f548ae771ab5f96bc7e199f9824c0c2ce6d365f8c93f5f64dbbb33988e484bf", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "521671fea179672addb6af46455fc2a77be1edda4c0ed351633e0ef37a4b3584"}, "timber_logfmt": {:git, "https://github.com/Logflare/logfmt-elixir.git", "9766367ccb3014b47b0a621a489261011dcea769", []}, "timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},