From 44edcfb5a9bc45f1a347f0b19b6992013d696a87 Mon Sep 17 00:00:00 2001 From: Jesse Drelick Date: Wed, 4 Sep 2024 21:47:52 -0400 Subject: [PATCH 01/28] phoenix example --- examples/phoenix.exs | 356 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 examples/phoenix.exs diff --git a/examples/phoenix.exs b/examples/phoenix.exs new file mode 100644 index 0000000..2bdc454 --- /dev/null +++ b/examples/phoenix.exs @@ -0,0 +1,356 @@ +Application.put_env(:agens_demo, AgensDemo.Endpoint, + http: [ip: {127, 0, 0, 1}, port: 8080], + server: true, + live_view: [signing_salt: "agensdemo"], + secret_key_base: String.duplicate("a", 64) +) + +Mix.install([ + {:plug_cowboy, "~> 2.6"}, + {:phoenix, "1.7.10"}, + {:phoenix_live_view, "0.20.1"}, + {:bumblebee, "~> 0.5.0"}, + {:exla, "~> 0.7.0"}, + {:agens, "~> 0.1.2"} +]) + +Application.put_env(:nx, :default_backend, EXLA.Backend) + +defmodule AgensDemo.Layouts do + use Phoenix.Component + + def render("live.html", assigns) do + ~H""" + + + + + <%= @inner_content %> + """ + end +end + +defmodule AgensDemo.ErrorView do + def render(_, _), do: "error" +end + +defmodule AgensDemo.MainLive do + use Phoenix.LiveView, layout: {AgensDemo.Layouts, :live} + + require Logger + + alias Agens.{Agent, Job, Message, Serving} + + @impl true + def mount(_params, _session, socket) do + send(self(), :mounted) + {:ok, assign(socket, text: "", ready: false, result: nil, job: nil, logs: [])} + end + + @impl true + def render(assigns) do + ~H""" +
+
+ <.heading>Input: + <.input text={@text} ready={@ready} /> + <.buttons ready={@ready} /> + <.result result={@result} /> +
+
+ <.heading>Logs: + <.logs logs={@logs} /> +
+
+ """ + end + + attr(:text, :string, required: true) + attr(:ready, :boolean, required: true) + + defp input(assigns) do + ~H""" +
+ +
+ """ + end + + attr(:ready, :boolean, required: true) + + defp buttons(assigns) do + ~H""" +
+ + +
+ """ + end + + attr(:result, :map, required: true) + + defp result(assigns) do + ~H""" +
+ <.heading>Result: + <.async_result :let={result} assign={@result} :if={@result}> + <:loading> + <.spinner /> + + <:failed :let={_reason}> + Oops, something went wrong! + +

<%= result %>

+ +
+ """ + end + + attr(:logs, :list, required: true) + + defp logs(assigns) do + ~H""" +
+
<%= log %>
+
+ """ + end + + slot(:inner_block) + + defp heading(assigns) do + ~H""" +
+

<%= render_slot(@inner_block) %>

+
+ """ + end + + defp spinner(assigns) do + ~H""" + + + + + """ + end + + @impl true + def handle_event("update_text", %{"text" => text}, socket) do + {:noreply, socket |> assign(:text, text)} + end + + @impl true + def handle_event("send_message", _, %{assigns: assigns} = socket) do + name = :my_agent + + socket = + socket + |> assign(:result, nil) + |> assign_async(:result, fn -> + %Message{result: result} = + %Message{ + input: assigns.text, + agent_name: name + } + |> Message.send() + + {:ok, %{result: result}} + end) + |> assign(:logs, ["Sent message to agent: #{name}" | assigns.logs]) + + {:noreply, socket} + end + + @impl true + def handle_event("start_job", _, %{assigns: assigns} = socket) do + name = :my_job + + socket = + socket + |> assign(:text, assigns.text) + |> assign(:result, nil) + + %Job.Config{ + name: name, + steps: [ + %Job.Step{ + agent: :my_agent + }, + %Job.Step{ + agent: :my_agent + }, + %Job.Step{ + agent: :my_agent, + conditions: %{ + "__DEFAULT__" => :end + } + } + ] + } + |> Job.start() + |> case do + {:ok, _pid} -> + send(self(), :job_started) + {:noreply, socket |> assign(:logs, ["Job started: #{name}" | assigns.logs])} + + {:error, {:already_started, _pid}} -> + send(self(), :job_started) + {:noreply, socket |> assign(:logs, ["Job already started: #{name}" | assigns.logs])} + end + end + + @impl true + def handle_info(:job_started, %{assigns: assigns} = socket) do + name = :my_job + Job.run(name, assigns.text) + + {:noreply, socket |> assign(:logs, ["Job running: #{name}" | assigns.logs])} + end + + @impl true + def handle_info(:mounted, %{assigns: assigns} = socket) do + name = :text_generation + + Serving.start(%Serving.Config{ + name: name, + serving: serving() + }) + + send(self(), :serving_started) + {:noreply, socket |> assign(:logs, ["Serving started: #{name}" | assigns.logs])} + end + + @impl true + def handle_info(:serving_started, %{assigns: assigns} = socket) do + name = :my_agent + + Agent.start(%Agent.Config{ + name: name, + serving: :text_generation + }) + + send(self(), :agent_started) + {:noreply, socket |> assign(:logs, ["Agent started: #{name}" | assigns.logs])} + end + + @impl true + def handle_info(:agent_started, %{assigns: assigns} = socket) do + {:noreply, socket |> assign(:logs, ["Ready" | assigns.logs]) |> assign(:ready, true)} + end + + # Agens events + @impl true + def handle_info({:job_started, job_name}, %{assigns: assigns} = socket) do + debug("#{job_name} started") + {:noreply, socket |> assign(:logs, ["Agens event: job_started" | assigns.logs])} + end + + @impl true + def handle_info({:step_started, {job_name, step_index}, input}, %{assigns: assigns} = socket) do + debug("#{job_name} step #{step_index} started with: #{input}") + + {:noreply, + socket |> assign(:logs, ["Agens event: step_started (#{step_index})" | assigns.logs])} + end + + @impl true + def handle_info({:step_result, {job_name, step_index}, result}, %{assigns: assigns} = socket) do + debug("#{job_name} step #{step_index} result: #{result}") + + {:noreply, + socket |> assign(:logs, ["Agens event: step_result (#{step_index})" | assigns.logs])} + end + + @impl true + def handle_info({:job_ended, job_name, result}, %{assigns: assigns} = socket) do + debug("#{job_name} ended: #{result}") + + {:noreply, + socket |> assign(:logs, ["Agens event: job_ended (#{job_name}) #{result}" | assigns.logs])} + end + + # Helpers + defp serving() do + {:ok, gpt2} = Bumblebee.load_model({:hf, "openai-community/gpt2"}) + {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai-community/gpt2"}) + {:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai-community/gpt2"}) + + Bumblebee.Text.generation(gpt2, tokenizer, generation_config) + end + + defp debug(msg) do + Logger.debug(msg) + end +end + +defmodule AgensDemo.Router do + use Phoenix.Router + + import Phoenix.LiveView.Router + + pipeline :browser do + plug(:accepts, ["html"]) + end + + scope "/", AgensDemo do + pipe_through(:browser) + + live("/", MainLive, :index) + end +end + +defmodule AgensDemo.Endpoint do + use Phoenix.Endpoint, otp_app: :agens_demo + + socket("/live", Phoenix.LiveView.Socket) + plug(AgensDemo.Router) +end + +{:ok, _} = + Supervisor.start_link( + [ + {Agens.Supervisor, name: Agens.Supervisor}, + AgensDemo.Endpoint + ], + strategy: :one_for_one + ) + +Process.sleep(:infinity) From e8eb7cf6e01736ec30d03d4f4c6981f34d33f815 Mon Sep 17 00:00:00 2001 From: Jesse Drelick Date: Wed, 4 Sep 2024 22:10:25 -0400 Subject: [PATCH 02/28] examples readme --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index bb904d3..3556d61 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,17 @@ See `Agens.Job` for more information --- +## Examples +The `examples` directory includes a [single-file Phoenix LiveView application](examples/phoenix.exs) showcasing the basic usage of Agens. + +To run the example, use the following command in your terminal: + +```bash +elixir examples/phoenix.exs +``` + +This will start a local Phoenix server, accessible at [http://localhost:8080](http://localhost:8080). + ## Configuration Additional options can be passed to `Agens.Supervisor` in order to override the default values: From 6bd04255746ded5f00f9db5f56fa7b6b6aabec3f Mon Sep 17 00:00:00 2001 From: Jesse Drelick Date: Thu, 5 Sep 2024 16:29:26 -0400 Subject: [PATCH 03/28] remove serving parent name --- examples/phoenix.exs | 73 ++++++++++++++++++++++++++++++++------------ lib/agens/serving.ex | 43 +++++++++----------------- 2 files changed, 69 insertions(+), 47 deletions(-) diff --git a/examples/phoenix.exs b/examples/phoenix.exs index 2bdc454..4d9ba23 100644 --- a/examples/phoenix.exs +++ b/examples/phoenix.exs @@ -12,10 +12,32 @@ Mix.install([ {:bumblebee, "~> 0.5.0"}, {:exla, "~> 0.7.0"}, {:agens, "~> 0.1.2"} + # {:agens, path: Path.expand("../agens")} ]) Application.put_env(:nx, :default_backend, EXLA.Backend) +defmodule AgensDemo.CustomServing do + use GenServer + + alias Agens.{Message, Serving} + + def start_link(args) do + {config, args} = Keyword.pop(args, :config) + GenServer.start_link(__MODULE__, config, args) + end + + def init(%Serving.Config{} = config) do + {:ok, config} + end + + @impl true + def handle_call({:run, %Message{input: input}}, _from, state) do + result = "RESULT: #{input}" + {:reply, result, state} + end +end + defmodule AgensDemo.Layouts do use Phoenix.Component @@ -179,7 +201,7 @@ defmodule AgensDemo.MainLive do @impl true def handle_event("send_message", _, %{assigns: assigns} = socket) do - name = :my_agent + name = :nx_agent socket = socket @@ -212,13 +234,13 @@ defmodule AgensDemo.MainLive do name: name, steps: [ %Job.Step{ - agent: :my_agent + agent: :nx_agent }, %Job.Step{ - agent: :my_agent + agent: :gs_agent }, %Job.Step{ - agent: :my_agent, + agent: :nx_agent, conditions: %{ "__DEFAULT__" => :end } @@ -247,32 +269,45 @@ defmodule AgensDemo.MainLive do @impl true def handle_info(:mounted, %{assigns: assigns} = socket) do - name = :text_generation + name_nx = :nx_serving + name_gs = :gs_serving Serving.start(%Serving.Config{ - name: name, + name: name_nx, serving: serving() }) - send(self(), :serving_started) - {:noreply, socket |> assign(:logs, ["Serving started: #{name}" | assigns.logs])} + Serving.start(%Serving.Config{ + name: name_gs, + serving: AgensDemo.CustomServing + }) + + send(self(), :servings_started) + {:noreply, socket |> assign(:logs, ["Servings started: #{name_nx}, #{name_gs}" | assigns.logs])} end @impl true - def handle_info(:serving_started, %{assigns: assigns} = socket) do - name = :my_agent - - Agent.start(%Agent.Config{ - name: name, - serving: :text_generation - }) - - send(self(), :agent_started) - {:noreply, socket |> assign(:logs, ["Agent started: #{name}" | assigns.logs])} + def handle_info(:servings_started, %{assigns: assigns} = socket) do + name_nx = :nx_agent + name_gs = :gs_agent + + Agent.start([ + %Agent.Config{ + name: name_nx, + serving: :nx_serving + }, + %Agent.Config{ + name: name_gs, + serving: :gs_serving + } + ]) + + send(self(), :agents_started) + {:noreply, socket |> assign(:logs, ["Agents started: #{name_nx}, #{name_gs}" | assigns.logs])} end @impl true - def handle_info(:agent_started, %{assigns: assigns} = socket) do + def handle_info(:agents_started, %{assigns: assigns} = socket) do {:noreply, socket |> assign(:logs, ["Ready" | assigns.logs]) |> assign(:ready, true)} end diff --git a/lib/agens/serving.ex b/lib/agens/serving.ex index 6b18c1f..6f7b79d 100644 --- a/lib/agens/serving.ex +++ b/lib/agens/serving.ex @@ -47,8 +47,7 @@ defmodule Agens.Serving do alias Agens.Message - @suffix "Supervisor" - @parent "Wrapper" + @suffix "Serving" # =========================================================================== # Public API @@ -68,7 +67,6 @@ defmodule Agens.Serving do @spec stop(atom()) :: :ok | {:error, :serving_not_found} def stop(name) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> GenServer.call(pid, {:stop, name}) :ok = DynamicSupervisor.terminate_child(Agens, pid) @@ -81,7 +79,6 @@ defmodule Agens.Serving do @spec get_config(atom() | pid()) :: {:ok, Config.t()} | {:error, :serving_not_found} def get_config(name) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> get_config(pid) end) end @@ -95,7 +92,6 @@ defmodule Agens.Serving do @spec run(Message.t()) :: String.t() | {:error, :serving_not_found} def run(%Message{serving_name: name} = message) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> GenServer.call(pid, {:run, message}) end) @@ -108,10 +104,8 @@ defmodule Agens.Serving do @doc false @spec child_spec(Config.t()) :: Supervisor.child_spec() def child_spec(%Config{} = config) do - name = parent_name(config.name) - %{ - id: name, + id: config.name, start: {__MODULE__, :start_link, [config]}, type: :worker, restart: :transient @@ -121,9 +115,8 @@ defmodule Agens.Serving do @doc false @spec start_link(keyword(), Config.t()) :: GenServer.on_start() def start_link(extra, config) do - name = parent_name(config.name) opts = Keyword.put(extra, :config, config) - GenServer.start_link(__MODULE__, opts, name: name) + GenServer.start_link(__MODULE__, opts, name: config.name) end @doc false @@ -135,10 +128,9 @@ defmodule Agens.Serving do config = Keyword.fetch!(opts, :config) config = if is_nil(config.prompts), do: Map.put(config, :prompts, prompts), else: config state = %State{config: config, registry: registry} - {m, f, a} = start_function(config) - m - |> apply(f, a) + config + |> start_serving() |> case do {:ok, pid} when is_pid(pid) -> name = serving_name(config.name) @@ -188,6 +180,7 @@ defmodule Agens.Serving do @spec do_run(Config.t(), Message.t()) :: String.t() defp do_run(%Config{serving: %Nx.Serving{}}, %Message{} = message) do message.serving_name + |> serving_name() |> Nx.Serving.batched_run(message.prompt) |> case do %{results: [%{text: result}]} -> result @@ -196,30 +189,24 @@ defmodule Agens.Serving do end defp do_run(_, %Message{} = message) do - serving_name = serving_name(message.serving_name) - # need to get pid? - GenServer.call(serving_name, {:run, message}) + message.serving_name + |> serving_name() + |> GenServer.call({:run, message}) end @doc false - @spec start_function(Config.t()) :: tuple() - defp start_function(%Config{serving: %Nx.Serving{} = serving} = config) do - {Nx.Serving, :start_link, [[serving: serving, name: config.name]]} + @spec start_serving(Config.t()) :: tuple() + defp start_serving(%Config{serving: %Nx.Serving{} = serving} = config) do + name = serving_name(config.name) + Nx.Serving.start_link([serving: serving, name: name]) end - @doc false - @spec start_function(Config.t()) :: tuple() - # Module.concat with "Supervisor" for Nx.Serving parity - defp start_function(%Config{serving: serving} = config) when is_atom(serving) do + defp start_serving(%Config{serving: serving} = config) when is_atom(serving) do name = serving_name(config.name) - {serving, :start_link, [[name: name, config: config]]} + GenServer.start_link(serving, config, name: name) end @doc false @spec serving_name(atom) :: atom defp serving_name(name) when is_atom(name), do: Module.concat(name, @suffix) - - @doc false - @spec parent_name(atom) :: atom - defp parent_name(name) when is_atom(name), do: Module.concat(name, @parent) end From 4cd3d1219e3747716ab28804d99cc029db80e915 Mon Sep 17 00:00:00 2001 From: Jesse Drelick Date: Thu, 5 Sep 2024 16:51:30 -0400 Subject: [PATCH 04/28] destination dropdown --- examples/phoenix.exs | 45 +++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/examples/phoenix.exs b/examples/phoenix.exs index 4d9ba23..0c4015d 100644 --- a/examples/phoenix.exs +++ b/examples/phoenix.exs @@ -74,7 +74,16 @@ defmodule AgensDemo.MainLive do @impl true def mount(_params, _session, socket) do send(self(), :mounted) - {:ok, assign(socket, text: "", ready: false, result: nil, job: nil, logs: [])} + + {:ok, + assign(socket, + text: "", + destination: "nx_serving", + ready: false, + result: nil, + job: nil, + logs: [] + )} end @impl true @@ -83,7 +92,7 @@ defmodule AgensDemo.MainLive do
<.heading>Input: - <.input text={@text} ready={@ready} /> + <.input text={@text} destination={@destination} ready={@ready} /> <.buttons ready={@ready} /> <.result result={@result} />
@@ -96,11 +105,18 @@ defmodule AgensDemo.MainLive do end attr(:text, :string, required: true) + attr(:destination, :string, required: true) attr(:ready, :boolean, required: true) defp input(assigns) do ~H""" -
+ +