From 8847bb0228ab7ba89e9a4e871aec0923837bce5c Mon Sep 17 00:00:00 2001 From: skanderm Date: Mon, 1 Jul 2024 16:00:47 -0700 Subject: [PATCH] Skip queueing next/prev link when there was none (#520) * Skip queueing next/prev link when there was none * Only queue new jobs if there aren't currently available or executing --- .../radio/workers/link_feed_stream.ex | 17 +++++++++--- .../radio/workers/update_feed_segments.ex | 4 ++- server/priv/repo/seeds.exs | 26 ++++++++++++------- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/server/lib/orcasite/radio/workers/link_feed_stream.ex b/server/lib/orcasite/radio/workers/link_feed_stream.ex index c880d431..ec1cf0e4 100644 --- a/server/lib/orcasite/radio/workers/link_feed_stream.ex +++ b/server/lib/orcasite/radio/workers/link_feed_stream.ex @@ -1,7 +1,17 @@ defmodule Orcasite.Radio.Workers.LinkFeedStream do - use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10] + use Oban.Worker, + queue: :feeds, + unique: [ + keys: [:feed_stream_id], + period: :infinity, + states: [:available, :scheduled, :executing] + ] @impl Oban.Worker + def perform(%Oban.Job{args: %{"feed_stream_id" => nil} = args}) do + :ok + end + def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id} = args}) do enqueue_prev_stream = Map.get(args, "enqueue_prev_stream", false) enqueue_next_stream = Map.get(args, "enqueue_next_stream", false) @@ -17,7 +27,8 @@ defmodule Orcasite.Radio.Workers.LinkFeedStream do |> Ash.Changeset.for_update(:link_next_stream) |> Orcasite.Radio.update() |> case do - {:ok, %{next_feed_stream_id: next_feed_stream_id} = fs} -> + {:ok, %{next_feed_stream_id: next_feed_stream_id} = fs} + when not is_nil(next_feed_stream_id) -> fs |> Ash.Changeset.for_update(:update_end_time_and_duration) |> Orcasite.Radio.update() @@ -40,7 +51,7 @@ defmodule Orcasite.Radio.Workers.LinkFeedStream do |> Ash.Changeset.for_update(:link_prev_stream) |> Orcasite.Radio.update() |> case do - {:ok, %{prev_feed_stream_id: prev_feed_stream_id}} -> + {:ok, %{prev_feed_stream_id: prev_feed_stream_id}} when not is_nil(prev_feed_stream_id) -> # If new link to previous stream, queue another link job if enqueue_prev_stream and prev_depth > 0 do %{ diff --git a/server/lib/orcasite/radio/workers/update_feed_segments.ex b/server/lib/orcasite/radio/workers/update_feed_segments.ex index 203b6a8b..49fb884a 100644 --- a/server/lib/orcasite/radio/workers/update_feed_segments.ex +++ b/server/lib/orcasite/radio/workers/update_feed_segments.ex @@ -1,5 +1,7 @@ defmodule Orcasite.Radio.Workers.UpdateFeedSegments do - use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10] + use Oban.Worker, + queue: :feeds, + unique: [keys: [:feed_stream_id], period: :infinity, states: [:available, :scheduled, :executing]] @impl Oban.Worker def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do diff --git a/server/priv/repo/seeds.exs b/server/priv/repo/seeds.exs index c2488338..799843b4 100644 --- a/server/priv/repo/seeds.exs +++ b/server/priv/repo/seeds.exs @@ -27,7 +27,7 @@ feeds = [ node_name: "rpi_mast_center", slug: "mast-center", bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", + bucket_region: "us-west-2" }, # %{ # lat_lng_string: "48.0336664, -122.6040035", @@ -51,7 +51,7 @@ feeds = [ node_name: "rpi_sunset_bay", slug: "sunset-bay", bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", + bucket_region: "us-west-2" }, %{ lat_lng_string: "48.591294, -123.058779", @@ -59,7 +59,7 @@ feeds = [ node_name: "rpi_north_sjc", slug: "north-sjc", bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", + bucket_region: "us-west-2" } ] @@ -297,12 +297,18 @@ Orcasite.Accounts.User } ] |> Enum.map(fn attrs -> - feed_id = feeds |> Enum.find(fn feed -> feed.slug == attrs[:slug] end) |> Map.get(:id) + feeds + |> Enum.find(fn feed -> feed.slug == attrs[:slug] end) + |> case do + %{id: feed_id} -> + Orcasite.Radio.Detection + |> Ash.Changeset.for_create( + :submit_detection, + Map.merge(attrs, %{feed_id: feed_id, send_notifications: false}) + ) + |> Orcasite.Radio.create!(verbose?: true, authorize?: false) - Orcasite.Radio.Detection - |> Ash.Changeset.for_create( - :submit_detection, - Map.merge(attrs, %{feed_id: feed_id, send_notifications: false}) - ) - |> Orcasite.Radio.create!(verbose?: true, authorize?: false) + _ -> + :ok + end end)