Skip to content

Commit

Permalink
Breakdown of jobs by state (#63)
Browse files Browse the repository at this point in the history
Co-authored-by: Marty Zalega <marty@zalega.me>
  • Loading branch information
egze and evilmarty authored Nov 11, 2024
1 parent a0c527b commit e52ca44
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 70 deletions.
4 changes: 3 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
# Need to increase it because of <.label_value_list /> that does a <pre> tag, which will look broken with extra whitespace.
heex_line_length: 300
]
234 changes: 168 additions & 66 deletions lib/oban/live_dashboard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,41 @@ defmodule Oban.LiveDashboard do
import Phoenix.LiveDashboard.Helpers, only: [format_value: 2]
import Ecto.Query

@impl true
def menu_link(_, _) do
{:ok, "Oban"}
end
@per_page_limits [20, 50, 100]

@oban_sorted_job_states [
"executing",
"available",
"scheduled",
"retryable",
"cancelled",
"discarded",
"completed"
]

@impl true
def render(assigns) do
~H"""
<.live_table
id="oban_jobs"
dom_id="oban-jobs"
page={@page}
row_attrs={&row_attrs/1}
row_fetcher={&fetch_jobs/2}
title="Oban Jobs"
search={false}
>
<:col field={:id} header="ID" sortable={:desc} />
<:col field={:state} sortable={:desc} />
<:col field={:queue} sortable={:desc} />
<:col field={:worker} sortable={:desc} />
<:col :let={job} field={:attempt} header="Attempts" sortable={:desc}>
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col :let={job} field={:inserted_at} sortable={:desc}>
<%= format_value(job.inserted_at) %>
</:col>
<:col :let={job} field={:scheduled_at} sortable={:desc}>
<%= format_value(job.scheduled_at) %>
</:col>
</.live_table>
<.live_modal
:if={@job != nil}
id="modal"
title="Job"
return_to={live_dashboard_path(@socket, @page, params: %{})}
>
<h5 class="mb-3">Oban</h5>
<.live_nav_bar id="oban_states" page={@page} nav_param="job_state" style={:bar} extra_params={["nav"]}>
<:item :for={{job_state, count} <- @job_state_counts} name={job_state} label={job_state_label(job_state, count)} method="navigate">
<.live_table id="oban_jobs" limit={per_page_limits()} dom_id={"oban-jobs-#{job_state}"} page={@page} row_attrs={&row_attrs/1} row_fetcher={&fetch_jobs(&1, &2, job_state)} default_sort_by={@timestamp_field} title="" search={false}>
<:col :let={job} field={:worker} sortable={:desc}>
<p class="font-weight-bold m-0"><%= job.worker %></p>
<pre class="font-weight-lighter text-muted m-0"><%= truncate(inspect(job.args)) %></pre>
</:col>
<:col :let={job} field={:attempt} header="Attempt" sortable={:desc}>
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col field={:queue} header="Queue" sortable={:desc} />
<:col :let={job} field={@timestamp_field} sortable={:desc}>
<%= format_value(timestamp(job, @timestamp_field)) %>
</:col>
</.live_table>
</:item>
</.live_nav_bar>
<.live_modal :if={@job != nil} id="job-modal" title={"Job - #{@job.id}"} return_to={live_dashboard_path(@socket, @page, params: %{})}>
<div class="mb-4 btn-toolbar" role="toolbar" aria-label="Oban Job actions">
<div class="btn-group" role="group">
<button type="button" class="btn btn-primary btn-sm mr-2" phx-click="run_job" phx-value-job={@job.id} disabled={!can_retry_job?(@job)}>Retry Job</button>
Expand All @@ -62,15 +60,9 @@ defmodule Oban.LiveDashboard do
<:elem label="Attempts"><%= @job.attempt %>/<%= @job.max_attempts %></:elem>
<:elem label="Priority"><%= @job.priority %></:elem>
<:elem label="Attempted at"><%= format_value(@job.attempted_at) %></:elem>
<:elem :if={@job.cancelled_at} label="Cancelled at">
<%= format_value(@job.cancelled_at) %>
</:elem>
<:elem :if={@job.completed_at} label="Completed at">
<%= format_value(@job.completed_at) %>
</:elem>
<:elem :if={@job.discarded_at} label="Discarded at">
<%= format_value(@job.discarded_at) %>
</:elem>
<:elem :if={@job.cancelled_at} label="Cancelled at"><%= format_value(@job.cancelled_at) %></:elem>
<:elem :if={@job.completed_at} label="Completed at"><%= format_value(@job.completed_at) %></:elem>
<:elem :if={@job.discarded_at} label="Discarded at"><%= format_value(@job.discarded_at) %></:elem>
<:elem label="Inserted at"><%= format_value(@job.inserted_at) %></:elem>
<:elem label="Scheduled at"><%= format_value(@job.scheduled_at) %></:elem>
</.label_value_list>
Expand All @@ -80,7 +72,25 @@ defmodule Oban.LiveDashboard do
end

@impl true
def handle_params(%{"params" => %{"job" => job_id}}, _url, socket) do
def mount(_params, _, socket) do
{:ok, socket}
end

@impl true
def menu_link(_, _) do
{:ok, "Oban"}
end

@impl true
def handle_params(%{"params" => %{"job" => job_id}} = params, _url, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", "executing"))
|> assign(sort_by: Map.get(params, "job_state"))
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()

case fetch_job(job_id) do
{:ok, job} ->
{:noreply, assign(socket, job: job)}
Expand All @@ -91,17 +101,16 @@ defmodule Oban.LiveDashboard do
end
end

def handle_params(_params, _url, socket) do
{:noreply, assign(socket, job: nil)}
end
def handle_params(params, _uri, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", "executing"))
|> assign(sort_by: Map.get(params, "job_state"))
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()

@impl true
def handle_refresh(socket) do
{:noreply,
Phoenix.Component.update(socket, :job, fn
nil -> nil
%{id: job_id} -> get_job(job_id)
end)}
{:noreply, socket}
end

@impl true
Expand Down Expand Up @@ -137,32 +146,95 @@ defmodule Oban.LiveDashboard do
Oban.Repo.get(Oban.config(), Oban.Job, id)
end

defp fetch_jobs(params, _node) do
total_jobs = Oban.Repo.aggregate(Oban.config(), Oban.Job, :count)
jobs = Oban.Repo.all(Oban.config(), jobs_query(params)) |> Enum.map(&Map.from_struct/1)
@impl true
def handle_refresh(socket) do
{:noreply,
socket
|> assign_job_state_counts()
|> Phoenix.Component.update(:job, fn
nil -> nil
%{id: job_id} -> get_job(job_id)
end)}
end

defp assign_job_state_counts(socket) do
job_state_counts_in_db =
Oban.Repo.all(
Oban.config(),
Oban.Job
|> group_by([j], [j.state])
|> order_by([j], [j.state])
|> select([j], {j.state, count(j.id)})
)
|> Enum.into(%{})

job_state_counts =
for job_state <- @oban_sorted_job_states,
do: {job_state, Map.get(job_state_counts_in_db, job_state, 0)}

total_count = Keyword.values(job_state_counts) |> Enum.sum()
job_state_counts = [{"all", total_count} | job_state_counts]

assign(socket, job_state_counts: job_state_counts)
end

defp job_state_label(job_state, count) do
"#{Phoenix.Naming.humanize(job_state)} (#{count})"
end

defp fetch_jobs(params, _node, job_state) do
total_jobs = Oban.Repo.aggregate(Oban.config(), jobs_count_query(job_state), :count)

jobs =
Oban.Repo.all(Oban.config(), jobs_query(params, job_state)) |> Enum.map(&Map.from_struct/1)

{jobs, total_jobs}
end

defp fetch_job(id) do
case get_job(id) do
nil ->
:error

job ->
case Oban.Repo.get(Oban.config(), Oban.Job, id) do
%Oban.Job{} = job ->
{:ok, job}

_ ->
:error
end
end

defp can_retry_job?(%Oban.Job{state: state}), do: state not in ["available", "executing"]

defp can_cancel_job?(%Oban.Job{state: state}), do: state != "cancelled"

defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: l}) do
defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: limit}, "all") do
Oban.Job
|> limit(^l)
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end

defp jobs_query(params, job_state) do
Oban.Job
|> filter_by_job_state(job_state)
|> filter_by_params(params)
end

defp jobs_count_query("all") do
Oban.Job
end

defp jobs_count_query(job_state) do
filter_by_job_state(Oban.Job, job_state)
end

defp filter_by_params(queryable, %{sort_by: sort_by, sort_dir: sort_dir, limit: limit}) do
queryable
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end

defp filter_by_job_state(queryable, job_state) do
where(queryable, [job], job.state == ^job_state)
end

defp row_attrs(job) do
[
{"phx-click", "show_job"},
Expand All @@ -175,9 +247,39 @@ defmodule Oban.LiveDashboard do
Enum.map(errors, &Map.get(&1, "error"))
end

def format_value(%DateTime{} = datetime) do
DateTime.to_string(datetime)
defp format_value(%DateTime{} = datetime) do
Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S")
end

defp format_value(value), do: value

defp timestamp(job, timestamp_field) do
Map.get(job, timestamp_field)
end

defp assign_timestamp_field(%{assigns: %{job_state: job_state}} = socket) do
timestamp_field =
case job_state do
"available" -> :scheduled_at
"cancelled" -> :cancelled_at
"completed" -> :completed_at
"discarded" -> :discarded_at
"executing" -> :attempted_at
"retryable" -> :scheduled_at
"scheduled" -> :scheduled_at
_ -> :inserted_at
end

assign(socket, timestamp_field: timestamp_field)
end

defp truncate(string, max_length \\ 50) do
if String.length(string) > max_length do
String.slice(string, 0, max_length) <> "…"
else
string
end
end

def format_value(nil), do: nil
defp per_page_limits, do: @per_page_limits
end
51 changes: 48 additions & 3 deletions test/oban/live_dashboard_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,60 @@ defmodule Oban.LiveDashboardTest do
test "shows jobs with limit" do
for _ <- 1..110, do: job_fixture()
{:ok, live, rendered} = live(build_conn(), "/dashboard/oban")
assert rendered |> :binary.matches("<td class=\"oban-jobs-id\"") |> length() <= 100

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() ==
20

rendered = render_patch(live, "/dashboard/oban?limit=100")
assert rendered |> :binary.matches("<td class=\"oban-jobs-id\"") |> length() == 100

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() ==
100
end

test "switch between states" do
Oban.LiveDashboardTest.Repo.delete_all(Oban.Job)
_executing_job = job_fixture(%{"foo" => "executing"}, state: "executing")
_completed_job = job_fixture(%{"foo" => "completed"}, state: "completed")

conn = build_conn()
{:ok, live, rendered} = live(conn, "/dashboard/oban")

assert rendered |> :binary.matches("<td class=\"oban-jobs-all-worker\"") |> length() == 2

{:ok, live, rendered} =
live
|> element("a", "Executing (1)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-executing-worker\"")
|> length() == 1

{:ok, live, rendered} =
live
|> element("a", "Completed (1)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-completed-worker\"")
|> length() == 1

{:ok, _live, rendered} =
live
|> element("a", "Scheduled (0)")
|> render_click()
|> follow_redirect(conn)

assert rendered
|> :binary.matches("<td class=\"oban-jobs-scheduled-worker\"")
|> length() == 0
end

test "shows job info modal" do
job = job_fixture(%{something: "foobar"})
{:ok, live, rendered} = live(build_conn(), "/dashboard/oban?params[job]=#{job.id}")
{:ok, live, _rendered} = live(build_conn(), "/dashboard/oban?params[job]=#{job.id}")
rendered = render(live)
assert rendered =~ "modal-content"
assert rendered =~ "%{&quot;something&quot; =&gt; &quot;foobar&quot;}"
Expand Down

0 comments on commit e52ca44

Please sign in to comment.