diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ee9977..45a68d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,27 @@ # Changelog +## 0.1.3 +This release introduces a [single-file Phoenix LiveView example](examples/phoenix.exs) that can be run with `elixir examples/phoenix.exs`. + +In addition, this release removes the use of `Registry`, adds better error handling, and improves `GenServer` usage, including better child specs. + +### Features +- Added `examples/phoenix.exs` example +- Added `Agens.Prefixes` +- Added pass-through `args` and `finalize` function to `Agens.Serving` +- Added `{:job_error, {job.name, step_index}, {:error, reason | exception}}` event to `Agens.Job` +- Added child specs to `Agens` and `Agens.Supervisor` +- Added `{:error, :job_already_running}` when calling `Agens.Job.run/2` on running job +- Added `{:error, :input_required}` when calling `Message.send/1` with empty `input` + +### Breaking Changes +- Removed `Registry` usage and `registry` configuration option +- Changed `prompts` to `prefixes` on `Agens.Serving.Config` +- Added `input` to `@enforce_keys` and removed `nil` as accepted `input` type in `Agens.Message` + +### Fixes +- Removed `restart: :transient` from `Agens.Serving` and `Agens.Agent` child specs + ## 0.1.2 This release removes [application environment configuration](https://hexdocs.pm/elixir/1.17.2/design-anti-patterns.html#using-application-configuration-for-libraries) and moves to an opts-based configuration. See [README.md](README.md#configuration) for more info. diff --git a/README.md b/README.md index bb904d3..3bd20b1 100644 --- a/README.md +++ b/README.md @@ -122,13 +122,23 @@ See `Agens.Job` for more information --- +## Examples +The `examples` directory includes a [single-file Phoenix LiveView application](examples/phoenix.exs) showcasing the basic usage of Agens. 
+ +To run the example, use the following command in your terminal: + +```bash +elixir examples/phoenix.exs +``` + +This will start a local Phoenix server, accessible at [http://localhost:8080](http://localhost:8080). + ## Configuration Additional options can be passed to `Agens.Supervisor` in order to override the default values: ```elixir opts = [ - registry: Agens.MyCustomRegistry, - prompts: custom_prompt_prefixes + prefixes: custom_prompt_prefixes ] children = [ @@ -138,10 +148,10 @@ children = [ Supervisor.start_link(children, strategy: :one_for_one) ``` -The following default prompt prefixes can be copied, customized and used for the `prompts` option above: +The following default prompt prefixes can be copied, customized and used for the `prefixes` option above: ```elixir -%{ +%Agens.Prefixes{ prompt: {"Agent", "You are a specialized agent with the following capabilities and expertise"}, @@ -169,12 +179,12 @@ The following default prompt prefixes can be copied, customized and used for the } ``` -See the [Prompting](#prompting) section below or `Agens.Message` for more information on prompt prefixes. +See the [Prompting](#prompting) section below or `Agens.Prefixes` for more information on prompt prefixes. You can also see `Agens.Supervisor` for more information on configuration options. ## Prompting -Agens provides a variety of different ways to customize the final prompt sent to the language model (LM) or Serving. A natural language string can be assigned to the entity's specialized field (see below), while `nil` values will omit that field from the final prompt. This approach allows for precise control over the prompt’s content. +Agens provides a variety of different ways to customize the final prompt sent to the language model (LM) or Serving. A natural language string can be assigned to the entity's specialized field (see below), while `nil` values will omit that field from the final prompt. 
This approach allows for precise control over the prompt content. All fields with values, in addition to user input, will be included in the final prompt. The goal should be to balance detailed prompts with efficient token usage by focusing on relevant fields and using concise language. This approach will yield the best results with minimal token usage, keeping costs low and performance high. diff --git a/examples/phoenix.exs b/examples/phoenix.exs new file mode 100644 index 0000000..babe7b1 --- /dev/null +++ b/examples/phoenix.exs @@ -0,0 +1,436 @@ +Application.put_env(:agens_demo, AgensDemo.Endpoint, + http: [ip: {127, 0, 0, 1}, port: 8080], + server: true, + live_view: [signing_salt: "agensdemo"], + secret_key_base: String.duplicate("a", 64) +) + +Mix.install([ + {:plug_cowboy, "~> 2.6"}, + {:phoenix, "1.7.10"}, + {:phoenix_live_view, "0.20.1"}, + {:bumblebee, "~> 0.5.0"}, + {:exla, "~> 0.7.0"}, + {:agens, "~> 0.1.2"} + # {:agens, path: Path.expand("../agens")} +]) + +Application.put_env(:nx, :default_backend, EXLA.Backend) + +defmodule AgensDemo.CustomServing do + use GenServer + + alias Agens.{Message, Serving} + + def start_link(args) do + {config, args} = Keyword.pop(args, :config) + GenServer.start_link(__MODULE__, config, args) + end + + def init(%Serving.Config{} = config) do + {:ok, config} + end + + @impl true + def handle_call({:run, %Message{input: input}}, _from, state) do + result = "RESULT: #{input}" + {:reply, result, state} + end +end + +defmodule AgensDemo.Layouts do + use Phoenix.Component + + def render("live.html", assigns) do + ~H""" + + + + + <%= @inner_content %> + """ + end +end + +defmodule AgensDemo.ErrorView do + def render(_, _), do: "error" +end + +defmodule AgensDemo.MainLive do + use Phoenix.LiveView, layout: {AgensDemo.Layouts, :live} + + require Logger + + alias Agens.{Agent, Job, Message, Serving} + + @impl true + def mount(_params, _session, socket) do + send(self(), :mounted) + + {:ok, + assign(socket, + text: "", + 
destination: "nx_serving", + ready: false, + result: nil, + job: nil, + logs: [] + )} + end + + @impl true + def render(assigns) do + ~H""" +
+
+ <.heading>Input: + <.input text={@text} destination={@destination} ready={@ready} /> + <.buttons ready={@ready} /> + <.result result={@result} /> +
+
+ <.heading>Logs: + <.logs logs={@logs} /> +
+
+ """ + end + + attr(:text, :string, required: true) + attr(:destination, :string, required: true) + attr(:ready, :boolean, required: true) + + defp input(assigns) do + ~H""" +
+ + +
+ """ + end + + attr(:ready, :boolean, required: true) + + defp buttons(assigns) do + ~H""" +
+ + +
+ """ + end + + attr(:result, :map, required: true) + + defp result(assigns) do + ~H""" +
+ <.heading>Result: + <.async_result :let={result} assign={@result} :if={@result}> + <:loading> + <.spinner /> + + <:failed :let={reason}> +
<%= inspect(reason) %>
+ +

<%= result %>

+ +
+ """ + end + + attr(:logs, :list, required: true) + + defp logs(assigns) do + ~H""" +
+
<%= log %>
+
+ """ + end + + slot(:inner_block) + + defp heading(assigns) do + ~H""" +
+

<%= render_slot(@inner_block) %>

+
+ """ + end + + defp spinner(assigns) do + ~H""" + + + + + """ + end + + @impl true + def handle_event("update_inputs", %{"text" => text, "destination" => destination}, socket) do + {:noreply, socket |> assign(text: text, destination: destination)} + end + + @impl true + def handle_event("send_message", _, %{assigns: assigns} = socket) do + name = + case assigns.destination do + "invalid_serving" -> :invalid_serving + "invalid_agent" -> :invalid_agent + destination -> String.to_existing_atom(destination) + end + + {key, type} = + if name in [:nx_agent, :gs_agent, :invalid_agent], + do: {:agent_name, "agent"}, + else: {:serving_name, "serving"} + + socket = + socket + |> assign(:result, nil) + |> assign_async(:result, fn -> + %Message{ + input: assigns.text + } + |> Map.put(key, name) + |> Message.send() + |> case do + %Message{result: result} -> {:ok, %{result: result}} + {:error, reason} -> {:error, reason} + end + end) + |> assign(:logs, ["Sent message to #{type}: #{name}" | assigns.logs]) + + {:noreply, socket} + end + + @impl true + def handle_event("start_job", _, %{assigns: assigns} = socket) do + name = :my_job + + socket = + socket + |> assign(:text, assigns.text) + |> assign(:result, nil) + + %Job.Config{ + name: name, + steps: [ + %Job.Step{ + agent: :nx_agent + }, + %Job.Step{ + agent: :gs_agent + }, + %Job.Step{ + agent: :nx_agent, + conditions: %{ + "__DEFAULT__" => :end + } + } + ] + } + |> Job.start() + |> case do + {:ok, _pid} -> + send(self(), :job_started) + {:noreply, socket |> assign(:logs, ["Job started: #{name}" | assigns.logs])} + + {:error, {:already_started, _pid}} -> + send(self(), :job_started) + {:noreply, socket |> assign(:logs, ["Job already started: #{name}" | assigns.logs])} + end + end + + @impl true + def handle_info(:job_started, %{assigns: assigns} = socket) do + name = :my_job + + name + |> Job.run(assigns.text) + |> case do + :ok -> + {:noreply, socket |> assign(:logs, ["Job running: #{name}" | assigns.logs])} + {:error, 
reason} -> + {:noreply, socket |> assign(:logs, ["Job error: #{name}: #{inspect(reason)}" | assigns.logs])} + end + end + + @impl true + def handle_info(:mounted, %{assigns: assigns} = socket) do + name_nx = :nx_serving + name_gs = :gs_serving + + Serving.start(%Serving.Config{ + name: name_nx, + serving: serving() + }) + + Serving.start(%Serving.Config{ + name: name_gs, + serving: AgensDemo.CustomServing + }) + + send(self(), :servings_started) + + {:noreply, + socket |> assign(:logs, ["Servings started: #{name_nx}, #{name_gs}" | assigns.logs])} + end + + @impl true + def handle_info(:servings_started, %{assigns: assigns} = socket) do + name_nx = :nx_agent + name_gs = :gs_agent + + Agent.start([ + %Agent.Config{ + name: name_nx, + serving: :nx_serving + }, + %Agent.Config{ + name: name_gs, + serving: :gs_serving + } + ]) + + send(self(), :agents_started) + {:noreply, socket |> assign(:logs, ["Agents started: #{name_nx}, #{name_gs}" | assigns.logs])} + end + + @impl true + def handle_info(:agents_started, %{assigns: assigns} = socket) do + {:noreply, socket |> assign(:logs, ["Ready" | assigns.logs]) |> assign(:ready, true)} + end + + # Agens events + @impl true + def handle_info({:job_started, job_name}, %{assigns: assigns} = socket) do + debug("#{job_name} started") + {:noreply, socket |> assign(:logs, ["Agens event: job_started" | assigns.logs])} + end + + @impl true + def handle_info({:step_started, {job_name, step_index}, input}, %{assigns: assigns} = socket) do + debug("#{job_name} step #{step_index} started with: #{input}") + + {:noreply, + socket |> assign(:logs, ["Agens event: step_started (#{step_index})" | assigns.logs])} + end + + @impl true + def handle_info({:step_result, {job_name, step_index}, result}, %{assigns: assigns} = socket) do + debug("#{job_name} step #{step_index} result: #{result}") + + {:noreply, + socket |> assign(:logs, ["Agens event: step_result (#{step_index})" | assigns.logs])} + end + + @impl true + def handle_info({:job_ended, 
job_name, result}, %{assigns: assigns} = socket) do + debug("#{job_name} ended: #{result}") + + {:noreply, + socket |> assign(:logs, ["Agens event: job_ended (#{job_name}) #{result}" | assigns.logs])} + end + + @impl true + def handle_info({:job_error, {job_name, step_index}, err}, %{assigns: assigns} = socket) do + debug("#{job_name} error (step #{step_index}): #{inspect(err)}") + + {:noreply, + socket |> assign(:logs, ["Agens event: job_error (#{job_name}, step #{step_index}) #{inspect(err)}" | assigns.logs])} + end + + # Helpers + defp serving() do + {:ok, gpt2} = Bumblebee.load_model({:hf, "openai-community/gpt2"}) + {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai-community/gpt2"}) + {:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai-community/gpt2"}) + + Bumblebee.Text.generation(gpt2, tokenizer, generation_config) + end + + defp debug(msg) do + Logger.debug(msg) + end +end + +defmodule AgensDemo.Router do + use Phoenix.Router + + import Phoenix.LiveView.Router + + pipeline :browser do + plug(:accepts, ["html"]) + end + + scope "/", AgensDemo do + pipe_through(:browser) + + live("/", MainLive, :index) + end +end + +defmodule AgensDemo.Endpoint do + use Phoenix.Endpoint, otp_app: :agens_demo + + socket("/live", Phoenix.LiveView.Socket) + plug(AgensDemo.Router) +end + +{:ok, _} = + Supervisor.start_link( + [ + {Agens.Supervisor, name: Agens.Supervisor}, + AgensDemo.Endpoint + ], + strategy: :one_for_one + ) + +Process.sleep(:infinity) diff --git a/lib/agens.ex b/lib/agens.ex index 5581286..5d06cd9 100644 --- a/lib/agens.ex +++ b/lib/agens.ex @@ -26,6 +26,16 @@ defmodule Agens do use DynamicSupervisor + @doc false + @spec child_spec(keyword()) :: Supervisor.child_spec() + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + type: :supervisor + } + end + @doc false @spec start_link(keyword()) :: Supervisor.on_start() def start_link(args) do diff --git a/lib/agens/agent.ex b/lib/agens/agent.ex 
index e4726b7..a649654 100644 --- a/lib/agens/agent.ex +++ b/lib/agens/agent.ex @@ -40,7 +40,7 @@ defmodule Agens.Agent do The Config struct represents the configuration for an Agent process. ## Fields - - `:name` - The name of the Agent process. + - `:name` - The unique name for the Agent process. - `:serving` - The name of the Serving specified in `Agens.Serving.Config`. - `:knowledge` - The knowledge base or data source of the Agent. Default is nil. (Coming soon) - `:prompt` - The string or `Agens.Agent.Prompt` struct defining the Agent. Default is nil. @@ -63,12 +63,11 @@ defmodule Agens.Agent do @moduledoc false @type t :: %__MODULE__{ - registry: atom(), config: Agens.Agent.Config.t() } - @enforce_keys [:registry, :config] - defstruct [:registry, :config] + @enforce_keys [:config] + defstruct [:config] end use GenServer @@ -98,7 +97,6 @@ defmodule Agens.Agent do @spec stop(atom()) :: :ok | {:error, :agent_not_found} def stop(agent_name) do Agens.name_to_pid(agent_name, {:error, :agent_not_found}, fn pid -> - GenServer.call(pid, {:stop, agent_name}) :ok = DynamicSupervisor.terminate_child(Agens, pid) end) end @@ -124,9 +122,7 @@ defmodule Agens.Agent do def child_spec(%Config{} = config) do %{ id: config.name, - start: {__MODULE__, :start_link, [config]}, - type: :worker, - restart: :transient + start: {__MODULE__, :start_link, [config]} } end @@ -141,11 +137,8 @@ defmodule Agens.Agent do @impl true @spec init(keyword()) :: {:ok, State.t()} def init(opts) do - registry = Keyword.fetch!(opts, :registry) config = Keyword.fetch!(opts, :config) - state = %State{registry: registry, config: config} - - {:ok, _} = Registry.register(registry, config.name, {self(), config}) + state = %State{config: config} {:ok, state} end @@ -154,14 +147,6 @@ defmodule Agens.Agent do # Callbacks # =========================================================================== - @doc false - @impl true - @spec handle_call({:stop, atom()}, {pid, term}, State.t()) :: {:reply, :ok, State.t()} 
- def handle_call({:stop, agent_name}, _from, state) do - Registry.unregister(state.registry, agent_name) - {:reply, :ok, state} - end - @doc false @impl true @spec handle_call(:get_config, {pid, term}, State.t()) :: {:reply, Config.t(), State.t()} diff --git a/lib/agens/job.ex b/lib/agens/job.ex index f493b74..5ed08b2 100644 --- a/lib/agens/job.ex +++ b/lib/agens/job.ex @@ -17,10 +17,16 @@ defmodule Agens.Job do Emitted when a job has started. ``` - {:job_ended, job.name, :completed | {:error, error}} + {:job_ended, job.name, :complete} ``` - Emitted when a job has ended, either due to completion or an error. + Emitted when a job has been completed. + + ``` + {:job_error, {job.name, step_index}, {:error, reason | exception}} + ``` + + Emitted when a job has ended due to an error or unhandled exception. #### Step ``` @@ -82,7 +88,7 @@ defmodule Agens.Job do The Config struct defines the details of a Job. ## Fields - - `name` - An atom that identifies the Job. + - `name` - The unique name used to identify the Job. - `description` - An optional string to be added to the LM prompt that describes the basic goal of the Job. - `steps` - A list of `Agens.Job.Step` structs that define the sequence of agent actions to be performed. """ @@ -104,12 +110,11 @@ defmodule Agens.Job do status: :init | :running | :error | :completed, step_index: non_neg_integer() | nil, config: Config.t(), - parent: pid() | nil, - registry: atom() + parent: pid() | nil } - @enforce_keys [:status, :config, :registry] - defstruct [:status, :step_index, :config, :parent, :registry] + @enforce_keys [:status, :config] + defstruct [:status, :step_index, :config, :parent] end use GenServer @@ -147,7 +152,7 @@ defmodule Agens.Job do A supervised process for the Job must be started first using `start/1`. 
""" - @spec run(pid | atom, String.t()) :: {:ok, term} | {:error, :job_not_found} + @spec run(pid | atom, String.t()) :: :ok | {:error, :job_not_found} def run(job_name, input) when is_atom(job_name) do Agens.name_to_pid(job_name, {:error, :job_not_found}, fn pid -> run(pid, input) end) end @@ -166,7 +171,6 @@ defmodule Agens.Job do %{ id: config.name, start: {__MODULE__, :start_link, [config]}, - type: :worker, restart: :transient } end @@ -182,9 +186,8 @@ defmodule Agens.Job do @impl true @spec init(keyword()) :: {:ok, State.t()} def init(opts) do - registry = Keyword.fetch!(opts, :registry) config = Keyword.fetch!(opts, :config) - {:ok, %State{status: :init, config: config, registry: registry}} + {:ok, %State{status: :init, config: config}} end # =========================================================================== @@ -201,6 +204,10 @@ defmodule Agens.Job do @doc false @impl true @spec handle_call({:run, String.t()}, {pid, term}, State.t()) :: {:reply, :ok, State.t()} + def handle_call({:run, _}, _, %{status: :running} = state) do + {:reply, {:error, :job_already_running}, state} + end + def handle_call({:run, input}, {parent, _}, state) do new_state = %State{state | status: :running, step_index: 0, parent: parent} {:reply, :ok, new_state, {:continue, {:run, input}}} @@ -239,22 +246,31 @@ defmodule Agens.Job do @doc false @impl true - @spec handle_cast(:end, State.t()) :: {:stop, :complete, State.t()} - def handle_cast(:end, %State{} = state) do + @spec handle_cast(:end, State.t()) :: {:stop, :normal, State.t()} + def handle_cast(:end, %State{config: %Config{name: name}} = state) do new_state = %State{state | status: :complete} - {:stop, :complete, new_state} + send(state.parent, {:job_ended, name, :complete}) + {:stop, :normal, new_state} end @doc false @impl true - @spec terminate(:complete | {term(), list()}, State.t()) :: :ok - def terminate(:complete, %State{config: %{name: name}} = state) do - send(state.parent, {:job_ended, name, :complete}) - :ok + 
@spec handle_cast({:error, atom()}, State.t()) :: {:stop, :shutdown, State.t()} + def handle_cast({:error, _reason} = err, %State{config: %Config{name: name}} = state) do + new_state = %State{state | status: :error} + send(state.parent, {:job_error, {name, state.step_index}, err}) + {:stop, :shutdown, new_state} end + @doc false + @impl true + @spec terminate(:normal | :shutdown | {term(), list()}, State.t()) :: :ok def terminate({exception, _}, %State{config: %{name: name}} = state) do - send(state.parent, {:job_ended, name, {:error, exception}}) + send(state.parent, {:job_error, {name, state.step_index}, {:error, exception}}) + :ok + end + + def terminate(_reason, _state) do :ok end @@ -278,13 +294,21 @@ defmodule Agens.Job do } send(state.parent, {:step_started, {message.job_name, message.step_index}, message.input}) - message = Message.send(message) - send(state.parent, {:step_result, {message.job_name, message.step_index}, message.result}) - if step.conditions do - do_conditions(step.conditions, message) - else - GenServer.cast(self(), {:next, message}) + message + |> Message.send() + |> case do + %Message{} = message -> + send(state.parent, {:step_result, {message.job_name, message.step_index}, message.result}) + + if step.conditions do + do_conditions(step.conditions, message) + else + GenServer.cast(self(), {:next, message}) + end + + {:error, reason} -> + GenServer.cast(self(), {:error, reason}) end end diff --git a/lib/agens/message.ex b/lib/agens/message.ex index 0eb7689..fb0b93f 100644 --- a/lib/agens/message.ex +++ b/lib/agens/message.ex @@ -5,7 +5,7 @@ defmodule Agens.Message do ## Fields * `:parent_pid` - The process identifier of the parent/caller process. - * `:input` - The input string for the message. + * `:input` - The input string for the message. Required. * `:prompt` - The final prompt string constructed for `Agens.Serving.run/1`. * `:result` - The result string for the message. * `:agent_name` - The name of the `Agens.Agent`. 
@@ -18,7 +18,7 @@ defmodule Agens.Message do @type t :: %__MODULE__{ parent_pid: pid() | nil, - input: String.t() | nil, + input: String.t(), prompt: String.t() | Agens.Agent.Prompt.t() | nil, result: String.t() | nil, agent_name: atom() | nil, @@ -29,7 +29,7 @@ defmodule Agens.Message do step_objective: String.t() | nil } - @enforce_keys [] + @enforce_keys [:input] defstruct [ :parent_pid, :input, @@ -43,22 +43,25 @@ defmodule Agens.Message do :step_objective ] - alias Agens.{Agent, Serving} + alias Agens.{Agent, Prefixes, Serving} @doc """ Sends an `Agens.Message` to an `Agens.Agent` or `Agens.Serving`. """ @spec send(t()) :: t() | {:error, atom()} + def send(%__MODULE__{input: input}) when input in ["", nil] do + {:error, :input_required} + end + def send(%__MODULE__{agent_name: nil, serving_name: nil}) do {:error, :no_agent_or_serving_name} end def send(%__MODULE__{} = message) do with {:ok, agent_config} <- maybe_get_agent_config(message.agent_name), - {:ok, serving_config} <- get_serving_config(agent_config, message) do - base = build_prompt(agent_config, message, serving_config.prompts) - prompt = "[INST]#{base}[/INST]" - + {:ok, serving_config} <- get_serving_config(agent_config, message), + base <- build_prompt(agent_config, message, serving_config.prefixes), + {:ok, prompt} <- Serving.finalize(serving_config.name, base) do message = message |> Map.put(:prompt, prompt) @@ -75,8 +78,8 @@ defmodule Agens.Message do end @doc false - @spec build_prompt(Agent.Config.t() | nil, t(), map()) :: String.t() - defp build_prompt(agent_config, %__MODULE__{} = message, prompts) do + @spec build_prompt(Agent.Config.t() | nil, t(), Prefixes.t()) :: String.t() + defp build_prompt(agent_config, %__MODULE__{} = message, prefixes) do %{ objective: message.step_objective, description: message.job_description @@ -85,7 +88,7 @@ defmodule Agens.Message do |> maybe_add_tool(agent_config) |> maybe_prep_input(message.input, agent_config) |> Enum.reject(&filter_empty/1) - |> 
Enum.map(fn {key, value} -> field({key, value}, prompts) end) + |> Enum.map(fn {key, value} -> field({key, value}, prefixes) end) |> Enum.map(&to_prompt/1) |> Enum.join("\n\n") end @@ -95,9 +98,9 @@ defmodule Agens.Message do defp filter_empty({_, value}), do: value == "" or is_nil(value) @doc false - @spec field({atom(), String.t()}, map()) :: {String.t(), String.t()} - defp field({key, value}, prompts) do - {Map.get(prompts, key), value} + @spec field({atom(), String.t()}, Prefixes.t()) :: {String.t(), String.t()} + defp field({key, value}, prefixes) do + {Map.get(prefixes, key), value} end @doc false diff --git a/lib/agens/prefixes.ex b/lib/agens/prefixes.ex new file mode 100644 index 0000000..23c47dd --- /dev/null +++ b/lib/agens/prefixes.ex @@ -0,0 +1,84 @@ +defmodule Agens.Prefixes do + @moduledoc """ + The Prefixes struct is used to configure prompt prefixes for building advanced prompts. + + For each field used in the prompt (based on the configuration of Agents, Servings, and Jobs), a `heading` will be added, as well as some additional `detail`. + + For example, if you are running an `Agens.Job` and have defined an `objective` for the current `Agens.Job.Step`, the following will be added to the prompt: + + ```markdown + ## Step Objective + + The objective of this step is to {{step.objective}} + ``` + + However, if you have not defined an `objective` for the current `Agens.Job.Step`, the `heading` and `detail` will also be omitted. + + Default prompt prefixes can be overridden globally with the `prefixes` option in `Agens.Supervisor`, or for individual servings with `Agens.Serving.Config`. + + See the [Prompting](README.md#prompting) section in the README for more information. 
+ """ + + @type pair :: {heading :: String.t(), detail :: String.t()} + @type t :: %__MODULE__{ + prompt: pair(), + identity: pair(), + context: pair(), + constraints: pair(), + examples: pair(), + reflection: pair(), + instructions: pair(), + objective: pair(), + description: pair(), + input: pair() + } + + @enforce_keys [ + :prompt, + :identity, + :context, + :constraints, + :examples, + :reflection, + :instructions, + :objective, + :description, + :input + ] + defstruct [ + :prompt, + :identity, + :context, + :constraints, + :examples, + :reflection, + :instructions, + :objective, + :description, + :input + ] + + @doc false + @spec default() :: t + def default() do + %__MODULE__{ + prompt: + {"Agent", "You are a specialized agent with the following capabilities and expertise"}, + identity: + {"Identity", "You are a specialized agent with the following capabilities and expertise"}, + context: {"Context", "The purpose or goal behind your tasks are to"}, + constraints: + {"Constraints", "You must operate with the following constraints or limitations"}, + examples: + {"Examples", "You should consider the following examples before returning results"}, + reflection: + {"Reflection", "You should reflect on the following factors before returning results"}, + instructions: + {"Tool Instructions", + "You should provide structured output for function calling based on the following instructions"}, + objective: {"Step Objective", "The objective of this step is to"}, + description: {"Job Description", "This is part of multi-step job to achieve the following"}, + input: {"Input", "The following is the actual input from the user, system or another agent"} + } + end +end diff --git a/lib/agens/serving.ex b/lib/agens/serving.ex index 6b18c1f..7f9a619 100644 --- a/lib/agens/serving.ex +++ b/lib/agens/serving.ex @@ -16,39 +16,41 @@ defmodule Agens.Serving do The Config struct represents the configuration for a Serving process. 
## Fields - - `:name` - The name of the `Agens.Serving` process. + - `:name` - The unique name for the Serving process. - `:serving` - The `Nx.Serving` struct or `GenServer` module for the `Agens.Serving`. - - `:prompts` - A map of custom prompt prefixes. If `nil`, default prompt prefixes will be used instead. Default prompt prefixes can also be overridden by using the `prompts` options in `Agens.Supervisor`. + - `:prefixes` - An `Agens.Prefixes` struct of custom prompt prefixes. If `nil`, default prompt prefixes will be used instead. Default prompt prefixes can also be overridden by using the `prefixes` option in `Agens.Supervisor`. + - `:finalize` - A function that accepts the prepared prompt (including any applied prefixes) and returns a modified version of the prompt. Useful for wrapping the prompt or applying final processing before sending to the LM for inference. If `nil`, the prepared prompt will be used as-is. + - `:args` - Additional arguments to be passed to the `Nx.Serving` or `GenServer` module. See the [Nx.Serving](https://hexdocs.pm/nx/Nx.Serving.html) or [GenServer](https://hexdocs.pm/elixir/GenServer.html) documentation for more information. 
""" @type t :: %__MODULE__{ name: atom(), serving: Nx.Serving.t() | module(), - prompts: map() | nil + args: keyword(), + prefixes: Agens.Prefixes.t() | nil, + finalize: (String.t() -> String.t()) | nil } @enforce_keys [:name, :serving] - defstruct [:name, :serving, :prompts] + defstruct [:name, :serving, :prefixes, :finalize, args: []] end defmodule State do @moduledoc false @type t :: %__MODULE__{ - registry: atom(), config: Config.t() } - @enforce_keys [:registry, :config] - defstruct [:registry, :config] + @enforce_keys [:config] + defstruct [:config] end use GenServer alias Agens.Message - @suffix "Supervisor" - @parent "Wrapper" + @suffix "Serving" # =========================================================================== # Public API @@ -68,9 +70,7 @@ defmodule Agens.Serving do @spec stop(atom()) :: :ok | {:error, :serving_not_found} def stop(name) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> - GenServer.call(pid, {:stop, name}) :ok = DynamicSupervisor.terminate_child(Agens, pid) end) end @@ -81,7 +81,6 @@ defmodule Agens.Serving do @spec get_config(atom() | pid()) :: {:ok, Config.t()} | {:error, :serving_not_found} def get_config(name) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> get_config(pid) end) end @@ -95,12 +94,24 @@ defmodule Agens.Serving do @spec run(Message.t()) :: String.t() | {:error, :serving_not_found} def run(%Message{serving_name: name} = message) when is_atom(name) do name - |> parent_name() |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> GenServer.call(pid, {:run, message}) end) end + @doc false + @spec finalize(atom() | pid(), String.t()) :: {:ok, String.t()} | {:error, :serving_not_found} + def finalize(name, prompt) when is_atom(name) do + name + |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> + finalize(pid, prompt) + end) + end + + def finalize(pid, prompt) when is_pid(pid) do + 
{:ok, GenServer.call(pid, {:finalize, prompt})} + end + # =========================================================================== # Setup # =========================================================================== @@ -108,42 +119,32 @@ defmodule Agens.Serving do @doc false @spec child_spec(Config.t()) :: Supervisor.child_spec() def child_spec(%Config{} = config) do - name = parent_name(config.name) - %{ - id: name, - start: {__MODULE__, :start_link, [config]}, - type: :worker, - restart: :transient + id: config.name, + start: {__MODULE__, :start_link, [config]} } end @doc false @spec start_link(keyword(), Config.t()) :: GenServer.on_start() def start_link(extra, config) do - name = parent_name(config.name) opts = Keyword.put(extra, :config, config) - GenServer.start_link(__MODULE__, opts, name: name) + GenServer.start_link(__MODULE__, opts, name: config.name) end @doc false @impl true @spec init(keyword()) :: {:ok, State.t()} | {:stop, term(), State.t()} def init(opts) do - registry = Keyword.fetch!(opts, :registry) - prompts = Keyword.fetch!(opts, :prompts) + prefixes = Keyword.fetch!(opts, :prefixes) config = Keyword.fetch!(opts, :config) - config = if is_nil(config.prompts), do: Map.put(config, :prompts, prompts), else: config - state = %State{config: config, registry: registry} - {m, f, a} = start_function(config) + config = if is_nil(config.prefixes), do: Map.put(config, :prefixes, prefixes), else: config + state = %State{config: config} - m - |> apply(f, a) + config + |> start_serving() |> case do {:ok, pid} when is_pid(pid) -> - name = serving_name(config.name) - {:ok, _} = Registry.register(registry, name, {pid, config}) - {:ok, state} {:error, reason} -> @@ -155,15 +156,6 @@ defmodule Agens.Serving do # Callbacks # =========================================================================== - @doc false - @impl true - @spec handle_call({:stop, atom()}, {pid, term}, State.t()) :: {:reply, :ok, State.t()} - def handle_call({:stop, serving_name}, _from, 
state) do - serving_name = serving_name(serving_name) - Registry.unregister(state.registry, serving_name) - {:reply, :ok, state} - end - @doc false @impl true @spec handle_call(:get_config, {pid, term}, State.t()) :: {:reply, Config.t(), State.t()} @@ -180,6 +172,20 @@ defmodule Agens.Serving do {:reply, result, state} end + @doc false + @impl true + @spec handle_call({:finalize, String.t()}, {pid, term}, State.t()) :: + {:reply, String.t(), State.t()} + def handle_call({:finalize, prompt}, _, %State{config: %Config{finalize: finalize}} = state) do + final = + case finalize do + fun when is_function(fun, 1) -> fun.(prompt) + _ -> prompt + end + + {:reply, final, state} + end + # =========================================================================== # Private # =========================================================================== @@ -188,6 +194,7 @@ defmodule Agens.Serving do @spec do_run(Config.t(), Message.t()) :: String.t() defp do_run(%Config{serving: %Nx.Serving{}}, %Message{} = message) do message.serving_name + |> serving_name() |> Nx.Serving.batched_run(message.prompt) |> case do %{results: [%{text: result}]} -> result @@ -196,30 +203,32 @@ defmodule Agens.Serving do end defp do_run(_, %Message{} = message) do - serving_name = serving_name(message.serving_name) - # need to get pid? 
- GenServer.call(serving_name, {:run, message}) + message.serving_name + |> serving_name() + |> GenServer.call({:run, message}) end @doc false - @spec start_function(Config.t()) :: tuple() - defp start_function(%Config{serving: %Nx.Serving{} = serving} = config) do - {Nx.Serving, :start_link, [[serving: serving, name: config.name]]} + @spec start_serving(Config.t()) :: tuple() + defp start_serving(%Config{serving: %Nx.Serving{} = serving, args: args} = config) do + name = serving_name(config.name) + + opts = + args + |> Keyword.put(:serving, serving) + |> Keyword.put(:name, name) + + Nx.Serving.start_link(opts) end - @doc false - @spec start_function(Config.t()) :: tuple() - # Module.concat with "Supervisor" for Nx.Serving parity - defp start_function(%Config{serving: serving} = config) when is_atom(serving) do + defp start_serving(%Config{serving: serving, args: args} = config) when is_atom(serving) do name = serving_name(config.name) - {serving, :start_link, [[name: name, config: config]]} + opts = Keyword.put(args, :name, name) + + GenServer.start_link(serving, config, opts) end @doc false @spec serving_name(atom) :: atom defp serving_name(name) when is_atom(name), do: Module.concat(name, @suffix) - - @doc false - @spec parent_name(atom) :: atom - defp parent_name(name) when is_atom(name), do: Module.concat(name, @parent) end diff --git a/lib/agens/supervisor.ex b/lib/agens/supervisor.ex index a5d34ca..0ab7219 100644 --- a/lib/agens/supervisor.ex +++ b/lib/agens/supervisor.ex @@ -2,7 +2,7 @@ defmodule Agens.Supervisor do @moduledoc """ The Supervisor module for the Agens application. - `Agens.Supervisor` starts a `DynamicSupervisor` for managing `Agens.Agent`, `Agens.Serving`, and `Agens.Job` processes. It also starts a `Registry` for keeping track of these processes. + `Agens.Supervisor` starts a `DynamicSupervisor` for managing `Agens.Agent`, `Agens.Serving`, and `Agens.Job` processes. 
In order to use `Agens` simply add `Agens.Supervisor` to your application supervision tree: @@ -16,33 +16,25 @@ defmodule Agens.Supervisor do ``` ### Options - * `:registry` (`atom`) - The default registry can be overriden with this option. Default is `Agens.Registry`. - * `:prompts` (`map`) - The default prompt prefixes can be overriden with this option. Each `Agens.Serving.Config` can also override the defaults on a per-serving basis. + * `:prefixes` (`Agens.Prefixes`) - The default prompt prefixes can be overriden with this option. Each `Agens.Serving.Config` can also override the defaults on a per-serving basis. See the [README.md](README.md#configuration) for more info. """ use Supervisor - @default_registry Agens.Registry - @default_prompts %{ - prompt: - {"Agent", "You are a specialized agent with the following capabilities and expertise"}, - identity: - {"Identity", "You are a specialized agent with the following capabilities and expertise"}, - context: {"Context", "The purpose or goal behind your tasks are to"}, - constraints: - {"Constraints", "You must operate with the following constraints or limitations"}, - examples: {"Examples", "You should consider the following examples before returning results"}, - reflection: - {"Reflection", "You should reflect on the following factors before returning results"}, - instructions: - {"Tool Instructions", - "You should provide structured output for function calling based on the following instructions"}, - objective: {"Step Objective", "The objective of this step is to"}, - description: {"Job Description", "This is part of multi-step job to achieve the following"}, - input: {"Input", "The following is the actual input from the user, system or another agent"} - } - @default_opts [registry: @default_registry, prompts: @default_prompts] + alias Agens.Prefixes + + @default_opts [prefixes: Prefixes.default()] + + @doc false + @spec child_spec(keyword()) :: Supervisor.child_spec() + def child_spec(args) do + %{ + id: 
__MODULE__, + start: {__MODULE__, :start_link, [args]}, + type: :supervisor + } + end @doc false @spec start_link(keyword()) :: Supervisor.on_start() @@ -61,11 +53,8 @@ defmodule Agens.Supervisor do [:supervisor.child_spec() | (old_erlang_child_spec :: :supervisor.child_spec())]}} | :ignore def init(opts) do - registry = Keyword.fetch!(opts, :registry) - children = [ - {Agens, name: Agens, opts: opts}, - {Registry, keys: :unique, name: registry} + {Agens, name: Agens, opts: opts} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/test/agens/job_test.exs b/test/agens/job_test.exs index fa659d5..44fa136 100644 --- a/test/agens/job_test.exs +++ b/test/agens/job_test.exs @@ -57,17 +57,30 @@ defmodule Agens.JobTest do end describe "errors" do - setup [:start_agens, :start_job] + setup [:start_agens, :start_serving, :start_job] - test "start running", %{job: job, pid: pid} do + test "already started", %{job: job, pid: pid} do assert is_pid(pid) assert {:error, {:already_started, ^pid}} = Job.start(job) end - test "job missing" do + test "job not found" do assert {:error, :job_not_found} == Job.run(:missing_job, "input") end + + test "job already running", %{job: job} do + assert :ok == Job.run(job.name, "input") + assert {:error, :job_already_running} == Job.run(job.name, "input") + end + + test "job error", %{job: %{name: name}} do + assert :ok == Job.run(name, nil) + assert_receive {:job_started, ^name} + + assert_receive {:step_started, {^name, 0}, nil} + assert_receive {:job_error, {^name, 0}, {:error, :input_required}} + end end describe "config" do @@ -92,10 +105,11 @@ defmodule Agens.JobTest do setup [:start_agens, :start_serving, :start_job] @tag capture_log: true - test "start", %{job: %{name: name}, pid: pid} do + test "start", %{job: %{name: name} = job, pid: pid} do input = "D" assert is_pid(pid) + assert {:error, {:already_started, ^pid}} = Job.start(job) assert Job.run(name, input) == :ok assert_receive {:job_started, ^name} @@ -125,6 +139,12 
@@ defmodule Agens.JobTest do assert_receive {:step_result, {^name, 2}, "TRUE"} assert_receive {:job_ended, ^name, :complete} + + # Job must be started manually after job completion (transient restart) + assert job.name |> Process.whereis() |> is_nil() + assert {:error, :job_not_found} = Job.run(name, input) + assert {:ok, pid} = Job.start(job) + assert is_pid(pid) end end @@ -178,7 +198,7 @@ defmodule Agens.JobTest do assert_receive {:step_started, {^name, 1}, "E"} assert_receive {:step_result, {^name, 1}, "E"} - assert_receive {:job_ended, ^name, + assert_receive {:job_error, {^name, 1}, {:error, %RuntimeError{message: "Invalid step index: :invalid"}}} assert_receive {:DOWN, ^ref, :process, ^pid, _reason} diff --git a/test/agens/message_test.exs b/test/agens/message_test.exs index 08c7f75..869a40e 100644 --- a/test/agens/message_test.exs +++ b/test/agens/message_test.exs @@ -3,6 +3,8 @@ defmodule Agens.MessageTest do alias Agens.Message + def wrap_prompt(prompt), do: "[INST]#{prompt}[/INST]" + defp start_agens(_ctx) do {:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor}) :ok @@ -11,16 +13,17 @@ defmodule Agens.MessageTest do defp start_serving(_ctx) do %Agens.Serving.Config{ name: :text_generation, - serving: Test.Support.Serving.Stub + serving: Test.Support.Serving.Stub, + finalize: &wrap_prompt/1 } |> Agens.Serving.start() :ok end - describe "no agent or serving" do - test "returns error" do - assert {:error, :no_agent_or_serving_name} == Message.send(%Message{}) + describe "errors" do + test "no agent or serving" do + assert {:error, :no_agent_or_serving_name} == Message.send(%Message{input: "test"}) end end @@ -30,12 +33,16 @@ defmodule Agens.MessageTest do test "works with explicit serving" do serving_name = :text_generation + wrapped = + wrap_prompt( + "## Input\nThe following is the actual input from the user, system or another agent: test\n" + ) + assert %Message{ input: "test", serving_name: serving_name, result: "sent 'test' to: ", 
- prompt: - "[INST]## Input\nThe following is the actual input from the user, system or another agent: test\n[/INST]" + prompt: wrapped } == Message.send(%Message{serving_name: serving_name, input: "test"}) end end diff --git a/test/agens/prefixes_test.exs b/test/agens/prefixes_test.exs new file mode 100644 index 0000000..fec1846 --- /dev/null +++ b/test/agens/prefixes_test.exs @@ -0,0 +1,36 @@ +defmodule Agens.PrefixesTest do + use ExUnit.Case, async: false + + alias Agens.Prefixes + + describe "prefixes" do + test "default" do + assert %Prefixes{ + prompt: + {"Agent", + "You are a specialized agent with the following capabilities and expertise"}, + identity: + {"Identity", + "You are a specialized agent with the following capabilities and expertise"}, + context: {"Context", "The purpose or goal behind your tasks are to"}, + constraints: + {"Constraints", "You must operate with the following constraints or limitations"}, + examples: + {"Examples", + "You should consider the following examples before returning results"}, + reflection: + {"Reflection", + "You should reflect on the following factors before returning results"}, + instructions: + {"Tool Instructions", + "You should provide structured output for function calling based on the following instructions"}, + objective: {"Step Objective", "The objective of this step is to"}, + description: + {"Job Description", "This is part of multi-step job to achieve the following"}, + input: + {"Input", + "The following is the actual input from the user, system or another agent"} + } = Prefixes.default() + end + end +end diff --git a/test/agens/serving_test.exs b/test/agens/serving_test.exs index 0bcf107..d5fa993 100644 --- a/test/agens/serving_test.exs +++ b/test/agens/serving_test.exs @@ -4,8 +4,7 @@ defmodule Agens.ServingTest do alias Agens.{Message, Serving} defp start_agens(_ctx) do - opts = [registry: Agens.CustomRegistry] - {:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor, opts: opts}) + 
{:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor}) :ok end @@ -102,7 +101,7 @@ defmodule Agens.ServingTest do batch = Nx.Batch.stack([Nx.tensor([1, 2, 3])]) - message = %Message{serving_name: serving_name, prompt: batch} + message = %Message{serving_name: serving_name, prompt: batch, input: "not used"} assert %Nx.Tensor{ type: {:s, 64},