Commit 476ef8e
Link prev/next feed streams on bulk creation. Update feed_stream duration and end_time once successfully linking the next stream (#512)
skanderm authored Jun 27, 2024
1 parent 5f09dd3 commit 476ef8e
Showing 6 changed files with 235 additions and 72 deletions.
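
In short: creating a FeedStream with link_streams?: true now enqueues a LinkFeedStream job on the consolidated :feeds queue; the job relates the stream to its previous and next neighbors and, once a next stream is linked, backfills duration and end_time from the stream's manifest. A rough sketch of the create-side entry point follows, with placeholder feed and playlist path values (the authoritative call sites are in the diff below):

    # Sketch only: the feed map and playlist_path value are placeholders, not from the commit.
    Orcasite.Radio.FeedStream
    |> Ash.Changeset.for_create(:from_m3u8_path, %{
      feed: feed,
      playlist_path: "some-node/hls/1719500000/",
      update_segments?: true,
      link_streams?: true
    })
    |> Orcasite.Radio.create()

    # After the record is created, an Orcasite.Radio.Workers.LinkFeedStream job is
    # inserted, which runs :link_next_stream and :link_prev_stream and then
    # :update_end_time_and_duration once a next stream exists.
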
2 changes: 1 addition & 1 deletion server/config/config.exs
@@ -93,7 +93,7 @@ config :orcasite, Oban,
repo: Orcasite.Repo,
# 7 day job retention
plugins: [{Oban.Plugins.Pruner, max_age: 7 * 24 * 60 * 60}],
queues: [default: 10, email: 10, feed_segments: 10]
queues: [default: 10, email: 10, feeds: 10]

config :spark, :formatter,
remove_parens?: true,
2 changes: 1 addition & 1 deletion server/lib/orcasite/radio/aws_client.ex
@@ -3,7 +3,7 @@ defmodule Orcasite.Radio.AwsClient do

@default_results %{count: 0, timestamps: []}

def get_feed_stream(%FeedStream{
def get_stream_manifest_body(%FeedStream{
bucket_region: bucket_region,
bucket: bucket,
playlist_m3u8_path: path
238 changes: 170 additions & 68 deletions server/lib/orcasite/radio/feed_stream.ex
@@ -98,6 +98,7 @@ defmodule Orcasite.Radio.FeedStream do
argument :feed, :map
argument :playlist_path, :string
argument :update_segments?, :boolean, default: false
argument :link_streams?, :boolean, default: false

change fn changeset, _context ->
path =
@@ -168,6 +169,18 @@ defmodule Orcasite.Radio.FeedStream do
|> Oban.insert()
end

if Ash.Changeset.get_argument(changeset, :link_streams?) do
%{
feed_stream_id: feed_stream_id,
enqueue_next_stream: true,
enqueue_prev_stream: true,
prev_depth: 3,
next_depth: 3
}
|> Orcasite.Radio.Workers.LinkFeedStream.new()
|> Oban.insert()
end

{:ok, feed_stream}
end)
end
Expand Down Expand Up @@ -263,77 +276,11 @@ defmodule Orcasite.Radio.FeedStream do
%{feed: feed, feed_segments: existing_feed_segments} =
feed_stream |> Orcasite.Radio.load!([:feed, feed_segments: file_name_query])

playlist_start_time = Ash.Changeset.get_attribute(change, :start_time)
playlist_path = Ash.Changeset.get_attribute(change, :playlist_path)

{:ok, body} = Orcasite.Radio.AwsClient.get_feed_stream(feed_stream)

feed_segments =
body
|> String.split("#")
# Looks like "EXTINF:10.005378,\nlive000.ts\n"
|> Enum.filter(&String.contains?(&1, "EXTINF"))
|> Enum.reduce(
[],
fn extinf_string, acc ->
with %{"duration" => duration_string, "file_name" => file_name} <-
Regex.named_captures(
~r|EXTINF:(?<duration>[^,]+),\n(?<file_name>[^\n]+)|,
extinf_string
) do
duration = Decimal.new(duration_string)

start_offset =
Enum.map(acc, & &1.duration)
|> Enum.reduce(Decimal.new("0"), &Decimal.add/2)
|> Decimal.mult(1000)

end_offset =
duration
|> Decimal.mult(1000)
|> Decimal.add(start_offset)
|> Decimal.round()

start_time =
DateTime.add(
playlist_start_time,
Decimal.to_integer(Decimal.round(start_offset)),
:millisecond
)

end_time =
DateTime.add(
playlist_start_time,
Decimal.to_integer(end_offset),
:millisecond
)

[
%{
file_name: file_name,
playlist_path: playlist_path,
duration: duration,
start_time: start_time,
end_time: end_time,
bucket: feed_stream.bucket,
bucket_region: feed_stream.bucket_region,
cloudfront_url: feed_stream.cloudfront_url,
playlist_timestamp: feed_stream.playlist_timestamp,
playlist_m3u8_path: feed_stream.playlist_m3u8_path,
segment_path: playlist_path <> file_name,
feed: feed,
feed_stream: feed_stream
}
| acc
]
else
_ -> acc
end
end
)
feed_segments = request_and_parse_manifest(feed_stream, feed)

existing_file_names = existing_feed_segments |> Enum.map(& &1.file_name)

# Only insert segments not already present
insert_segments =
feed_segments
|> Enum.filter(&(&1.file_name not in existing_file_names))
@@ -350,6 +297,91 @@
end
end)
end

update :link_next_stream do
validate present([:start_time])
validate absent([:next_feed_stream_id])

change fn %{data: %{start_time: start_time, feed_id: feed_id}} = change, _context ->
require Ash.Query

Orcasite.Radio.FeedStream
|> Ash.Query.filter(start_time > ^start_time and feed_id == ^feed_id)
|> Ash.Query.sort(start_time: :asc)
|> Ash.Query.limit(1)
|> Orcasite.Radio.read()
|> case do
{:ok, [next_stream]} ->
change
|> Ash.Changeset.manage_relationship(:next_feed_stream, next_stream,
on_lookup: :relate
)

_ ->
change
end
end
end

update :link_prev_stream do
validate present([:start_time])
validate absent([:prev_feed_stream_id])

change fn %{data: %{start_time: start_time, feed_id: feed_id}} = change, _context ->
require Ash.Query

Orcasite.Radio.FeedStream
|> Ash.Query.filter(start_time < ^start_time and feed_id == ^feed_id)
|> Ash.Query.sort(start_time: :desc)
|> Ash.Query.limit(1)
|> Orcasite.Radio.read()
|> case do
{:ok, [prev_stream]} ->
change
|> Ash.Changeset.manage_relationship(:prev_feed_stream, prev_stream,
on_lookup: :relate
)

_ ->
change
end
end
end

update :update_end_time_and_duration do
description "Pulls and parses the manifest body, updates duration and end time. Only runs if there's a next_feed_stream"
validate present([:next_feed_stream_id, :start_time])
validate absent([:duration, :end_time])

change before_action(fn %{data: feed_stream} = change ->
feed_stream
|> request_and_parse_manifest()
|> case do
feed_segments when is_list(feed_segments) and length(feed_segments) > 0 ->
duration =
feed_segments
|> Enum.map(&Map.get(&1, :duration))
|> Enum.reduce(&Decimal.add/2)

end_time =
DateTime.add(
feed_stream.start_time,
duration
|> Decimal.mult(1000)
|> Decimal.round()
|> Decimal.to_integer(),
:millisecond
)

change
|> Ash.Changeset.change_attribute(:end_time, end_time)
|> Ash.Changeset.change_attribute(:duration, duration)

_ ->
change
end
end)
end
end

code_interface do
Expand All @@ -375,4 +407,74 @@ defmodule Orcasite.Radio.FeedStream do
list :feed_streams, :index
end
end

def request_and_parse_manifest(feed_stream, feed \\ nil) do
playlist_start_time = feed_stream.start_time
playlist_path = feed_stream.playlist_path

{:ok, body} = Orcasite.Radio.AwsClient.get_stream_manifest_body(feed_stream)

body
|> String.split("#")
# Looks like "EXTINF:10.005378,\nlive000.ts\n"
|> Enum.filter(&String.contains?(&1, "EXTINF"))
|> Enum.reduce(
[],
fn extinf_string, acc ->
with %{"duration" => duration_string, "file_name" => file_name} <-
Regex.named_captures(
~r|EXTINF:(?<duration>[^,]+),\n(?<file_name>[^\n]+)|,
extinf_string
) do
duration = Decimal.new(duration_string)

start_offset =
Enum.map(acc, & &1.duration)
|> Enum.reduce(Decimal.new("0"), &Decimal.add/2)
|> Decimal.mult(1000)

end_offset =
duration
|> Decimal.mult(1000)
|> Decimal.add(start_offset)
|> Decimal.round()

start_time =
DateTime.add(
playlist_start_time,
Decimal.to_integer(Decimal.round(start_offset)),
:millisecond
)

end_time =
DateTime.add(
playlist_start_time,
Decimal.to_integer(end_offset),
:millisecond
)

[
%{
file_name: file_name,
playlist_path: playlist_path,
duration: duration,
start_time: start_time,
end_time: end_time,
bucket: feed_stream.bucket,
bucket_region: feed_stream.bucket_region,
cloudfront_url: feed_stream.cloudfront_url,
playlist_timestamp: feed_stream.playlist_timestamp,
playlist_m3u8_path: feed_stream.playlist_m3u8_path,
segment_path: playlist_path <> file_name,
feed: feed,
feed_stream: feed_stream
}
| acc
]
else
_ -> acc
end
end
)
end
end
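
For reference, below is a standalone sketch of the parsing that request_and_parse_manifest/2 now centralizes. The manifest body and start time are illustrative values, not taken from the commit; the point is that each segment's start offset is the running sum of the EXTINF durations seen so far.

    # Illustrative manifest body; real manifests are fetched via
    # Orcasite.Radio.AwsClient.get_stream_manifest_body/1.
    body = """
    #EXTM3U
    #EXT-X-TARGETDURATION:10
    #EXTINF:10.005378,
    live000.ts
    #EXTINF:9.998222,
    live001.ts
    """

    playlist_start_time = ~U[2024-06-27 00:00:00Z]

    body
    |> String.split("#")
    |> Enum.filter(&String.contains?(&1, "EXTINF"))
    |> Enum.reduce([], fn extinf_string, acc ->
      case Regex.named_captures(
             ~r|EXTINF:(?<duration>[^,]+),\n(?<file_name>[^\n]+)|,
             extinf_string
           ) do
        %{"duration" => duration_string, "file_name" => file_name} ->
          duration = Decimal.new(duration_string)

          # Offset of this segment from the playlist start, in milliseconds:
          # the sum of all previously parsed segment durations.
          start_offset_ms =
            acc
            |> Enum.map(& &1.duration)
            |> Enum.reduce(Decimal.new("0"), &Decimal.add/2)
            |> Decimal.mult(1000)

          start_time =
            DateTime.add(
              playlist_start_time,
              start_offset_ms |> Decimal.round() |> Decimal.to_integer(),
              :millisecond
            )

          [%{file_name: file_name, duration: duration, start_time: start_time} | acc]

        _ ->
          acc
      end
    end)
    |> Enum.reverse()
    # => live000.ts starts at the playlist start; live001.ts starts ~10.005 s later.
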
2 changes: 1 addition & 1 deletion server/lib/orcasite/radio/feed_stream_queue.ex
@@ -58,7 +58,7 @@ defmodule Orcasite.Radio.FeedStreamQueue do

Task.Supervisor.start_child(Orcasite.TaskSupervisor, fn ->
paths
|> Enum.map(&Map.put(&1, :update_segments?, true))
|> Enum.map(&Map.merge(&1, %{update_segments?: true, link_streams?: true}))
|> Orcasite.Radio.bulk_create(
Orcasite.Radio.FeedStream,
:from_m3u8_path,
61 changes: 61 additions & 0 deletions server/lib/orcasite/radio/workers/link_feed_stream.ex
@@ -0,0 +1,61 @@
defmodule Orcasite.Radio.Workers.LinkFeedStream do
use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id} = args}) do
enqueue_prev_stream = Map.get(args, "enqueue_prev_stream", false)
enqueue_next_stream = Map.get(args, "enqueue_next_stream", false)
next_depth = Map.get(args, "next_depth", 3)
prev_depth = Map.get(args, "prev_depth", 3)

feed_stream =
Orcasite.Radio.FeedStream
|> Orcasite.Radio.get!(feed_stream_id)

# If new link to next stream, update times and maybe queue next feed_stream to link
feed_stream
|> Ash.Changeset.for_update(:link_next_stream)
|> Orcasite.Radio.update()
|> case do
{:ok, %{next_feed_stream_id: next_feed_stream_id} = fs} ->
fs
|> Ash.Changeset.for_update(:update_end_time_and_duration)
|> Orcasite.Radio.update()

if enqueue_next_stream and next_depth > 0 do
%{
feed_stream_id: next_feed_stream_id,
enqueue_next_stream: enqueue_next_stream,
next_depth: next_depth - 1
}
|> new()
|> Oban.insert()
end

_ ->
nil
end

feed_stream
|> Ash.Changeset.for_update(:link_prev_stream)
|> Orcasite.Radio.update()
|> case do
{:ok, %{prev_feed_stream_id: prev_feed_stream_id}} ->
# If new link to previous stream, queue another link job
if enqueue_prev_stream and prev_depth > 0 do
%{
feed_stream_id: prev_feed_stream_id,
enqueue_prev_stream: enqueue_prev_stream,
prev_depth: prev_depth - 1
}
|> new()
|> Oban.insert()
end

_ ->
nil
end

:ok
end
end
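
For reference, the same job can be enqueued directly (for example from IEx) with the arguments the create action passes above; the feed stream id here is a placeholder:

    # Placeholder id; mirrors the args enqueued from the :from_m3u8_path create action.
    %{
      feed_stream_id: "some-feed-stream-id",
      enqueue_next_stream: true,
      enqueue_prev_stream: true,
      next_depth: 3,
      prev_depth: 3
    }
    |> Orcasite.Radio.Workers.LinkFeedStream.new()
    |> Oban.insert()
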
2 changes: 1 addition & 1 deletion server/lib/orcasite/radio/workers/update_feed_segments.ex
@@ -1,5 +1,5 @@
defmodule Orcasite.Radio.Workers.UpdateFeedSegments do
use Oban.Worker, queue: :feed_segments, unique: [keys: [:feed_stream_id], period: 10]
use Oban.Worker, queue: :feeds, unique: [keys: [:feed_stream_id], period: 10]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do
