Skip to content

Commit

Permalink
Skip queueing next/prev link when there was none (#520)
Browse files Browse the repository at this point in the history
* Skip queueing next/prev link when there was none

* Only queue new jobs if there aren't currently available or executing
  • Loading branch information
skanderm authored Jul 1, 2024
1 parent 476ef8e commit 8847bb0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
17 changes: 14 additions & 3 deletions server/lib/orcasite/radio/workers/link_feed_stream.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
defmodule Orcasite.Radio.Workers.LinkFeedStream do
use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10]
use Oban.Worker,
queue: :feeds,
unique: [
keys: [:feed_stream_id],
period: :infinity,
states: [:available, :scheduled, :executing]
]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"feed_stream_id" => nil} = args}) do
:ok
end

def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id} = args}) do
enqueue_prev_stream = Map.get(args, "enqueue_prev_stream", false)
enqueue_next_stream = Map.get(args, "enqueue_next_stream", false)
Expand All @@ -17,7 +27,8 @@ defmodule Orcasite.Radio.Workers.LinkFeedStream do
|> Ash.Changeset.for_update(:link_next_stream)
|> Orcasite.Radio.update()
|> case do
{:ok, %{next_feed_stream_id: next_feed_stream_id} = fs} ->
{:ok, %{next_feed_stream_id: next_feed_stream_id} = fs}
when not is_nil(next_feed_stream_id) ->
fs
|> Ash.Changeset.for_update(:update_end_time_and_duration)
|> Orcasite.Radio.update()
Expand All @@ -40,7 +51,7 @@ defmodule Orcasite.Radio.Workers.LinkFeedStream do
|> Ash.Changeset.for_update(:link_prev_stream)
|> Orcasite.Radio.update()
|> case do
{:ok, %{prev_feed_stream_id: prev_feed_stream_id}} ->
{:ok, %{prev_feed_stream_id: prev_feed_stream_id}} when not is_nil(prev_feed_stream_id) ->
# If new link to previous stream, queue another link job
if enqueue_prev_stream and prev_depth > 0 do
%{
Expand Down
4 changes: 3 additions & 1 deletion server/lib/orcasite/radio/workers/update_feed_segments.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Orcasite.Radio.Workers.UpdateFeedSegments do
use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10]
use Oban.Worker,
queue: :feeds,
unique: [keys: [:feed_stream_id], period: :infinity, states: [:available, :scheduled, :executing]]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do
Expand Down
26 changes: 16 additions & 10 deletions server/priv/repo/seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ feeds = [
node_name: "rpi_mast_center",
slug: "mast-center",
bucket: "dev-streaming-orcasound-net",
bucket_region: "us-west-2",
bucket_region: "us-west-2"
},
# %{
# lat_lng_string: "48.0336664, -122.6040035",
Expand All @@ -51,15 +51,15 @@ feeds = [
node_name: "rpi_sunset_bay",
slug: "sunset-bay",
bucket: "dev-streaming-orcasound-net",
bucket_region: "us-west-2",
bucket_region: "us-west-2"
},
%{
lat_lng_string: "48.591294, -123.058779",
name: "North San Juan Channel",
node_name: "rpi_north_sjc",
slug: "north-sjc",
bucket: "dev-streaming-orcasound-net",
bucket_region: "us-west-2",
bucket_region: "us-west-2"
}
]

Expand Down Expand Up @@ -297,12 +297,18 @@ Orcasite.Accounts.User
}
]
|> Enum.map(fn attrs ->
feed_id = feeds |> Enum.find(fn feed -> feed.slug == attrs[:slug] end) |> Map.get(:id)
feeds
|> Enum.find(fn feed -> feed.slug == attrs[:slug] end)
|> case do
%{id: feed_id} ->
Orcasite.Radio.Detection
|> Ash.Changeset.for_create(
:submit_detection,
Map.merge(attrs, %{feed_id: feed_id, send_notifications: false})
)
|> Orcasite.Radio.create!(verbose?: true, authorize?: false)

Orcasite.Radio.Detection
|> Ash.Changeset.for_create(
:submit_detection,
Map.merge(attrs, %{feed_id: feed_id, send_notifications: false})
)
|> Orcasite.Radio.create!(verbose?: true, authorize?: false)
_ ->
:ok
end
end)

0 comments on commit 8847bb0

Please sign in to comment.