From af1c01ba646661f59c44667de2daa5ee34dc0415 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Fri, 7 Jun 2024 12:07:42 -0700 Subject: [PATCH 1/7] Create FeedSegment, update listing timestamps with callback function --- server/lib/orcasite/global_setup.ex | 5 +- server/lib/orcasite/radio/aws_client.ex | 19 +- server/lib/orcasite/radio/feed_segment.ex | 245 ++++++++++++++++++++++ server/lib/orcasite/radio/feed_stream.ex | 3 +- server/priv/repo/seeds.exs | 48 ++--- 5 files changed, 286 insertions(+), 34 deletions(-) create mode 100644 server/lib/orcasite/radio/feed_segment.ex diff --git a/server/lib/orcasite/global_setup.ex b/server/lib/orcasite/global_setup.ex index 853dd15c..54af7044 100644 --- a/server/lib/orcasite/global_setup.ex +++ b/server/lib/orcasite/global_setup.ex @@ -4,11 +4,12 @@ defmodule Orcasite.GlobalSetup do |> Ash.Query.for_read(:read) |> Orcasite.Radio.read!() |> Stream.map(fn feed -> - with {:ok, %{timestamps: timestamps}} <- Orcasite.Radio.AwsClient.list_timestamps(feed) do + Orcasite.Radio.AwsClient.list_timestamps(feed, fn timestamps -> timestamps |> Enum.map(&%{feed: feed, playlist_timestamp: &1}) |> Orcasite.Radio.bulk_create(Orcasite.Radio.FeedStream, :create) - end + end) + :ok end) |> Enum.to_list() end diff --git a/server/lib/orcasite/radio/aws_client.ex b/server/lib/orcasite/radio/aws_client.ex index 46e14ab5..c20790bd 100644 --- a/server/lib/orcasite/radio/aws_client.ex +++ b/server/lib/orcasite/radio/aws_client.ex @@ -3,13 +3,14 @@ defmodule Orcasite.Radio.AwsClient do @default_results %{count: 0, timestamps: []} - def list_timestamps(%Feed{} = feed) do - loop_request_timestamp(feed) + def list_timestamps(%Feed{} = feed, callback \\ nil) do + loop_request_timestamp(feed, callback) end def request_timestamps( %Feed{node_name: node_name, bucket: bucket, bucket_region: bucket_region}, - continuation_token \\ nil + continuation_token \\ nil, + callback \\ nil ) do continuation = case continuation_token do @@ -18,7 +19,7 @@ defmodule Orcasite.Radio.AwsClient do token -> [continuation_token: token] end - options = [prefix: "#{node_name}/hls/", delimiter: "/", max_keys: 1000] ++ continuation + options = [prefix: "#{node_name}/hls/", delimiter: "/", max_keys: 100] ++ continuation ExAws.S3.list_objects_v2(bucket, options) |> ExAws.request(region: bucket_region) @@ -36,6 +37,10 @@ defmodule Orcasite.Radio.AwsClient do token = body |> Map.get(:continuation_token) has_more = body |> Map.get(:is_truncated) == "true" + if is_function(callback) do + callback.(timestamps) + end + {:ok, %{count: count, timestamps: timestamps, continuation_token: token, has_more: has_more}} @@ -44,8 +49,8 @@ defmodule Orcasite.Radio.AwsClient do end end - def loop_request_timestamp(feed, token \\ nil, results \\ @default_results) do - request_timestamps(feed, token) + def loop_request_timestamp(feed, callback \\ nil, token \\ nil, results \\ @default_results) do + request_timestamps(feed, token, callback) |> case do {:ok, %{ @@ -54,7 +59,7 @@ defmodule Orcasite.Radio.AwsClient do timestamps: timestamps, continuation_token: continuation_token }} -> - loop_request_timestamp(feed, continuation_token, %{ + loop_request_timestamp(feed, callback, continuation_token, %{ count: count + results.count, timestamps: results.timestamps ++ timestamps }) diff --git a/server/lib/orcasite/radio/feed_segment.ex b/server/lib/orcasite/radio/feed_segment.ex new file mode 100644 index 00000000..ce6ab311 --- /dev/null +++ b/server/lib/orcasite/radio/feed_segment.ex @@ -0,0 +1,245 @@ +defmodule Orcasite.Radio.FeedSegment do + use Ash.Resource, + extensions: [AshAdmin.Resource, AshUUID, AshGraphql.Resource, AshJsonApi.Resource], + data_layer: AshPostgres.DataLayer + + postgres do + table "feed_segments" + repo Orcasite.Repo + + migration_defaults id: "fragment(\"uuid_generate_v7()\")" + + custom_indexes do + index [:start_time] + index [:end_time] + index [:feed_id] + index [:feed_stream_id] + index [:bucket] + end + end + + identities do + identity :feed_segment_timestamp, [:feed_id, :start_time] + end + + attributes do + uuid_attribute :id, prefix: "fdseg" + + attribute :start_time, :utc_datetime + attribute :end_time, :utc_datetime + attribute :duration, :decimal + + attribute :bucket, :string + attribute :bucket_region, :string + attribute :cloudfront_url, :string + + attribute :playlist_timestamp, :string do + description "UTC Unix epoch for playlist start (e.g. 1541027406)" + end + + attribute :playlist_path, :string do + description "S3 object path for playlist dir (e.g. /rpi_orcasound_lab/hls/1541027406/)" + end + + attribute :playlist_path, :string do + description "S3 object path for playlist dir (e.g. /rpi_orcasound_lab/hls/1541027406/)" + end + + attribute :playlist_m3u8_path, :string do + description "S3 object path for playlist file (e.g. /rpi_orcasound_lab/hls/1541027406/live.m3u8)" + end + + attribute :segment_path, :string do + description "S3 object path for ts file (e.g. /rpi_orcasound_lab/hls/1541027406/live005.ts)" + end + + create_timestamp :inserted_at + update_timestamp :updated_at + end + + relationships do + belongs_to :feed, Orcasite.Radio.Feed + belongs_to :feed_stream, Orcasite.Radio.FeedStream + end + + actions do + defaults [:read, :update, :destroy] + + read :index do + pagination do + offset? true + countable true + default_limit 100 + end + + argument :feed_id, :string + + filter expr(if not is_nil(^arg(:feed_id), do: feed_id == ^arg(:feed_id)), else: true) + end + + create :from_ts_path do + upsert? true + upsert_identity :feed_stream_timestamp + + upsert_fields [ + :start_time, + :bucket, + :bucket_region, + :cloudfront_url, + :playlist_path, + :playlist_m3u8_path, + :playlist_timestamp + ] + + argument :m3u8_path, :string, allow_nil?: false + argument :feed, :map + argument :playlist_path, :string + + change fn changeset, _context -> + path = + changeset + |> Ash.Changeset.get_argument(:m3u8_path) + |> String.trim_leading("/") + + with {:path, %{"node_name" => node_name, "timestamp" => playlist_timestamp}} <- + {:path, + Regex.named_captures( + ~r|(?[^/]+)/hls/(?[^/]+)/live.m3u8|, + path + )}, + {:feed, _, {:ok, feed}} <- + {:feed, node_name, Orcasite.Radio.Feed.get_feed_by_node_name(node_name)} do + changeset + |> Ash.Changeset.manage_relationship(:feed, feed, type: :append) + |> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp) + |> Ash.Changeset.set_argument(:feed, feed) + else + {:feed, node_name, _error} -> + changeset + |> Ash.Changeset.add_error("Feed for #{node_name} not found") + + {:path, _error} -> + changeset + |> Ash.Changeset.add_error( + "Path #{path} does not match the hls object path format (/hls//live.m3u8)" + ) + end + end + + change fn changeset, context -> + if !changeset.valid? do + changeset + else + feed = Ash.Changeset.get_argument_or_attribute(changeset, :feed) + + playlist_timestamp = + changeset + |> Ash.Changeset.get_argument_or_attribute(:playlist_timestamp) + + feed + |> Map.take([:bucket, :bucket_region, :cloudfront_url]) + |> Enum.reduce(changeset, fn {attribute, value}, acc -> + acc + |> Ash.Changeset.change_new_attribute(attribute, value) + end) + |> Ash.Changeset.change_new_attribute( + :start_time, + playlist_timestamp + |> String.to_integer() + |> DateTime.from_unix!() + ) + |> Ash.Changeset.change_new_attribute( + :playlist_path, + "/#{feed.node_name}/hls/#{playlist_timestamp}/" + ) + |> Ash.Changeset.change_new_attribute( + :playlist_m3u8_path, + "/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8" + ) + |> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp) + end + end + end + + create :create do + primary? true + upsert? true + upsert_identity :feed_stream_timestamp + + upsert_fields [ + :start_time, + :end_time, + :duration, + :bucket, + :bucket_region, + :cloudfront_url, + :playlist_timestamp + ] + + accept [ + :start_time, + :end_time, + :duration, + :bucket, + :bucket_region, + :cloudfront_url, + :playlist_timestamp + ] + + argument :feed, :map, allow_nil?: false + argument :prev_feed_stream, :string + + argument :playlist_timestamp, :string do + allow_nil? false + constraints allow_empty?: false + end + + change set_attribute(:playlist_timestamp, arg(:playlist_timestamp)) + change manage_relationship(:feed, type: :append) + change manage_relationship(:prev_feed_stream, type: :append) + + change fn changeset, context -> + feed = Ash.Changeset.get_argument_or_attribute(changeset, :feed) + + playlist_timestamp = + changeset + |> Ash.Changeset.get_argument(:playlist_timestamp) + + feed + |> Map.take([:bucket, :bucket_region, :cloudfront_url]) + |> Enum.reduce(changeset, fn {attribute, value}, acc -> + acc + |> Ash.Changeset.change_new_attribute(attribute, value) + end) + |> Ash.Changeset.change_new_attribute( + :start_time, + playlist_timestamp + |> String.to_integer() + |> DateTime.from_unix!() + ) + |> Ash.Changeset.change_new_attribute( + :playlist_path, + "/#{feed.node_name}/hls/#{playlist_timestamp}/" + ) + |> Ash.Changeset.change_new_attribute( + :playlist_m3u8_path, + "/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8" + ) + end + end + end + + code_interface do + define_for Orcasite.Radio + + define :create_from_m3u8_path, action: :from_m3u8_path, args: [:m3u8_path] + end + + graphql do + type :feed_stream + + queries do + list :feed_streams, :index + end + end +end diff --git a/server/lib/orcasite/radio/feed_stream.ex b/server/lib/orcasite/radio/feed_stream.ex index 4bdef0d4..e50a2712 100644 --- a/server/lib/orcasite/radio/feed_stream.ex +++ b/server/lib/orcasite/radio/feed_stream.ex @@ -43,7 +43,7 @@ defmodule Orcasite.Radio.FeedStream do end attribute :playlist_m3u8_path, :string do - description "S3 object path for playlist dir (e.g. /rpi_orcasound_lab/hls/1541027406/live.m3u8)" + description "S3 object path for playlist file (e.g. /rpi_orcasound_lab/hls/1541027406/live.m3u8)" end create_timestamp :inserted_at @@ -165,6 +165,7 @@ defmodule Orcasite.Radio.FeedStream do primary? true upsert? true upsert_identity :feed_stream_timestamp + upsert_fields [ :start_time, :end_time, diff --git a/server/priv/repo/seeds.exs b/server/priv/repo/seeds.exs index 0f19df36..c2488338 100644 --- a/server/priv/repo/seeds.exs +++ b/server/priv/repo/seeds.exs @@ -13,14 +13,14 @@ require Ash.Query feeds = [ - %{ - lat_lng_string: "48.5583362, -123.1735774", - name: "Orcasound Lab (Haro Strait)", - node_name: "rpi_orcasound_lab", - slug: "orcasound-lab", - bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", - }, + # %{ + # lat_lng_string: "48.5583362, -123.1735774", + # name: "Orcasound Lab (Haro Strait)", + # node_name: "rpi_orcasound_lab", + # slug: "orcasound-lab", + # bucket: "dev-streaming-orcasound-net", + # bucket_region: "us-west-2", + # }, %{ lat_lng_string: "47.34922, -122.32512", name: "MaST Center Aquarium", @@ -29,22 +29,22 @@ feeds = [ bucket: "dev-streaming-orcasound-net", bucket_region: "us-west-2", }, - %{ - lat_lng_string: "48.0336664, -122.6040035", - name: "Bush Point", - node_name: "rpi_bush_point", - slug: "bush-point", - bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", - }, - %{ - lat_lng_string: "48.135743, -122.760614", - name: "Port Townsend", - node_name: "rpi_port_townsend", - slug: "port-townsend", - bucket: "dev-streaming-orcasound-net", - bucket_region: "us-west-2", - }, + # %{ + # lat_lng_string: "48.0336664, -122.6040035", + # name: "Bush Point", + # node_name: "rpi_bush_point", + # slug: "bush-point", + # bucket: "dev-streaming-orcasound-net", + # bucket_region: "us-west-2", + # }, + # %{ + # lat_lng_string: "48.135743, -122.760614", + # name: "Port Townsend", + # node_name: "rpi_port_townsend", + # slug: "port-townsend", + # bucket: "dev-streaming-orcasound-net", + # bucket_region: "us-west-2", + # }, %{ lat_lng_string: "47.86497296593844, -122.33393605795372", name: "Sunset Bay", From 1df67b46dc1a06dc653a8fa0c14ddb3ed0727c38 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Wed, 12 Jun 2024 17:16:11 -0700 Subject: [PATCH 2/7] Update feed segment creation to use bulk insertion --- server/lib/orcasite/radio/aws_client.ex | 16 +- server/lib/orcasite/radio/feed_segment.ex | 130 ++---- server/lib/orcasite/radio/feed_stream.ex | 101 +++++ .../lib/orcasite/radio/feed_stream_queue.ex | 8 +- server/lib/orcasite/radio/registry.ex | 1 + .../20240612233042_create_feed_segments.exs | 90 +++++ .../repo/feed_segments/20240612233042.json | 346 ++++++++++++++++ .../repo/feed_streams/20240612233042.json | 376 ++++++++++++++++++ 8 files changed, 960 insertions(+), 108 deletions(-) create mode 100644 server/priv/repo/migrations/20240612233042_create_feed_segments.exs create mode 100644 server/priv/resource_snapshots/repo/feed_segments/20240612233042.json create mode 100644 server/priv/resource_snapshots/repo/feed_streams/20240612233042.json diff --git a/server/lib/orcasite/radio/aws_client.ex b/server/lib/orcasite/radio/aws_client.ex index c20790bd..e1dc0d54 100644 --- a/server/lib/orcasite/radio/aws_client.ex +++ b/server/lib/orcasite/radio/aws_client.ex @@ -1,8 +1,22 @@ defmodule Orcasite.Radio.AwsClient do - alias Orcasite.Radio.Feed + alias Orcasite.Radio.{Feed, FeedStream} @default_results %{count: 0, timestamps: []} + def get_feed_stream(%FeedStream{ + bucket_region: bucket_region, + bucket: bucket, + playlist_m3u8_path: path + }) do + ExAws.S3.get_object(bucket, path) + |> ExAws.request(region: bucket_region) + |> case do + {:ok, %{body: body, status_code: 200}} -> {:ok, body} + {:ok, other} -> {:error, other} + {:error, error} -> {:error, error} + end + end + def list_timestamps(%Feed{} = feed, callback \\ nil) do loop_request_timestamp(feed, callback) end diff --git a/server/lib/orcasite/radio/feed_segment.ex b/server/lib/orcasite/radio/feed_segment.ex index ce6ab311..93ae7d0f 100644 --- a/server/lib/orcasite/radio/feed_segment.ex +++ b/server/lib/orcasite/radio/feed_segment.ex @@ -20,6 +20,7 @@ defmodule Orcasite.Radio.FeedSegment do identities do identity :feed_segment_timestamp, [:feed_id, :start_time] + identity :feed_segment_path, [:segment_path] end attributes do @@ -34,11 +35,7 @@ defmodule Orcasite.Radio.FeedSegment do attribute :cloudfront_url, :string attribute :playlist_timestamp, :string do - description "UTC Unix epoch for playlist start (e.g. 1541027406)" - end - - attribute :playlist_path, :string do - description "S3 object path for playlist dir (e.g. /rpi_orcasound_lab/hls/1541027406/)" + description "UTC Unix epoch for playlist (m3u8 dir) start (e.g. 1541027406)" end attribute :playlist_path, :string do @@ -53,6 +50,11 @@ defmodule Orcasite.Radio.FeedSegment do description "S3 object path for ts file (e.g. /rpi_orcasound_lab/hls/1541027406/live005.ts)" end + attribute :file_name, :string do + description "ts file name (e.g. live005.ts)" + allow_nil? false + end + create_timestamp :inserted_at update_timestamp :updated_at end @@ -77,103 +79,22 @@ defmodule Orcasite.Radio.FeedSegment do filter expr(if not is_nil(^arg(:feed_id), do: feed_id == ^arg(:feed_id)), else: true) end - create :from_ts_path do - upsert? true - upsert_identity :feed_stream_timestamp - - upsert_fields [ - :start_time, - :bucket, - :bucket_region, - :cloudfront_url, - :playlist_path, - :playlist_m3u8_path, - :playlist_timestamp - ] - - argument :m3u8_path, :string, allow_nil?: false - argument :feed, :map - argument :playlist_path, :string - - change fn changeset, _context -> - path = - changeset - |> Ash.Changeset.get_argument(:m3u8_path) - |> String.trim_leading("/") - - with {:path, %{"node_name" => node_name, "timestamp" => playlist_timestamp}} <- - {:path, - Regex.named_captures( - ~r|(?[^/]+)/hls/(?[^/]+)/live.m3u8|, - path - )}, - {:feed, _, {:ok, feed}} <- - {:feed, node_name, Orcasite.Radio.Feed.get_feed_by_node_name(node_name)} do - changeset - |> Ash.Changeset.manage_relationship(:feed, feed, type: :append) - |> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp) - |> Ash.Changeset.set_argument(:feed, feed) - else - {:feed, node_name, _error} -> - changeset - |> Ash.Changeset.add_error("Feed for #{node_name} not found") - - {:path, _error} -> - changeset - |> Ash.Changeset.add_error( - "Path #{path} does not match the hls object path format (/hls//live.m3u8)" - ) - end - end - - change fn changeset, context -> - if !changeset.valid? do - changeset - else - feed = Ash.Changeset.get_argument_or_attribute(changeset, :feed) - - playlist_timestamp = - changeset - |> Ash.Changeset.get_argument_or_attribute(:playlist_timestamp) - - feed - |> Map.take([:bucket, :bucket_region, :cloudfront_url]) - |> Enum.reduce(changeset, fn {attribute, value}, acc -> - acc - |> Ash.Changeset.change_new_attribute(attribute, value) - end) - |> Ash.Changeset.change_new_attribute( - :start_time, - playlist_timestamp - |> String.to_integer() - |> DateTime.from_unix!() - ) - |> Ash.Changeset.change_new_attribute( - :playlist_path, - "/#{feed.node_name}/hls/#{playlist_timestamp}/" - ) - |> Ash.Changeset.change_new_attribute( - :playlist_m3u8_path, - "/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8" - ) - |> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp) - end - end - end - create :create do primary? true upsert? true - upsert_identity :feed_stream_timestamp + upsert_identity :feed_segment_path upsert_fields [ + :playlist_path, + :duration, :start_time, :end_time, - :duration, :bucket, :bucket_region, :cloudfront_url, - :playlist_timestamp + :playlist_timestamp, + :playlist_m3u8_path, + :file_name ] accept [ @@ -183,27 +104,30 @@ defmodule Orcasite.Radio.FeedSegment do :bucket, :bucket_region, :cloudfront_url, - :playlist_timestamp + :playlist_timestamp, + :playlist_m3u8_path, + :playlist_path, + :file_name ] argument :feed, :map, allow_nil?: false - argument :prev_feed_stream, :string + argument :feed_stream, :map, allow_nil?: false - argument :playlist_timestamp, :string do + argument :segment_path, :string do allow_nil? false constraints allow_empty?: false end - change set_attribute(:playlist_timestamp, arg(:playlist_timestamp)) + change set_attribute(:segment_path, arg(:segment_path)) change manage_relationship(:feed, type: :append) - change manage_relationship(:prev_feed_stream, type: :append) + change manage_relationship(:feed_stream, type: :append) change fn changeset, context -> feed = Ash.Changeset.get_argument_or_attribute(changeset, :feed) playlist_timestamp = changeset - |> Ash.Changeset.get_argument(:playlist_timestamp) + |> Ash.Changeset.get_attribute(:playlist_timestamp) feed |> Map.take([:bucket, :bucket_region, :cloudfront_url]) @@ -229,17 +153,11 @@ defmodule Orcasite.Radio.FeedSegment do end end - code_interface do - define_for Orcasite.Radio - - define :create_from_m3u8_path, action: :from_m3u8_path, args: [:m3u8_path] - end - graphql do - type :feed_stream + type :feed_segment queries do - list :feed_streams, :index + list :feed_segments, :index end end end diff --git a/server/lib/orcasite/radio/feed_stream.ex b/server/lib/orcasite/radio/feed_stream.ex index e50a2712..07d79b33 100644 --- a/server/lib/orcasite/radio/feed_stream.ex +++ b/server/lib/orcasite/radio/feed_stream.ex @@ -21,6 +21,7 @@ defmodule Orcasite.Radio.FeedStream do identities do identity :feed_stream_timestamp, [:feed_id, :start_time] + identity :playlist_m3u8_path, [:playlist_m3u8_path] end attributes do @@ -55,6 +56,7 @@ defmodule Orcasite.Radio.FeedStream do belongs_to :prev_feed_stream, Orcasite.Radio.FeedStream belongs_to :next_feed_stream, Orcasite.Radio.FeedStream + has_many :feed_segments, Orcasite.Radio.FeedSegment has_many :bout_feed_streams, Orcasite.Radio.BoutFeedStream many_to_many :bouts, Orcasite.Radio.Bout do @@ -227,6 +229,105 @@ defmodule Orcasite.Radio.FeedStream do ) end end + + update :update_segments do + description "Pulls contents of m3u8 file and creates a FeedSegment per new entry" + validate present([:playlist_m3u8_path, :playlist_path]) + + change after_action(fn change, feed_stream -> + file_name_query = + Orcasite.Radio.FeedSegment |> Ash.Query.new() |> Ash.Query.select([:file_name]) + + %{feed: feed, feed_segments: existing_feed_segments} = + feed_stream |> Orcasite.Radio.load!([:feed, feed_segments: file_name_query]) + + playlist_start_time = Ash.Changeset.get_attribute(change, :start_time) + playlist_path = Ash.Changeset.get_attribute(change, :playlist_path) + + {:ok, body} = Orcasite.Radio.AwsClient.get_feed_stream(feed_stream) + + feed_segments = + body + |> String.split("#") + # Looks like "EXTINF:10.005378,\nlive000.ts\n" + |> Enum.filter(&String.contains?(&1, "EXTINF")) + |> Enum.reduce( + [], + fn extinf_string, acc -> + with %{"duration" => duration_string, "file_name" => file_name} <- + Regex.named_captures( + ~r|EXTINF:(?[^,]+),\n(?[^\n]+)|, + extinf_string + ) do + duration = Decimal.new(duration_string) + + start_offset = + Enum.map(acc, & &1.duration) + |> Enum.reduce(Decimal.new("0"), &Decimal.add/2) + |> Decimal.mult(1000) + + end_offset = + duration + |> Decimal.mult(1000) + |> Decimal.add(start_offset) + |> Decimal.round() + + start_time = + DateTime.add( + playlist_start_time, + Decimal.to_integer(Decimal.round(start_offset)), + :millisecond + ) + + end_time = + DateTime.add( + playlist_start_time, + Decimal.to_integer(end_offset), + :millisecond + ) + + [ + %{ + file_name: file_name, + playlist_path: playlist_path, + duration: duration, + start_time: start_time, + end_time: end_time, + bucket: feed_stream.bucket, + bucket_region: feed_stream.bucket_region, + cloudfront_url: feed_stream.cloudfront_url, + playlist_timestamp: feed_stream.playlist_timestamp, + playlist_m3u8_path: feed_stream.playlist_m3u8_path, + segment_path: playlist_path <> file_name, + feed: feed, + feed_stream: feed_stream + } + | acc + ] + else + _ -> acc + end + end + ) + + existing_file_names = existing_feed_segments |> Enum.map(& &1.file_name) + + insert_segments = + feed_segments + |> Enum.filter(&(&1.file_name not in existing_file_names)) + + insert_segments + |> Orcasite.Radio.bulk_create(Orcasite.Radio.FeedSegment, :create, + upsert?: true, + upsert_identity: :feed_segment_path, + stop_on_error?: true + ) + |> case do + %{status: :success} -> {:ok, feed_stream} + %{errors: error} -> {:error, error} + end + end) + end end code_interface do diff --git a/server/lib/orcasite/radio/feed_stream_queue.ex b/server/lib/orcasite/radio/feed_stream_queue.ex index 7ca5ea2f..5971fbed 100644 --- a/server/lib/orcasite/radio/feed_stream_queue.ex +++ b/server/lib/orcasite/radio/feed_stream_queue.ex @@ -56,7 +56,13 @@ defmodule Orcasite.Radio.FeedStreamQueue do end) Task.Supervisor.start_child(Orcasite.TaskSupervisor, fn -> - Orcasite.Radio.bulk_create(paths, Orcasite.Radio.FeedStream, :from_m3u8_path) + paths + |> Orcasite.Radio.bulk_create( + Orcasite.Radio.FeedStream, + :from_m3u8_path, + upsert?: true, + upsert_identity: feed_stream_timestamp + ) end) messages diff --git a/server/lib/orcasite/radio/registry.ex b/server/lib/orcasite/radio/registry.ex index eaabd03a..2ac05a32 100644 --- a/server/lib/orcasite/radio/registry.ex +++ b/server/lib/orcasite/radio/registry.ex @@ -8,6 +8,7 @@ defmodule Orcasite.Radio.Registry do entry Orcasite.Radio.Bout entry Orcasite.Radio.FeedStream entry Orcasite.Radio.BoutFeedStream + entry Orcasite.Radio.FeedSegment end end diff --git a/server/priv/repo/migrations/20240612233042_create_feed_segments.exs b/server/priv/repo/migrations/20240612233042_create_feed_segments.exs new file mode 100644 index 00000000..7b90309b --- /dev/null +++ b/server/priv/repo/migrations/20240612233042_create_feed_segments.exs @@ -0,0 +1,90 @@ +defmodule Orcasite.Repo.Migrations.CreateFeedSegments do + @moduledoc """ + Updates resources based on their most recent snapshots. + + This file was autogenerated with `mix ash_postgres.generate_migrations` + """ + + use Ecto.Migration + + def up do + create unique_index(:feed_streams, [:playlist_m3u8_path], + name: "feed_streams_playlist_m3u8_path_index" + ) + + create table(:feed_segments, primary_key: false) do + add :id, :uuid, null: false, default: fragment("uuid_generate_v7()"), primary_key: true + add :start_time, :utc_datetime + add :end_time, :utc_datetime + add :duration, :decimal + add :bucket, :text + add :bucket_region, :text + add :cloudfront_url, :text + add :playlist_timestamp, :text + add :playlist_path, :text + add :playlist_m3u8_path, :text + add :segment_path, :text + add :file_name, :text, null: false + add :inserted_at, :utc_datetime_usec, null: false, default: fragment("now()") + add :updated_at, :utc_datetime_usec, null: false, default: fragment("now()") + + add :feed_id, + references(:feeds, column: :id, name: "feed_segments_feed_id_fkey", type: :uuid) + + add :feed_stream_id, + references(:feed_streams, + column: :id, + name: "feed_segments_feed_stream_id_fkey", + type: :uuid + ) + end + + create index(:feed_segments, [:bucket]) + + create index(:feed_segments, [:feed_stream_id]) + + create index(:feed_segments, [:feed_id]) + + create index(:feed_segments, [:end_time]) + + create index(:feed_segments, [:start_time]) + + create unique_index(:feed_segments, [:segment_path], + name: "feed_segments_feed_segment_path_index" + ) + + create unique_index(:feed_segments, [:feed_id, :start_time], + name: "feed_segments_feed_segment_timestamp_index" + ) + end + + def down do + drop_if_exists unique_index(:feed_segments, [:feed_id, :start_time], + name: "feed_segments_feed_segment_timestamp_index" + ) + + drop_if_exists unique_index(:feed_segments, [:segment_path], + name: "feed_segments_feed_segment_path_index" + ) + + drop constraint(:feed_segments, "feed_segments_feed_id_fkey") + + drop constraint(:feed_segments, "feed_segments_feed_stream_id_fkey") + + drop_if_exists index(:feed_segments, [:start_time]) + + drop_if_exists index(:feed_segments, [:end_time]) + + drop_if_exists index(:feed_segments, [:feed_id]) + + drop_if_exists index(:feed_segments, [:feed_stream_id]) + + drop_if_exists index(:feed_segments, [:bucket]) + + drop table(:feed_segments) + + drop_if_exists unique_index(:feed_streams, [:playlist_m3u8_path], + name: "feed_streams_playlist_m3u8_path_index" + ) + end +end diff --git a/server/priv/resource_snapshots/repo/feed_segments/20240612233042.json b/server/priv/resource_snapshots/repo/feed_segments/20240612233042.json new file mode 100644 index 00000000..891959ed --- /dev/null +++ b/server/priv/resource_snapshots/repo/feed_segments/20240612233042.json @@ -0,0 +1,346 @@ +{ + "attributes": [ + { + "default": "fragment(\"uuid_generate_v7()\")", + "size": null, + "type": "uuid", + "source": "id", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": true + }, + { + "default": "nil", + "size": null, + "type": "utc_datetime", + "source": "start_time", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "utc_datetime", + "source": "end_time", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "decimal", + "source": "duration", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "bucket", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "bucket_region", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "cloudfront_url", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_timestamp", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_path", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_m3u8_path", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "segment_path", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "file_name", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": false + }, + { + "default": "fragment(\"now()\")", + "size": null, + "type": "utc_datetime_usec", + "source": "inserted_at", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": false + }, + { + "default": "fragment(\"now()\")", + "size": null, + "type": "utc_datetime_usec", + "source": "updated_at", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "uuid", + "source": "feed_id", + "references": { + "name": "feed_segments_feed_id_fkey", + "table": "feeds", + "schema": null, + "on_delete": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "primary_key?": true, + "destination_attribute": "id", + "deferrable": false, + "match_type": null, + "match_with": null, + "on_update": null, + "destination_attribute_default": null, + "destination_attribute_generated": null + }, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "uuid", + "source": "feed_stream_id", + "references": { + "name": "feed_segments_feed_stream_id_fkey", + "table": "feed_streams", + "schema": null, + "on_delete": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "primary_key?": true, + "destination_attribute": "id", + "deferrable": false, + "match_type": null, + "match_with": null, + "on_update": null, + "destination_attribute_default": null, + "destination_attribute_generated": null + }, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + } + ], + "table": "feed_segments", + "hash": "D3284D3995318EC0278825D83AE3C28D24ECF74E261CBEEFD0A51A2B4A62AEDF", + "repo": "Elixir.Orcasite.Repo", + "identities": [ + { + "name": "feed_segment_path", + "keys": [ + "segment_path" + ], + "all_tenants?": false, + "index_name": "feed_segments_feed_segment_path_index", + "base_filter": null + }, + { + "name": "feed_segment_timestamp", + "keys": [ + "feed_id", + "start_time" + ], + "all_tenants?": false, + "index_name": "feed_segments_feed_segment_timestamp_index", + "base_filter": null + } + ], + "schema": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "custom_indexes": [ + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "start_time" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "start_time" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "end_time" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "end_time" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "feed_id" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "feed_id" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "feed_stream_id" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "feed_stream_id" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "bucket" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "bucket" + ], + "nulls_distinct": true, + "using": null + } + ], + "base_filter": null, + "check_constraints": [], + "custom_statements": [], + "has_create_action": true +} \ No newline at end of file diff --git a/server/priv/resource_snapshots/repo/feed_streams/20240612233042.json b/server/priv/resource_snapshots/repo/feed_streams/20240612233042.json new file mode 100644 index 00000000..658e85f7 --- /dev/null +++ b/server/priv/resource_snapshots/repo/feed_streams/20240612233042.json @@ -0,0 +1,376 @@ +{ + "attributes": [ + { + "default": "fragment(\"uuid_generate_v7()\")", + "size": null, + "type": "uuid", + "source": "id", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": true + }, + { + "default": "nil", + "size": null, + "type": "utc_datetime", + "source": "start_time", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "utc_datetime", + "source": "end_time", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "decimal", + "source": "duration", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "bucket", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "bucket_region", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "cloudfront_url", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_timestamp", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_path", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "text", + "source": "playlist_m3u8_path", + "references": null, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "fragment(\"now()\")", + "size": null, + "type": "utc_datetime_usec", + "source": "inserted_at", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": false + }, + { + "default": "fragment(\"now()\")", + "size": null, + "type": "utc_datetime_usec", + "source": "updated_at", + "references": null, + "allow_nil?": false, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "uuid", + "source": "feed_id", + "references": { + "name": "feed_streams_feed_id_fkey", + "table": "feeds", + "schema": null, + "on_delete": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "primary_key?": true, + "destination_attribute": "id", + "deferrable": false, + "match_type": null, + "match_with": null, + "on_update": null, + "destination_attribute_default": null, + "destination_attribute_generated": null + }, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "uuid", + "source": "prev_feed_stream_id", + "references": { + "name": "feed_streams_prev_feed_stream_id_fkey", + "table": "feed_streams", + "schema": null, + "on_delete": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "primary_key?": true, + "destination_attribute": "id", + "deferrable": false, + "match_type": null, + "match_with": null, + "on_update": null, + "destination_attribute_default": null, + "destination_attribute_generated": null + }, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + }, + { + "default": "nil", + "size": null, + "type": "uuid", + "source": "next_feed_stream_id", + "references": { + "name": "feed_streams_next_feed_stream_id_fkey", + "table": "feed_streams", + "schema": null, + "on_delete": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "primary_key?": true, + "destination_attribute": "id", + "deferrable": false, + "match_type": null, + "match_with": null, + "on_update": null, + "destination_attribute_default": null, + "destination_attribute_generated": null + }, + "allow_nil?": true, + "generated?": false, + "primary_key?": false + } + ], + "table": "feed_streams", + "hash": "27A297788ABE51320A69E845F34EB11047A85BEC995DB8761655D08BD63E6A62", + "repo": "Elixir.Orcasite.Repo", + "identities": [ + { + "name": "feed_stream_timestamp", + "keys": [ + "feed_id", + "start_time" + ], + "all_tenants?": false, + "index_name": "feed_streams_feed_stream_timestamp_index", + "base_filter": null + }, + { + "name": "playlist_m3u8_path", + "keys": [ + "playlist_m3u8_path" + ], + "all_tenants?": false, + "index_name": "feed_streams_playlist_m3u8_path_index", + "base_filter": null + } + ], + "schema": null, + "multitenancy": { + "global": null, + "strategy": null, + "attribute": null + }, + "custom_indexes": [ + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "start_time" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "start_time" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "end_time" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "end_time" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "feed_id" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "feed_id" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "prev_feed_stream_id" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "prev_feed_stream_id" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "next_feed_stream_id" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "next_feed_stream_id" + ], + "nulls_distinct": true, + "using": null + }, + { + "message": null, + "name": null, + "table": null, + "include": null, + "prefix": null, + "where": null, + "fields": [ + { + "type": "atom", + "value": "bucket" + } + ], + "unique": false, + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "bucket" + ], + "nulls_distinct": true, + "using": null + } + ], + "base_filter": null, + "check_constraints": [], + "custom_statements": [], + "has_create_action": true +} \ No newline at end of file From a747e72a90e3ec94ea41b7f191d7f19025da90ed Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Wed, 12 Jun 2024 17:17:00 -0700 Subject: [PATCH 3/7] Fix typo --- server/lib/orcasite/radio/feed_stream_queue.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/lib/orcasite/radio/feed_stream_queue.ex b/server/lib/orcasite/radio/feed_stream_queue.ex index 5971fbed..dc706498 100644 --- a/server/lib/orcasite/radio/feed_stream_queue.ex +++ b/server/lib/orcasite/radio/feed_stream_queue.ex @@ -61,7 +61,7 @@ defmodule Orcasite.Radio.FeedStreamQueue do Orcasite.Radio.FeedStream, :from_m3u8_path, upsert?: true, - upsert_identity: feed_stream_timestamp + upsert_identity: :feed_stream_timestamp ) end) From c96fc57f1e53aa25f849499e1c88be15f4df819e Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Wed, 12 Jun 2024 17:50:16 -0700 Subject: [PATCH 4/7] Add feed_stream_id filter for querying feed segments index --- server/lib/orcasite/radio/feed_segment.ex | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/lib/orcasite/radio/feed_segment.ex b/server/lib/orcasite/radio/feed_segment.ex index 93ae7d0f..31e89db3 100644 --- a/server/lib/orcasite/radio/feed_segment.ex +++ b/server/lib/orcasite/radio/feed_segment.ex @@ -75,8 +75,16 @@ defmodule Orcasite.Radio.FeedSegment do end argument :feed_id, :string - - filter expr(if not is_nil(^arg(:feed_id), do: feed_id == ^arg(:feed_id)), else: true) + argument :feed_stream_id, :string + + filter expr( + if(not is_nil(^arg(:feed_id)), do: feed_id == ^arg(:feed_id), else: true) and + if( + not is_nil(^arg(:feed_stream_id)), + do: feed_stream_id == ^arg(:feed_stream_id), + else: true + ) + ) end create :create do From 2002f0b5837f8b11b7e5b25343f4f1fc2a6108d5 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Wed, 12 Jun 2024 17:54:40 -0700 Subject: [PATCH 5/7] Add json apis for feed_streams and feed_segments --- server/lib/orcasite/radio/feed_segment.ex | 10 ++++++++++ server/lib/orcasite/radio/feed_stream.ex | 12 +++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/server/lib/orcasite/radio/feed_segment.ex b/server/lib/orcasite/radio/feed_segment.ex index 31e89db3..2f9b1f2e 100644 --- a/server/lib/orcasite/radio/feed_segment.ex +++ b/server/lib/orcasite/radio/feed_segment.ex @@ -161,6 +161,16 @@ defmodule Orcasite.Radio.FeedSegment do end end + json_api do + type "feed_segment" + + routes do + base "/feed_segments" + + index :index + end + end + graphql do type :feed_segment diff --git a/server/lib/orcasite/radio/feed_stream.ex b/server/lib/orcasite/radio/feed_stream.ex index 07d79b33..19227c1f 100644 --- a/server/lib/orcasite/radio/feed_stream.ex +++ b/server/lib/orcasite/radio/feed_stream.ex @@ -76,7 +76,7 @@ defmodule Orcasite.Radio.FeedStream do argument :feed_id, :string - filter expr(if not is_nil(^arg(:feed_id), do: feed_id == ^arg(:feed_id)), else: true) + filter expr(if not is_nil(^arg(:feed_id)), do: feed_id == ^arg(:feed_id), else: true) end create :from_m3u8_path do @@ -336,6 +336,16 @@ defmodule Orcasite.Radio.FeedStream do define :create_from_m3u8_path, action: :from_m3u8_path, args: [:m3u8_path] end + json_api do + type "feed_stream" + + routes do + base "/feed_streams" + + index :index + end + end + graphql do type :feed_stream From 17f7504f3e375d8a76cd38b6c0fc60fe3226d602 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Wed, 12 Jun 2024 18:40:22 -0700 Subject: [PATCH 6/7] Add unique worker for updating feed segments --- server/config/config.exs | 2 +- server/lib/orcasite/application.ex | 2 +- server/lib/orcasite/radio/feed_stream.ex | 20 +++++++++++++++++++ .../lib/orcasite/radio/feed_stream_queue.ex | 2 ++ .../radio/workers/update_feed_segments.ex | 11 ++++++++++ 5 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 server/lib/orcasite/radio/workers/update_feed_segments.ex diff --git a/server/config/config.exs b/server/config/config.exs index c55b5061..ee550f99 100644 --- a/server/config/config.exs +++ b/server/config/config.exs @@ -93,7 +93,7 @@ config :orcasite, Oban, repo: Orcasite.Repo, # 7 day job retention plugins: [{Oban.Plugins.Pruner, max_age: 7 * 24 * 60 * 60}], - queues: [default: 10, email: 10] + queues: [default: 10, email: 10, feed_segments: 10] config :spark, :formatter, remove_parens?: true, diff --git a/server/lib/orcasite/application.ex b/server/lib/orcasite/application.ex index 2a23c568..dffb53f8 100644 --- a/server/lib/orcasite/application.ex +++ b/server/lib/orcasite/application.ex @@ -23,7 +23,7 @@ defmodule Orcasite.Application do {Finch, name: Orcasite.Finch}, {Task.Supervisor, name: Orcasite.TaskSupervisor}, {AshAuthentication.Supervisor, otp_app: :orcasite}, - if(Application.get_env(:orcasite, :feed_stream_queue_url), + if(Application.get_env(:orcasite, :feed_stream_queue_url) not in [nil, ""], do: {Orcasite.Radio.FeedStreamQueue, []} ), OrcasiteWeb.Endpoint diff --git a/server/lib/orcasite/radio/feed_stream.ex b/server/lib/orcasite/radio/feed_stream.ex index 19227c1f..acf2d12b 100644 --- a/server/lib/orcasite/radio/feed_stream.ex +++ b/server/lib/orcasite/radio/feed_stream.ex @@ -96,6 +96,7 @@ defmodule Orcasite.Radio.FeedStream do argument :m3u8_path, :string, allow_nil?: false argument :feed, :map argument :playlist_path, :string + argument :update_segments?, :boolean, default: false change fn changeset, _context -> path = @@ -159,6 +160,15 @@ defmodule Orcasite.Radio.FeedStream do "/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8" ) |> Ash.Changeset.change_attribute(:playlist_timestamp, playlist_timestamp) + |> Ash.Changeset.after_action(fn change, %{id: feed_stream_id} = feed_stream -> + if Ash.Changeset.get_argument(changeset, :update_segments?) do + %{feed_stream_id: feed_stream_id} + |> Orcasite.Radio.Workers.UpdateFeedSegments.new() + |> Oban.insert() + end + + {:ok, feed_stream} + end) end end end @@ -188,6 +198,7 @@ defmodule Orcasite.Radio.FeedStream do :playlist_timestamp ] + argument :update_segments?, :boolean, default: false argument :feed, :map, allow_nil?: false argument :prev_feed_stream, :string @@ -227,6 +238,15 @@ defmodule Orcasite.Radio.FeedStream do :playlist_m3u8_path, "/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8" ) + |> Ash.Changeset.after_action(fn change, %{id: feed_stream_id} = feed_stream -> + if Ash.Changeset.get_argument(changeset, :update_segments?) do + %{feed_stream_id: feed_stream_id} + |> Orcasite.Radio.Workers.UpdateFeedSegments.new() + |> Oban.insert() + end + + {:ok, feed_stream} + end) end end diff --git a/server/lib/orcasite/radio/feed_stream_queue.ex b/server/lib/orcasite/radio/feed_stream_queue.ex index dc706498..fee60c7b 100644 --- a/server/lib/orcasite/radio/feed_stream_queue.ex +++ b/server/lib/orcasite/radio/feed_stream_queue.ex @@ -54,9 +54,11 @@ defmodule Orcasite.Radio.FeedStreamQueue do [] end end) + |> Enum.uniq() Task.Supervisor.start_child(Orcasite.TaskSupervisor, fn -> paths + |> Enum.map(&Map.put(&1, :update_segments?, true)) |> Orcasite.Radio.bulk_create( Orcasite.Radio.FeedStream, :from_m3u8_path, diff --git a/server/lib/orcasite/radio/workers/update_feed_segments.ex b/server/lib/orcasite/radio/workers/update_feed_segments.ex new file mode 100644 index 00000000..b8b597c3 --- /dev/null +++ b/server/lib/orcasite/radio/workers/update_feed_segments.ex @@ -0,0 +1,11 @@ +defmodule Orcasite.Radio.Workers.UpdateFeedSegments do + use Oban.Worker, queue: :feed_segments, unique: [keys: [:feed_stream_id], period: 10] + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do + Orcasite.Radio.FeedStream + |> Orcasite.Radio.get!(feed_stream_id) + |> Ash.Changeset.for_update(:update_segments) + |> Orcasite.Radio.update() + end +end From d9f5fdac66d4c5b870f34b17766cb043bbf291d2 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Thu, 13 Jun 2024 12:01:01 -0700 Subject: [PATCH 7/7] Fix typo with selecting continuation token --- server/lib/orcasite/radio/aws_client.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/lib/orcasite/radio/aws_client.ex b/server/lib/orcasite/radio/aws_client.ex index e1dc0d54..f1c81a14 100644 --- a/server/lib/orcasite/radio/aws_client.ex +++ b/server/lib/orcasite/radio/aws_client.ex @@ -48,7 +48,7 @@ defmodule Orcasite.Radio.AwsClient do count = Enum.count(timestamps) - token = body |> Map.get(:continuation_token) + token = body |> Map.get(:next_continuation_token) has_more = body |> Map.get(:is_truncated) == "true" if is_function(callback) do