Skip to content

Commit

Permalink
fix: endpoint cache tweaks (#2241)
Browse files Browse the repository at this point in the history
* fix: bigquery queries timeout at 25 seconds now

* feat: another async Tasks util

* fix: Endpoints.Cache tweaks
  • Loading branch information
chasers authored Oct 28, 2024
1 parent ba53ca3 commit 064837a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
2 changes: 1 addition & 1 deletion lib/logflare/ecto/bigquery/bq_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Logflare.BqRepo do
import Logflare.TypeCasts
require Logger

@query_request_timeout 60_000
@query_request_timeout 25_000
@use_query_cache true
@type results :: %{
:rows => nil | [term()],
Expand Down
28 changes: 18 additions & 10 deletions lib/logflare/endpoints/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ defmodule Logflare.Endpoints.Cache do
"""

require Logger

alias Logflare.Endpoints
alias Logflare.Utils.Tasks

use GenServer, restart: :temporary

defstruct query: nil,
Expand All @@ -26,11 +29,13 @@ defmodule Logflare.Endpoints.Cache do
}

def start_link({query, params}) do
GenServer.start_link(__MODULE__, {query, params})
GenServer.start_link(__MODULE__, {query, params},
name: {:via, :syn, {:endpoints, {query.id, params}}}
)
end

@doc """
Initiate a query. Times out at 30 seconds. BigQuery should also timeout at 60 seconds.
Initiate a query. Times out at 30 seconds. BigQuery should also timeout at 25 seconds.
We have a %GoogleApi.BigQuery.V2.Model.ErrorProto{} model but it's missing fields we see in error responses.
"""
def query(cache) when is_pid(cache) do
Expand Down Expand Up @@ -88,6 +93,8 @@ defmodule Logflare.Endpoints.Cache do
end

def init({query, params}) do
:syn.join(:endpoints, query.id, self())

state =
%__MODULE__{
query: query,
Expand All @@ -100,11 +107,6 @@ defmodule Logflare.Endpoints.Cache do

unless state.disable_cache, do: refresh(proactive_querying_ms(state))

# register on syn
pid = self()
:syn.register(:endpoints, {query.id, params}, pid)
:syn.join(:endpoints, query.id, pid)

{:ok, state}
end

Expand Down Expand Up @@ -154,14 +156,20 @@ defmodule Logflare.Endpoints.Cache do
end

def handle_info(:refresh, state) do
task = Task.async(__MODULE__, :do_query, [state])
task = Tasks.async(__MODULE__, :do_query, [state])
tasks = [task | state.query_tasks]

running = Enum.count(tasks)

if running > 1,
do: Logger.warning("CacheTaskError: #{running} Endpoints.Cache tasks are running")

{:noreply, %{state | query_tasks: tasks}}
end

def handle_info({_from_task, {:ok, results}}, state) do
{:noreply, %{state | cached_result: results}}
def handle_info({from_task, {:ok, results}}, state) do
tasks = Enum.reject(state.query_tasks, &(&1.pid == from_task))
{:noreply, %{state | cached_result: results, query_tasks: tasks}}
end

def handle_info({_from_task, {:error, _response}}, state) do
Expand Down
14 changes: 14 additions & 0 deletions lib/logflare/utils/tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ defmodule Logflare.Utils.Tasks do
@doc """
Linked to caller, linked to supervisor
"""
def async(mod, fun, args, opts \\ []) do
Task.Supervisor.async(
{:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}},
mod,
fun,
args,
opts
)
end

@doc """
Linked to caller, linked to supervisor
"""
@spec async((-> any())) :: Task.t()
def async(func, opts \\ []) do
Task.Supervisor.async(
{:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}},
Expand Down

0 comments on commit 064837a

Please sign in to comment.