Skip to content

Commit

Permalink
Add unique worker for updating feed segments
Browse files Browse the repository at this point in the history
  • Loading branch information
skanderm committed Jun 13, 2024
1 parent 2002f0b commit 17f7504
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ config :orcasite, Oban,
repo: Orcasite.Repo,
# 7 day job retention
plugins: [{Oban.Plugins.Pruner, max_age: 7 * 24 * 60 * 60}],
queues: [default: 10, email: 10]
queues: [default: 10, email: 10, feed_segments: 10]

config :spark, :formatter,
remove_parens?: true,
Expand Down
2 changes: 1 addition & 1 deletion server/lib/orcasite/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Orcasite.Application do
{Finch, name: Orcasite.Finch},
{Task.Supervisor, name: Orcasite.TaskSupervisor},
{AshAuthentication.Supervisor, otp_app: :orcasite},
if(Application.get_env(:orcasite, :feed_stream_queue_url),
if(Application.get_env(:orcasite, :feed_stream_queue_url) not in [nil, ""],
do: {Orcasite.Radio.FeedStreamQueue, []}
),
OrcasiteWeb.Endpoint
Expand Down
20 changes: 20 additions & 0 deletions server/lib/orcasite/radio/feed_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ defmodule Orcasite.Radio.FeedStream do
argument :m3u8_path, :string, allow_nil?: false
argument :feed, :map
argument :playlist_path, :string
argument :update_segments?, :boolean, default: false

change fn changeset, _context ->
path =
Expand Down Expand Up @@ -159,6 +160,15 @@ defmodule Orcasite.Radio.FeedStream do
"/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8"
)
|> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp)
|> Ash.Changeset.after_action(fn change, %{id: feed_stream_id} = feed_stream ->
if Ash.Changeset.get_argument(changeset, :update_segments?) do
%{feed_stream_id: feed_stream_id}
|> Orcasite.Radio.Workers.UpdateFeedSegments.new()
|> Oban.insert()
end

{:ok, feed_stream}
end)
end
end
end
Expand Down Expand Up @@ -188,6 +198,7 @@ defmodule Orcasite.Radio.FeedStream do
:playlist_timestamp
]

argument :update_segments?, :boolean, default: false
argument :feed, :map, allow_nil?: false
argument :prev_feed_stream, :string

Expand Down Expand Up @@ -227,6 +238,15 @@ defmodule Orcasite.Radio.FeedStream do
:playlist_m3u8_path,
"/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8"
)
|> Ash.Changeset.after_action(fn change, %{id: feed_stream_id} = feed_stream ->
if Ash.Changeset.get_argument(changeset, :update_segments?) do
%{feed_stream_id: feed_stream_id}
|> Orcasite.Radio.Workers.UpdateFeedSegments.new()
|> Oban.insert()
end

{:ok, feed_stream}
end)
end
end

Expand Down
2 changes: 2 additions & 0 deletions server/lib/orcasite/radio/feed_stream_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ defmodule Orcasite.Radio.FeedStreamQueue do
[]
end
end)
|> Enum.uniq()

Task.Supervisor.start_child(Orcasite.TaskSupervisor, fn ->
paths
|> Enum.map(&Map.put(&1, :update_segments?, true))
|> Orcasite.Radio.bulk_create(
Orcasite.Radio.FeedStream,
:from_m3u8_path,
Expand Down
11 changes: 11 additions & 0 deletions server/lib/orcasite/radio/workers/update_feed_segments.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Orcasite.Radio.Workers.UpdateFeedSegments do
use Oban.Worker, queue: :feed_segments, unique: [keys: [:feed_stream_id], period: 10]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do
Orcasite.Radio.FeedStream
|> Orcasite.Radio.get!(feed_stream_id)
|> Ash.Changeset.for_update(:update_segments)
|> Orcasite.Radio.update()
end
end

0 comments on commit 17f7504

Please sign in to comment.