From c6a7dd93487873b7575d2e9241740baa9a5f41b5 Mon Sep 17 00:00:00 2001 From: Skander Mzali Date: Tue, 27 Aug 2024 19:03:06 -0700 Subject: [PATCH] Add working spectrogram creation/generation --- server/lib/orcasite/radio/audio_image.ex | 70 +++- server/lib/orcasite/radio/aws_client.ex | 13 +- .../radio/workers/generate_spectrogram.ex | 17 + .../radio/workers/update_feed_segments.ex | 8 +- ...26204458_add_last_error_to_audio_image.exs | 29 ++ .../repo/audio_images/20240826204458.json | 311 ++++++++++++++++++ 6 files changed, 432 insertions(+), 16 deletions(-) create mode 100644 server/lib/orcasite/radio/workers/generate_spectrogram.ex create mode 100644 server/priv/repo/migrations/20240826204458_add_last_error_to_audio_image.exs create mode 100644 server/priv/resource_snapshots/repo/audio_images/20240826204458.json diff --git a/server/lib/orcasite/radio/audio_image.ex b/server/lib/orcasite/radio/audio_image.ex index 848477c8..6ebeccea 100644 --- a/server/lib/orcasite/radio/audio_image.ex +++ b/server/lib/orcasite/radio/audio_image.ex @@ -2,6 +2,7 @@ defmodule Orcasite.Radio.AudioImage do require Ash.Resource.Change.Builtins require Ash.Resource.Change.Builtins require Ash.Resource.Change.Builtins + require Ash.Resource.Change.Builtins use Ash.Resource, otp_app: :orcasite, @@ -54,6 +55,7 @@ defmodule Orcasite.Radio.AudioImage do attribute :bucket, :string, public?: true attribute :bucket_region, :string, public?: true attribute :object_path, :string, public?: true + attribute :last_error, :string timestamps() end @@ -113,30 +115,78 @@ defmodule Orcasite.Radio.AudioImage do end) change after_action( - fn record, _context -> - feed = record |> Ash.load(:feed) |> Map.get(:feed) + fn _change, record, _context -> + feed = record |> Ash.load!(:feed) |> Map.get(:feed) record |> Ash.Changeset.for_update(:update, %{ - object_path: "/#{feed.node_name}/#{record.id}.png" + object_path: "/#{feed.node_name}/spectrograms/#{record.id}.png" }) |> Ash.update(authorize?: false) end, prepend?: true ) - change after_action(fn record, _context -> - record - |> Ash.Changeset.for_update(:generate_spectrogram) - |> Ash.update(authorize?: false) + change after_action(fn _change, record, _context -> + %{audio_image_id: record.id} + |> Orcasite.Radio.Workers.GenerateSpectrogram.new() + |> Oban.insert() + + {:ok, record} end) end update :generate_spectrogram do change set_attribute(:status, :processing) - change fn change, _context -> - change - end + + change after_action( + fn _change, image, _context -> + # Only one feed segment at a time for now + [feed_segment] = image |> Ash.load!(:feed_segments) |> Map.get(:feed_segments) + + %{ + image_id: image.id, + audio_bucket: feed_segment.bucket, + audio_key: feed_segment.segment_path, + image_bucket: image.bucket, + image_key: image.object_path + } + |> Orcasite.Radio.AwsClient.generate_spectrogram() + |> case do + {:ok, %{"errorMessage" => _} = error} -> + image + |> Ash.Changeset.for_update(:update, %{ + status: :failed + }) + |> Ash.Changeset.force_change_attribute(:last_error, inspect(error)) + |> Ash.update(authorize?: false) + + {:error, :spectrogram_failed} + + {:ok, %{image_size: image_size, sample_rate: _sample_rate}} -> + image + |> Ash.Changeset.for_update(:update, %{ + status: :complete, + image_size: image_size + }) + |> Ash.Changeset.force_change_attribute(:last_error, nil) + |> Ash.update(authorize?: false) + + {:ok, image} + + {:error, error} -> + image + |> Ash.Changeset.for_update(:update, %{ + status: :failed + }) + |> Ash.Changeset.force_change_attribute(:last_error, inspect(error)) + |> Ash.update(authorize?: false) + + error + end + end, + prepend?: true + ) end end diff --git a/server/lib/orcasite/radio/aws_client.ex b/server/lib/orcasite/radio/aws_client.ex index b65be6dd..dd2d9b54 100644 --- a/server/lib/orcasite/radio/aws_client.ex +++ b/server/lib/orcasite/radio/aws_client.ex @@ -10,7 +10,6 @@ defmodule Orcasite.Radio.AwsClient do image_bucket: image_bucket, image_key: image_key }) do - ExAws.S3.head_object(image_bucket, String.trim_leading(image_key, "/")) |> ExAws.request() |> case do @@ -29,6 +28,7 @@ defmodule Orcasite.Radio.AwsClient do ExAws.Lambda.list_functions() |> ExAws.request!() |> Map.get("Functions") + |> Enum.sort_by(&Map.get(&1, "LastModified"), :desc) |> Enum.find_value(fn %{"FunctionName" => name} -> if String.contains?(name, "AudioVizFunction"), do: {:ok, name} end) @@ -46,12 +46,17 @@ defmodule Orcasite.Radio.AwsClient do %{}, invocation_type: :request_response ) - |> ExAws.request() + |> ExAws.request(timeout: :timer.minutes(2)) + |> case do + {:ok, %{"image_size" => image_size, "sample_rate" => sample_rate}} -> + {:ok, %{image_size: image_size, sample_rate: sample_rate}} - # TODO: Set sample rate, size + {:error, err} -> + {:error, err} + end _ -> - nil + {:error, :lambda_not_found} end end end diff --git a/server/lib/orcasite/radio/workers/generate_spectrogram.ex b/server/lib/orcasite/radio/workers/generate_spectrogram.ex new file mode 100644 index 00000000..a3b33852 --- /dev/null +++ b/server/lib/orcasite/radio/workers/generate_spectrogram.ex @@ -0,0 +1,17 @@ +defmodule Orcasite.Radio.Workers.GenerateSpectrogram do + use Oban.Worker, + queue: :feeds, + unique: [ + keys: [:audio_image_id], + period: :infinity, + states: [:available, :scheduled, :executing] + ] + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"audio_image_id" => audio_image_id}}) do + Orcasite.Radio.AudioImage + |> Ash.get!(audio_image_id) + |> Ash.Changeset.for_update(:generate_spectrogram) + |> Ash.update(authorize?: false) + end +end diff --git a/server/lib/orcasite/radio/workers/update_feed_segments.ex b/server/lib/orcasite/radio/workers/update_feed_segments.ex index ee75e6d8..a9629ba0 100644 --- a/server/lib/orcasite/radio/workers/update_feed_segments.ex +++ b/server/lib/orcasite/radio/workers/update_feed_segments.ex @@ -1,13 +1,17 @@ defmodule Orcasite.Radio.Workers.UpdateFeedSegments do use Oban.Worker, queue: :feeds, - unique: [keys: [:feed_stream_id], period: :infinity, states: [:available, :scheduled, :executing]] + unique: [ + keys: [:feed_stream_id], + period: :infinity, + states: [:available, :scheduled, :executing] + ] @impl Oban.Worker def perform(%Oban.Job{args: %{"feed_stream_id" => feed_stream_id}}) do Orcasite.Radio.FeedStream |> Ash.get!(feed_stream_id) |> Ash.Changeset.for_update(:update_segments) - |> Ash.update() + |> Ash.update(authorize?: false) end end diff --git a/server/priv/repo/migrations/20240826204458_add_last_error_to_audio_image.exs b/server/priv/repo/migrations/20240826204458_add_last_error_to_audio_image.exs new file mode 100644 index 00000000..d87decb7 --- /dev/null +++ b/server/priv/repo/migrations/20240826204458_add_last_error_to_audio_image.exs @@ -0,0 +1,29 @@ +defmodule Orcasite.Repo.Migrations.AddLastErrorToAudioImage do + @moduledoc """ + Updates resources based on their most recent snapshots. + + This file was autogenerated with `mix ash_postgres.generate_migrations` + """ + + use Ecto.Migration + + def up do + alter table(:audio_images) do + modify :feed_id, :uuid, null: false + modify :end_time, :utc_datetime_usec, null: false + modify :start_time, :utc_datetime_usec, null: false + modify :status, :text, null: false + add :last_error, :text + end + end + + def down do + alter table(:audio_images) do + remove :last_error + modify :status, :text, null: true + modify :start_time, :utc_datetime_usec, null: true + modify :end_time, :utc_datetime_usec, null: true + modify :feed_id, :uuid, null: true + end + end +end diff --git a/server/priv/resource_snapshots/repo/audio_images/20240826204458.json b/server/priv/resource_snapshots/repo/audio_images/20240826204458.json new file mode 100644 index 00000000..3f8eba56 --- /dev/null +++ b/server/priv/resource_snapshots/repo/audio_images/20240826204458.json @@ -0,0 +1,311 @@ +{ + "attributes": [ + { + "allow_nil?": false, + "default": "fragment(\"gen_random_uuid()\")", + "generated?": false, + "primary_key?": true, + "references": null, + "size": null, + "source": "id", + "type": "uuid" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "image_type", + "type": "text" + }, + { + "allow_nil?": false, + "default": "\"new\"", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "status", + "type": "text" + }, + { + "allow_nil?": false, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "start_time", + "type": "utc_datetime_usec" + }, + { + "allow_nil?": false, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "end_time", + "type": "utc_datetime_usec" + }, + { + "allow_nil?": true, + "default": "%{}", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "parameters", + "type": "map" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "image_size", + "type": "bigint" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "bucket", + "type": "text" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "bucket_region", + "type": "text" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "object_path", + "type": "text" + }, + { + "allow_nil?": true, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "last_error", + "type": "text" + }, + { + "allow_nil?": false, + "default": "fragment(\"(now() AT TIME ZONE 'utc')\")", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "inserted_at", + "type": "utc_datetime_usec" + }, + { + "allow_nil?": false, + "default": "fragment(\"(now() AT TIME ZONE 'utc')\")", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "updated_at", + "type": "utc_datetime_usec" + }, + { + "allow_nil?": false, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": { + "deferrable": false, + "destination_attribute": "id", + "destination_attribute_default": null, + "destination_attribute_generated": null, + "index?": false, + "match_type": null, + "match_with": null, + "multitenancy": { + "attribute": null, + "global": null, + "strategy": null + }, + "name": "audio_images_feed_id_fkey", + "on_delete": null, + "on_update": null, + "primary_key?": true, + "schema": null, + "table": "feeds" + }, + "size": null, + "source": "feed_id", + "type": "uuid" + } + ], + "base_filter": null, + "check_constraints": [], + "custom_indexes": [ + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "start_time" + ], + "fields": [ + { + "type": "atom", + "value": "start_time" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + }, + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "end_time" + ], + "fields": [ + { + "type": "atom", + "value": "end_time" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + }, + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "feed_id" + ], + "fields": [ + { + "type": "atom", + "value": "feed_id" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + }, + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "bucket" + ], + "fields": [ + { + "type": "atom", + "value": "bucket" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + }, + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "image_type" + ], + "fields": [ + { + "type": "atom", + "value": "image_type" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + }, + { + "all_tenants?": false, + "concurrently": false, + "error_fields": [ + "status" + ], + "fields": [ + { + "type": "atom", + "value": "status" + } + ], + "include": null, + "message": null, + "name": null, + "nulls_distinct": true, + "prefix": null, + "table": null, + "unique": false, + "using": null, + "where": null + } + ], + "custom_statements": [], + "has_create_action": true, + "hash": "7B07672D12D04EC10FFF006982C3B991C93DC2D2828C3CE61DF34577E332CDC8", + "identities": [], + "multitenancy": { + "attribute": null, + "global": null, + "strategy": null + }, + "repo": "Elixir.Orcasite.Repo", + "schema": null, + "table": "audio_images" +} \ No newline at end of file