Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create feed segments #494

Merged
merged 9 commits into from
Jun 14, 2024
2 changes: 1 addition & 1 deletion server/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ config :orcasite, Oban,
repo: Orcasite.Repo,
# 7 day job retention
plugins: [{Oban.Plugins.Pruner, max_age: 7 * 24 * 60 * 60}],
queues: [default: 10, email: 10]
queues: [default: 10, email: 10, feed_segments: 10]

config :spark, :formatter,
remove_parens?: true,
Expand Down
2 changes: 1 addition & 1 deletion server/lib/orcasite/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Orcasite.Application do
{Finch, name: Orcasite.Finch},
{Task.Supervisor, name: Orcasite.TaskSupervisor},
{AshAuthentication.Supervisor, otp_app: :orcasite},
if(Application.get_env(:orcasite, :feed_stream_queue_url),
if(Application.get_env(:orcasite, :feed_stream_queue_url) not in [nil, ""],
do: {Orcasite.Radio.FeedStreamQueue, []}
),
OrcasiteWeb.Endpoint
Expand Down
5 changes: 3 additions & 2 deletions server/lib/orcasite/global_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ defmodule Orcasite.GlobalSetup do
|> Ash.Query.for_read(:read)
|> Orcasite.Radio.read!()
|> Stream.map(fn feed ->
with {:ok, %{timestamps: timestamps}} <- Orcasite.Radio.AwsClient.list_timestamps(feed) do
Orcasite.Radio.AwsClient.list_timestamps(feed, fn timestamps ->
timestamps
|> Enum.map(&%{feed: feed, playlist_timestamp: &1})
|> Orcasite.Radio.bulk_create(Orcasite.Radio.FeedStream, :create)
end
end)
:ok
end)
|> Enum.to_list()
end
Expand Down
37 changes: 28 additions & 9 deletions server/lib/orcasite/radio/aws_client.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
defmodule Orcasite.Radio.AwsClient do
alias Orcasite.Radio.Feed
alias Orcasite.Radio.{Feed, FeedStream}

@default_results %{count: 0, timestamps: []}

def list_timestamps(%Feed{} = feed) do
loop_request_timestamp(feed)
def get_feed_stream(%FeedStream{
bucket_region: bucket_region,
bucket: bucket,
playlist_m3u8_path: path
}) do
ExAws.S3.get_object(bucket, path)
|> ExAws.request(region: bucket_region)
|> case do
{:ok, %{body: body, status_code: 200}} -> {:ok, body}
{:ok, other} -> {:error, other}
{:error, error} -> {:error, error}
end
end

def list_timestamps(%Feed{} = feed, callback \\ nil) do
loop_request_timestamp(feed, callback)
end

def request_timestamps(
%Feed{node_name: node_name, bucket: bucket, bucket_region: bucket_region},
continuation_token \\ nil
continuation_token \\ nil,
callback \\ nil
) do
continuation =
case continuation_token do
Expand All @@ -18,7 +33,7 @@ defmodule Orcasite.Radio.AwsClient do
token -> [continuation_token: token]
end

options = [prefix: "#{node_name}/hls/", delimiter: "/", max_keys: 1000] ++ continuation
options = [prefix: "#{node_name}/hls/", delimiter: "/", max_keys: 100] ++ continuation

ExAws.S3.list_objects_v2(bucket, options)
|> ExAws.request(region: bucket_region)
Expand All @@ -33,9 +48,13 @@ defmodule Orcasite.Radio.AwsClient do

count = Enum.count(timestamps)

token = body |> Map.get(:continuation_token)
token = body |> Map.get(:next_continuation_token)
has_more = body |> Map.get(:is_truncated) == "true"

if is_function(callback) do
callback.(timestamps)
end

{:ok,
%{count: count, timestamps: timestamps, continuation_token: token, has_more: has_more}}

Expand All @@ -44,8 +63,8 @@ defmodule Orcasite.Radio.AwsClient do
end
end

def loop_request_timestamp(feed, token \\ nil, results \\ @default_results) do
request_timestamps(feed, token)
def loop_request_timestamp(feed, callback \\ nil, token \\ nil, results \\ @default_results) do
request_timestamps(feed, token, callback)
|> case do
{:ok,
%{
Expand All @@ -54,7 +73,7 @@ defmodule Orcasite.Radio.AwsClient do
timestamps: timestamps,
continuation_token: continuation_token
}} ->
loop_request_timestamp(feed, continuation_token, %{
loop_request_timestamp(feed, callback, continuation_token, %{
count: count + results.count,
timestamps: results.timestamps ++ timestamps
})
Expand Down
181 changes: 181 additions & 0 deletions server/lib/orcasite/radio/feed_segment.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
defmodule Orcasite.Radio.FeedSegment do
use Ash.Resource,
extensions: [AshAdmin.Resource, AshUUID, AshGraphql.Resource, AshJsonApi.Resource],
data_layer: AshPostgres.DataLayer

postgres do
table "feed_segments"
repo Orcasite.Repo

migration_defaults id: "fragment(\"uuid_generate_v7()\")"

custom_indexes do
index [:start_time]
index [:end_time]
index [:feed_id]
index [:feed_stream_id]
index [:bucket]
end
end

identities do
identity :feed_segment_timestamp, [:feed_id, :start_time]
identity :feed_segment_path, [:segment_path]
end

attributes do
uuid_attribute :id, prefix: "fdseg"

attribute :start_time, :utc_datetime
attribute :end_time, :utc_datetime
attribute :duration, :decimal

attribute :bucket, :string
attribute :bucket_region, :string
attribute :cloudfront_url, :string

attribute :playlist_timestamp, :string do
description "UTC Unix epoch for playlist (m3u8 dir) start (e.g. 1541027406)"
end

attribute :playlist_path, :string do
description "S3 object path for playlist dir (e.g. /rpi_orcasound_lab/hls/1541027406/)"
end

attribute :playlist_m3u8_path, :string do
description "S3 object path for playlist file (e.g. /rpi_orcasound_lab/hls/1541027406/live.m3u8)"
end

attribute :segment_path, :string do
description "S3 object path for ts file (e.g. /rpi_orcasound_lab/hls/1541027406/live005.ts)"
end

attribute :file_name, :string do
description "ts file name (e.g. live005.ts)"
allow_nil? false
end

create_timestamp :inserted_at
update_timestamp :updated_at
end

relationships do
belongs_to :feed, Orcasite.Radio.Feed
belongs_to :feed_stream, Orcasite.Radio.FeedStream
end

actions do
defaults [:read, :update, :destroy]

read :index do
pagination do
offset? true
countable true
default_limit 100
end

argument :feed_id, :string
argument :feed_stream_id, :string

filter expr(
if(not is_nil(^arg(:feed_id)), do: feed_id == ^arg(:feed_id), else: true) and
if(
not is_nil(^arg(:feed_stream_id)),
do: feed_stream_id == ^arg(:feed_stream_id),
else: true
)
)
end

create :create do
primary? true
upsert? true
upsert_identity :feed_segment_path

upsert_fields [
:playlist_path,
:duration,
:start_time,
:end_time,
:bucket,
:bucket_region,
:cloudfront_url,
:playlist_timestamp,
:playlist_m3u8_path,
:file_name
]

accept [
:start_time,
:end_time,
:duration,
:bucket,
:bucket_region,
:cloudfront_url,
:playlist_timestamp,
:playlist_m3u8_path,
:playlist_path,
:file_name
]

argument :feed, :map, allow_nil?: false
argument :feed_stream, :map, allow_nil?: false

argument :segment_path, :string do
allow_nil? false
constraints allow_empty?: false
end

change set_attribute(:segment_path, arg(:segment_path))
change manage_relationship(:feed, type: :append)
change manage_relationship(:feed_stream, type: :append)

change fn changeset, context ->
feed = Ash.Changeset.get_argument_or_attribute(changeset, :feed)

playlist_timestamp =
changeset
|> Ash.Changeset.get_attribute(:playlist_timestamp)

feed
|> Map.take([:bucket, :bucket_region, :cloudfront_url])
|> Enum.reduce(changeset, fn {attribute, value}, acc ->
acc
|> Ash.Changeset.change_new_attribute(attribute, value)
end)
|> Ash.Changeset.change_new_attribute(
:start_time,
playlist_timestamp
|> String.to_integer()
|> DateTime.from_unix!()
)
|> Ash.Changeset.change_new_attribute(
:playlist_path,
"/#{feed.node_name}/hls/#{playlist_timestamp}/"
)
|> Ash.Changeset.change_new_attribute(
:playlist_m3u8_path,
"/#{feed.node_name}/hls/#{playlist_timestamp}/live.m3u8"
)
end
end
end

json_api do
type "feed_segment"

routes do
base "/feed_segments"

index :index
end
end

graphql do
type :feed_segment

queries do
list :feed_segments, :index
end
end
end
Loading
Loading