From 542cc4f6baf2e3af48943caaf5ae83e3d4c638f5 Mon Sep 17 00:00:00 2001 From: jessedrelick Date: Fri, 30 Aug 2024 12:21:01 -0400 Subject: [PATCH] Opts config (#2) * serving init opts * serving run pass * serving tests passing * handle extra args * move message send logic * message send * organize * format * main typespecs * other typespecs * message tests * unreachable code * remove serving already started case * finish typespecs * name to pid * update docs * docs final * changelog --- CHANGELOG.md | 18 ++++ README.md | 118 +++++++++++---------- config/config.exs | 22 ---- lib/agens.ex | 160 ++++------------------------- lib/agens/agent.ex | 125 ++++++++++++++++------- lib/agens/job.ex | 108 +++++++++----------- lib/agens/message.ex | 176 ++++++++++++++++++++++++++++++++ lib/agens/serving.ex | 198 ++++++++++++++++++++++++++++-------- lib/agens/supervisor.ex | 52 +++++++--- mix.exs | 2 +- test/agens/agent_test.exs | 4 +- test/agens/job_test.exs | 4 +- test/agens/message_test.exs | 42 ++++++++ test/agens/serving_test.exs | 20 +++- 14 files changed, 668 insertions(+), 381 deletions(-) create mode 100644 CHANGELOG.md delete mode 100644 config/config.exs create mode 100644 lib/agens/message.ex create mode 100644 test/agens/message_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9ee9977 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog + +## 0.1.2 +This release removes [application environment configuration](https://hexdocs.pm/elixir/1.17.2/design-anti-patterns.html#using-application-configuration-for-libraries) and moves to an opts-based configuration. See [README.md](README.md#configuration) for more info. + +### Features +- Configure `Agens` via `Supervisor` opts instead of `Application` environment +- Add `Agens.Agent.get_config/1` +- Add `Agens.Serving.get_config/1` +- Support sending `Agens.Message` without `Agens.Agent` +- Override default prompt prefixes with `Agens.Serving` + +### Fixes +- `Agens.Job.get_config/1` now wraps return value with `:ok` tuple: `{:ok, Agens.Job.Config.t()}` +- Replaced `module() | Nx.Serving.t()` with `atom()` in `Agens.Agent.Config.t()` + +## 0.1.1 +Initial release \ No newline at end of file diff --git a/README.md b/README.md index 793157f..f2dbdb6 100644 --- a/README.md +++ b/README.md @@ -28,46 +28,6 @@ def deps do end ``` -## Configuration -Future versions of Agens will be configurable by providing options to `Agens.Supervisor` in order to [avoid using application configuration](https://hexdocs.pm/elixir/1.17.2/design-anti-patterns.html#using-application-configuration-for-libraries). For now, however, you can change the Agens `Registry` if needed via config: - -```elixir -config :agens, registry: Agens.Registry -``` - -In addition, you can also override the prompt prefixes: - -```elixir -config :agens, prompts: %{ - prompt: - {"Agent", - "You are a specialized agent with the following capabilities and expertise"}, - identity: - {"Identity", - "You are a specialized agent with the following capabilities and expertise"}, - context: {"Context", "The purpose or goal behind your tasks are to"}, - constraints: - {"Constraints", "You must operate with the following constraints or limitations"}, - examples: - {"Examples", - "You should consider the following examples before returning results"}, - reflection: - {"Reflection", - "You should reflect on the following factors before returning results"}, - instructions: - {"Tool Instructions", - "You should provide structured output for function calling based on the following instructions"}, - objective: {"Step Objective", "The objective of this step is to"}, - description: - {"Job Description", "This is part of multi-step job to achieve the following"}, - input: - {"Input", - "The following is the actual input from the user, system or another agent"} -} -``` - -See the [Prompting](#prompting) section below or `Agens.Message` for more information on prompt prefixes. - ## Usage Building a multi-agent workflow with Agens involves a few different steps and core entities: @@ -77,12 +37,11 @@ Building a multi-agent workflow with Agens involves a few different steps and co This will start Agens as a supervised process inside your application: ```elixir -Supervisor.start_link( - [ - {Agens.Supervisor, name: Agens.Supervisor} - ], - strategy: :one_for_one -) +children = [ + {Agens.Supervisor, name: Agens.Supervisor} +] + +Supervisor.start_link(children, strategy: :one_for_one) ``` See `Agens.Supervisor` for more information @@ -96,19 +55,17 @@ A **Serving** is essentially a wrapper for language model inference. It can be a Application.put_env(:nx, :default_backend, EXLA.Backend) auth_token = System.get_env("HF_AUTH_TOKEN") -my_serving = fn -> - repo = {:hf, "mistralai/Mistral-7B-Instruct-v0.2", auth_token: auth_token} +repo = {:hf, "mistralai/Mistral-7B-Instruct-v0.2", auth_token: auth_token} - {:ok, model} = Bumblebee.load_model(repo, type: :bf16) - {:ok, tokenizer} = Bumblebee.load_tokenizer(repo) - {:ok, generation_config} = Bumblebee.load_generation_config(repo) +{:ok, model} = Bumblebee.load_model(repo, type: :bf16) +{:ok, tokenizer} = Bumblebee.load_tokenizer(repo) +{:ok, generation_config} = Bumblebee.load_generation_config(repo) - Bumblebee.Text.generation(model, tokenizer, generation_config) -end +serving = Bumblebee.Text.generation(model, tokenizer, generation_config) serving_config = %Agens.Serving.Config{ name: :my_serving, - serving: my_serving() + serving: serving } {:ok, pid} = Agens.Serving.start(serving_config) @@ -165,10 +122,61 @@ See `Agens.Job` for more information --- +## Configuration +Additional options can be passed to `Agens.Supervisor` in order to override the default values: + +```elixir +opts = [ + registry: Agens.MyCustomRegistry, + prompts: custom_prompt_prefixes +] + +children = [ + {Agens.Supervisor, name: Agens.Supervisor, opts: opts} +] + +Supervisor.start_link(children, strategy: :one_for_one) +``` + +The following default prompt prefixes can be copied, customized and used for the `prompts` option above: + +```elixir +%{ + prompt: + {"Agent", + "You are a specialized agent with the following capabilities and expertise"}, + identity: + {"Identity", + "You are a specialized agent with the following capabilities and expertise"}, + context: {"Context", "The purpose or goal behind your tasks are to"}, + constraints: + {"Constraints", "You must operate with the following constraints or limitations"}, + examples: + {"Examples", + "You should consider the following examples before returning results"}, + reflection: + {"Reflection", + "You should reflect on the following factors before returning results"}, + instructions: + {"Tool Instructions", + "You should provide structured output for function calling based on the following instructions"}, + objective: {"Step Objective", "The objective of this step is to"}, + description: + {"Job Description", "This is part of multi-step job to achieve the following"}, + input: + {"Input", + "The following is the actual input from the user, system or another agent"} +} +``` + +See the [Prompting](#prompting) section below or `Agens.Message` for more information on prompt prefixes. + +You can also see `Agens.Supervisor` for more information on configuration options. + ## Prompting Agens provides a variety of different ways to customize the final prompt sent to the language model (LM) or Serving. A natural language string can be assigned to the entity's specialized field (see below), while `nil` values will omit that field from the final prompt. This approach allows for precise control over the prompt’s content. -All fields with values, in addition to user input, will be included in the final prompt !!!!using the [in-context learning]() method!!!!. The goal should be to balance detailed prompts with efficient token usage by focusing on relevant fields and using concise language. This approach will yield the best results with minimal token usage, keeping costs low and performance high. +All fields with values, in addition to user input, will be included in the final prompt. The goal should be to balance detailed prompts with efficient token usage by focusing on relevant fields and using concise language. This approach will yield the best results with minimal token usage, keeping costs low and performance high. ### User/Agent The `input` value is the only required field for building prompts. This value can be the initial value provided to `Agens.Job.run/2`, or the final result of a previous step (`Agens.Job.Step`). Both the `input` and `result` are stored in `Agens.Message`, which can also be used to send messages directly to `Agens.Agent` or `Agens.Serving` without being part of an `Agens.Job`. diff --git a/config/config.exs b/config/config.exs deleted file mode 100644 index d7ffecc..0000000 --- a/config/config.exs +++ /dev/null @@ -1,22 +0,0 @@ -import Config - -config :agens, - registry: Agens.Registry, - prompts: %{ - prompt: - {"Agent", "You are a specialized agent with the following capabilities and expertise"}, - identity: - {"Identity", "You are a specialized agent with the following capabilities and expertise"}, - context: {"Context", "The purpose or goal behind your tasks are to"}, - constraints: - {"Constraints", "You must operate with the following constraints or limitations"}, - examples: {"Examples", "You should consider the following examples before returning results"}, - reflection: - {"Reflection", "You should reflect on the following factors before returning results"}, - instructions: - {"Tool Instructions", - "You should provide structured output for function calling based on the following instructions"}, - objective: {"Step Objective", "The objective of this step is to"}, - description: {"Job Description", "This is part of multi-step job to achieve the following"}, - input: {"Input", "The following is the actual input from the user, system or another agent"} - } diff --git a/lib/agens.ex b/lib/agens.ex index 299dfeb..5581286 100644 --- a/lib/agens.ex +++ b/lib/agens.ex @@ -21,157 +21,31 @@ defmodule Agens do - `Agens.Serving` - used to interact with language models - `Agens.Agent` - used to interact with servings in a specialized manner - `Agens.Job` - used to define multi-agent workflows + - `Agens.Message` - used to facilitate communication between agents, jobs, and servings """ - defmodule Message do - @moduledoc """ - The Message struct defines the details of a message passed between Agents, Jobs and Servings. - - ## Fields - - * `:parent_pid` - The process identifier of the parent/caller process. - * `:input` - The input string for the message. - * `:prompt` - The prompt string or `Agens.Agent.Prompt` struct for the message. - * `:result` - The result string for the message. - * `:agent_name` - The name of the `Agens.Agent`. - * `:serving_name` - The name of the `Agens.Serving`. - * `:job_name` - The name of the `Agens.Job`. - * `:job_description` - The description of the `Agens.Job` to be added to the LM prompt. - * `:step_index` - The index of the `Agens.Job.Step`. - * `:step_objective` - The objective of the `Agens.Job.Step` to be added to the LM prompt. - """ - - @type t :: %__MODULE__{ - parent_pid: pid() | nil, - input: String.t() | nil, - prompt: String.t() | Agens.Agent.Prompt.t() | nil, - result: String.t() | nil, - agent_name: atom() | nil, - serving_name: atom() | nil, - job_name: atom() | nil, - job_description: String.t() | nil, - step_index: non_neg_integer() | nil, - step_objective: String.t() | nil - } - - @enforce_keys [] - defstruct [ - :parent_pid, - :input, - :prompt, - :result, - :agent_name, - :serving_name, - :job_name, - :job_description, - :step_index, - :step_objective - ] - - alias Agens.{Agent, Serving} - - @registry Application.compile_env(:agens, :registry) - @fields Application.compile_env(:agens, :prompts) - - @doc """ - Sends an `Agens.Message` to an `Agens.Agent` - """ - @spec send(__MODULE__.t()) :: __MODULE__.t() | {:error, :agent_not_running} - def send(%__MODULE__{} = message) do - case Registry.lookup(@registry, message.agent_name) do - [{_, {agent_pid, config}}] when is_pid(agent_pid) -> - base = build_prompt(config, message) - prompt = "[INST]#{base}[/INST]" - - result = - message - |> Map.put(:serving_name, config.serving) - |> Map.put(:prompt, prompt) - |> Serving.run() - - message = Map.put(message, :result, result) - maybe_use_tool(message, config.tool) - - [] -> - {:error, :agent_not_running} - end - end - - @spec build_prompt(Agent.Config.t(), t()) :: String.t() - defp build_prompt(%Agent.Config{prompt: prompt, tool: tool}, %__MODULE__{} = message) do - %{ - objective: message.step_objective, - description: message.job_description - } - |> maybe_add_prompt(prompt) - |> maybe_add_tool(tool) - |> maybe_prep_input(message.input, tool) - |> Enum.reject(&filter_empty/1) - |> Enum.map(&field/1) - |> Enum.map(&to_prompt/1) - |> Enum.join("\n\n") - end - - defp filter_empty({_, value}), do: value == "" or is_nil(value) - - defp field({key, value}) do - {Map.get(@fields, key), value} - end - - defp to_prompt({{heading, detail}, value}) do - """ - ## #{heading} - #{detail}: #{value} - """ - end - - defp maybe_add_prompt(map, %Agent.Prompt{} = prompt), - do: prompt |> Map.from_struct() |> Map.merge(map) - - defp maybe_add_prompt(map, prompt) when is_binary(prompt), do: Map.put(map, :prompt, prompt) - defp maybe_add_prompt(map, _prompt), do: map - - defp maybe_add_tool(map, nil), do: map - defp maybe_add_tool(map, tool), do: Map.put(map, :instructions, tool.instructions()) - - defp maybe_prep_input(map, input, nil), do: Map.put(map, :input, input) - defp maybe_prep_input(map, input, tool), do: Map.put(map, :input, tool.pre(input)) - - @spec maybe_use_tool(__MODULE__.t(), module() | nil) :: __MODULE__.t() - defp maybe_use_tool(message, nil), do: message - - defp maybe_use_tool(%__MODULE__{} = message, tool) do - send( - message.parent_pid, - {:tool_started, {message.job_name, message.step_index}, message.result} - ) - - raw = - message.result - |> tool.to_args() - |> tool.execute() - - send(message.parent_pid, {:tool_raw, {message.job_name, message.step_index}, raw}) - - result = tool.post(raw) - - send(message.parent_pid, {:tool_result, {message.job_name, message.step_index}, result}) + use DynamicSupervisor - Map.put(message, :result, result) - end + @doc false + @spec start_link(keyword()) :: Supervisor.on_start() + def start_link(args) do + opts = Keyword.fetch!(args, :opts) + DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) end - use DynamicSupervisor - @doc false - @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()} - def start_link(_) do - DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__) + @impl true + @spec init(keyword()) :: {:ok, DynamicSupervisor.sup_flags()} + def init(opts) do + DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: [opts]) end @doc false - @spec init(any()) :: {:ok, any()} - def init(:ok) do - DynamicSupervisor.init(strategy: :one_for_one) + @spec name_to_pid(atom(), {:error, term()}, (pid() -> any())) :: any() + def name_to_pid(name, err, cb) do + case Process.whereis(name) do + nil -> err + pid when is_pid(pid) -> cb.(pid) + end end end diff --git a/lib/agens/agent.ex b/lib/agens/agent.ex index 6750aab..e4726b7 100644 --- a/lib/agens/agent.ex +++ b/lib/agens/agent.ex @@ -1,12 +1,12 @@ defmodule Agens.Agent do @moduledoc """ - The Agent module provides functions for starting, stopping and running Agents. + The Agent module provides functions for starting and stopping Agents. `Agens.Agent` is the the primary entity powering `Agens`. It uses `Agens.Serving` to interact with language models through `Nx.Serving`, or with language model APIs through a `GenServer`. Agents can have detailed identities to further refine LM outputs, and are used together in multi-agent workflows via the `Agens.Job` module. - Agent capabilities can be expanded even further with `Agens.Tool` modules, which are designed to handle LM functional calling. In future releases, Agents will also have access to RAG generations via knowledge base features. + Agent capabilities can be expanded even further with `Agens.Tool` modules, which are designed to handle LM functional calling. """ defmodule Prompt do @@ -41,7 +41,7 @@ defmodule Agens.Agent do ## Fields - `:name` - The name of the Agent process. - - `:serving` - The serving module or `Nx.Serving` struct for the Agent. + - `:serving` - The name of the Serving specified in `Agens.Serving.Config`. - `:knowledge` - The knowledge base or data source of the Agent. Default is nil. (Coming soon) - `:prompt` - The string or `Agens.Agent.Prompt` struct defining the Agent. Default is nil. - `:tool` - The module implementing the `Agens.Tool` behaviour for the Agent. Default is nil. @@ -49,7 +49,7 @@ defmodule Agens.Agent do @type t :: %__MODULE__{ name: atom(), - serving: module() | Nx.Serving.t(), + serving: atom(), knowledge: module() | nil, prompt: Agens.Agent.Prompt.t() | String.t() | nil, tool: module() | nil @@ -59,9 +59,23 @@ defmodule Agens.Agent do defstruct [:name, :serving, :knowledge, :prompt, :tool] end + defmodule State do + @moduledoc false + + @type t :: %__MODULE__{ + registry: atom(), + config: Agens.Agent.Config.t() + } + + @enforce_keys [:registry, :config] + defstruct [:registry, :config] + end + use GenServer - @registry Application.compile_env(:agens, :registry) + # =========================================================================== + # Public API + # =========================================================================== @doc """ Starts one or more `Agens.Agent` processes @@ -75,23 +89,7 @@ defmodule Agens.Agent do end def start(%Config{} = config) do - spec = %{ - id: config.name, - start: {__MODULE__, :start_link, [config]} - # type: :worker, - # restart: :transient - } - - Agens - |> DynamicSupervisor.start_child(spec) - |> case do - {:ok, pid} when is_pid(pid) -> - Registry.register(@registry, config.name, {pid, config}) - {:ok, pid} - - {:error, {:already_started, pid}} = err when is_pid(pid) -> - err - end + DynamicSupervisor.start_child(Agens, {__MODULE__, config}) end @doc """ @@ -99,28 +97,79 @@ defmodule Agens.Agent do """ @spec stop(atom()) :: :ok | {:error, :agent_not_found} def stop(agent_name) do - agent_name - |> Process.whereis() - |> case do - nil -> - {:error, :agent_not_found} - - pid -> - :ok = DynamicSupervisor.terminate_child(Agens, pid) - Registry.unregister(@registry, agent_name) - end + Agens.name_to_pid(agent_name, {:error, :agent_not_found}, fn pid -> + GenServer.call(pid, {:stop, agent_name}) + :ok = DynamicSupervisor.terminate_child(Agens, pid) + end) end + @doc """ + Retrieves the Agent configuration by Agent name or `pid`. + """ + @spec get_config(pid | atom) :: {:ok, Config.t()} | {:error, :agent_not_found} + def get_config(agent_name) when is_atom(agent_name) do + Agens.name_to_pid(agent_name, {:error, :agent_not_found}, fn pid -> {:ok, get_config(pid)} end) + end + + def get_config(pid) when is_pid(pid) do + GenServer.call(pid, :get_config) + end + + # =========================================================================== + # Setup + # =========================================================================== + @doc false - @spec start_link(Config.t()) :: GenServer.on_start() - def start_link(config) do - GenServer.start_link(__MODULE__, config, name: config.name) + @spec child_spec(Config.t()) :: Supervisor.child_spec() + def child_spec(%Config{} = config) do + %{ + id: config.name, + start: {__MODULE__, :start_link, [config]}, + type: :worker, + restart: :transient + } + end + + @doc false + @spec start_link(keyword(), Config.t()) :: GenServer.on_start() + def start_link(extra, config) do + opts = Keyword.put(extra, :config, config) + GenServer.start_link(__MODULE__, opts, name: config.name) end @doc false - @spec init(Config.t()) :: {:ok, map()} @impl true - def init(_config) do - {:ok, %{}} + @spec init(keyword()) :: {:ok, State.t()} + def init(opts) do + registry = Keyword.fetch!(opts, :registry) + config = Keyword.fetch!(opts, :config) + state = %State{registry: registry, config: config} + + {:ok, _} = Registry.register(registry, config.name, {self(), config}) + + {:ok, state} end + + # =========================================================================== + # Callbacks + # =========================================================================== + + @doc false + @impl true + @spec handle_call({:stop, atom()}, {pid, term}, State.t()) :: {:reply, :ok, State.t()} + def handle_call({:stop, agent_name}, _from, state) do + Registry.unregister(state.registry, agent_name) + {:reply, :ok, state} + end + + @doc false + @impl true + @spec handle_call(:get_config, {pid, term}, State.t()) :: {:reply, Config.t(), State.t()} + def handle_call(:get_config, _from, state) do + {:reply, state.config, state} + end + + # =========================================================================== + # Private + # =========================================================================== end diff --git a/lib/agens/job.ex b/lib/agens/job.ex index e62651b..f493b74 100644 --- a/lib/agens/job.ex +++ b/lib/agens/job.ex @@ -104,17 +104,22 @@ defmodule Agens.Job do status: :init | :running | :error | :completed, step_index: non_neg_integer() | nil, config: Config.t(), - parent: pid() | nil + parent: pid() | nil, + registry: atom() } - @enforce_keys [:status, :config] - defstruct [:status, :step_index, :config, :parent] + @enforce_keys [:status, :config, :registry] + defstruct [:status, :step_index, :config, :parent, :registry] end use GenServer alias Agens.Message + # =========================================================================== + # Public API + # =========================================================================== + @doc """ Starts a new Job process using the provided `Agens.Job.Config`. @@ -122,17 +127,19 @@ defmodule Agens.Job do """ @spec start(Config.t()) :: {:ok, pid} | {:error, term} def start(config) do - spec = child_spec(config) + DynamicSupervisor.start_child(Agens, {__MODULE__, config}) + end - Agens - |> DynamicSupervisor.start_child(spec) - |> case do - {:ok, pid} = result when is_pid(pid) -> - result + @doc """ + Retrieves the Job configuration by Job name or `pid`. + """ + @spec get_config(pid | atom) :: {:ok, Config.t()} | {:error, :job_not_found} + def get_config(job_name) when is_atom(job_name) do + Agens.name_to_pid(job_name, {:error, :job_not_found}, fn pid -> get_config(pid) end) + end - {:error, {:already_started, pid}} = error when is_pid(pid) -> - error - end + def get_config(pid) when is_pid(pid) do + {:ok, GenServer.call(pid, :get_config)} end @doc """ @@ -140,50 +147,22 @@ defmodule Agens.Job do A supervised process for the Job must be started first using `start/1`. """ - @spec run(pid | atom, term) :: {:ok, term} | {:error, :job_not_found} - def run(name, input) when is_atom(name) do - name - |> Process.whereis() - |> case do - nil -> - {:error, :job_not_found} - - pid when is_pid(pid) -> - run(pid, input) - end + @spec run(pid | atom, String.t()) :: {:ok, term} | {:error, :job_not_found} + def run(job_name, input) when is_atom(job_name) do + Agens.name_to_pid(job_name, {:error, :job_not_found}, fn pid -> run(pid, input) end) end def run(pid, input) when is_pid(pid) do GenServer.call(pid, {:run, input}) end - @doc """ - Retrieves the Job configuration by Job name or `pid`. - """ - @spec get_config(pid | atom) :: {:ok, term} | {:error, :job_not_found} - def get_config(name) when is_atom(name) do - name - |> Process.whereis() - |> case do - nil -> - {:error, :job_not_found} - - pid when is_pid(pid) -> - get_config(pid) - end - end - - def get_config(pid) when is_pid(pid) do - GenServer.call(pid, :get_config) - end - - @doc false - def start_link(config) do - GenServer.start_link(__MODULE__, config, name: config.name) - end + # =========================================================================== + # Setup + # =========================================================================== @doc false - def child_spec(config) do + @spec child_spec(Config.t()) :: Supervisor.child_spec() + def child_spec(%Config{} = config) do %{ id: config.name, start: {__MODULE__, :start_link, [config]}, @@ -192,17 +171,26 @@ defmodule Agens.Job do } end + @doc false + @spec start_link(keyword(), Config.t()) :: GenServer.on_start() + def start_link(extra, config) do + opts = Keyword.put(extra, :config, config) + GenServer.start_link(__MODULE__, opts, name: config.name) + end + @doc false @impl true - @spec init(Config.t()) :: - {:ok, State.t()} - | {:ok, State.t(), timeout() | :hibernate | {:continue, continue_arg :: term()}} - | :ignore - | {:stop, reason :: any()} - def init(config) do - {:ok, %State{status: :init, config: config}} + @spec init(keyword()) :: {:ok, State.t()} + def init(opts) do + registry = Keyword.fetch!(opts, :registry) + config = Keyword.fetch!(opts, :config) + {:ok, %State{status: :init, config: config, registry: registry}} end + # =========================================================================== + # Callbacks + # =========================================================================== + @doc false @impl true @spec handle_call(:get_config, {pid, term}, State.t()) :: {:reply, Config.t(), State.t()} @@ -259,17 +247,21 @@ defmodule Agens.Job do @doc false @impl true - @spec terminate(:complete | {:error, term}, State.t()) :: :ok + @spec terminate(:complete | {term(), list()}, State.t()) :: :ok def terminate(:complete, %State{config: %{name: name}} = state) do send(state.parent, {:job_ended, name, :complete}) :ok end - def terminate({error, _}, %State{config: %{name: name}} = state) do - send(state.parent, {:job_ended, name, {:error, error}}) + def terminate({exception, _}, %State{config: %{name: name}} = state) do + send(state.parent, {:job_ended, name, {:error, exception}}) :ok end + # =========================================================================== + # Private + # =========================================================================== + @doc false @spec do_step(String.t(), State.t()) :: :ok defp do_step(input, %State{config: job_config} = state) do diff --git a/lib/agens/message.ex b/lib/agens/message.ex new file mode 100644 index 0000000..0eb7689 --- /dev/null +++ b/lib/agens/message.ex @@ -0,0 +1,176 @@ +defmodule Agens.Message do + @moduledoc """ + The Message struct defines the details of a message passed between Agents, Jobs and Servings. + + ## Fields + + * `:parent_pid` - The process identifier of the parent/caller process. + * `:input` - The input string for the message. + * `:prompt` - The final prompt string constructed for `Agens.Serving.run/1`. + * `:result` - The result string for the message. + * `:agent_name` - The name of the `Agens.Agent`. + * `:serving_name` - The name of the `Agens.Serving`. + * `:job_name` - The name of the `Agens.Job`. + * `:job_description` - The description of the `Agens.Job` to be added to the LM prompt. + * `:step_index` - The index of the `Agens.Job.Step`. + * `:step_objective` - The objective of the `Agens.Job.Step` to be added to the LM prompt. + """ + + @type t :: %__MODULE__{ + parent_pid: pid() | nil, + input: String.t() | nil, + prompt: String.t() | Agens.Agent.Prompt.t() | nil, + result: String.t() | nil, + agent_name: atom() | nil, + serving_name: atom() | nil, + job_name: atom() | nil, + job_description: String.t() | nil, + step_index: non_neg_integer() | nil, + step_objective: String.t() | nil + } + + @enforce_keys [] + defstruct [ + :parent_pid, + :input, + :prompt, + :result, + :agent_name, + :serving_name, + :job_name, + :job_description, + :step_index, + :step_objective + ] + + alias Agens.{Agent, Serving} + + @doc """ + Sends an `Agens.Message` to an `Agens.Agent` or `Agens.Serving`. + """ + @spec send(t()) :: t() | {:error, atom()} + def send(%__MODULE__{agent_name: nil, serving_name: nil}) do + {:error, :no_agent_or_serving_name} + end + + def send(%__MODULE__{} = message) do + with {:ok, agent_config} <- maybe_get_agent_config(message.agent_name), + {:ok, serving_config} <- get_serving_config(agent_config, message) do + base = build_prompt(agent_config, message, serving_config.prompts) + prompt = "[INST]#{base}[/INST]" + + message = + message + |> Map.put(:prompt, prompt) + |> Map.put(:serving_name, serving_config.name) + + result = Serving.run(message) + + message = Map.put(message, :result, result) + tool = if agent_config, do: agent_config.tool, else: nil + maybe_use_tool(message, tool) + else + {:error, reason} -> {:error, reason} + end + end + + @doc false + @spec build_prompt(Agent.Config.t() | nil, t(), map()) :: String.t() + defp build_prompt(agent_config, %__MODULE__{} = message, prompts) do + %{ + objective: message.step_objective, + description: message.job_description + } + |> maybe_add_prompt(agent_config) + |> maybe_add_tool(agent_config) + |> maybe_prep_input(message.input, agent_config) + |> Enum.reject(&filter_empty/1) + |> Enum.map(fn {key, value} -> field({key, value}, prompts) end) + |> Enum.map(&to_prompt/1) + |> Enum.join("\n\n") + end + + @doc false + @spec filter_empty({atom(), String.t()}) :: boolean() + defp filter_empty({_, value}), do: value == "" or is_nil(value) + + @doc false + @spec field({atom(), String.t()}, map()) :: {String.t(), String.t()} + defp field({key, value}, prompts) do + {Map.get(prompts, key), value} + end + + @doc false + @spec to_prompt({{String.t(), String.t()}, String.t()}) :: String.t() + defp to_prompt({{heading, detail}, value}) do + """ + ## #{heading} + #{detail}: #{value} + """ + end + + @doc false + @spec maybe_add_prompt(map(), Agent.Config.t() | nil) :: map() + defp maybe_add_prompt(map, %Agent.Config{prompt: %Agent.Prompt{} = prompt}), + do: prompt |> Map.from_struct() |> Map.merge(map) + + defp maybe_add_prompt(map, %Agent.Config{prompt: prompt}) when is_binary(prompt), + do: Map.put(map, :prompt, prompt) + + defp maybe_add_prompt(map, _), do: map + + @doc false + @spec maybe_add_tool(map(), Agent.Config.t() | nil) :: map() + defp maybe_add_tool(map, %Agent.Config{tool: tool}) when not is_nil(tool), + do: Map.put(map, :instructions, tool.instructions()) + + defp maybe_add_tool(map, _), do: map + + @doc false + @spec maybe_prep_input(map(), String.t(), Agent.Config.t() | nil) :: map() + defp maybe_prep_input(map, input, %Agent.Config{tool: tool}) when not is_nil(tool), + do: Map.put(map, :input, tool.pre(input)) + + defp maybe_prep_input(map, input, _), do: Map.put(map, :input, input) + + @doc false + @spec maybe_use_tool(t(), module() | nil) :: t() + defp maybe_use_tool(message, nil), do: message + + defp maybe_use_tool(%__MODULE__{} = message, tool) do + send( + message.parent_pid, + {:tool_started, {message.job_name, message.step_index}, message.result} + ) + + raw = + message.result + |> tool.to_args() + |> tool.execute() + + send(message.parent_pid, {:tool_raw, {message.job_name, message.step_index}, raw}) + + result = tool.post(raw) + + send(message.parent_pid, {:tool_result, {message.job_name, message.step_index}, result}) + + Map.put(message, :result, result) + end + + @doc false + @spec get_serving_config(Agent.Config.t() | nil, t()) :: + {:ok, Serving.Config.t()} | {:error, atom()} + defp get_serving_config(nil, %__MODULE__{serving_name: serving_name}) + when is_atom(serving_name), + do: Serving.get_config(serving_name) + + defp get_serving_config(%Agent.Config{serving: serving_name}, _) when is_atom(serving_name), + do: Serving.get_config(serving_name) + + @doc false + @spec maybe_get_agent_config(atom() | nil) :: {:ok, Agent.Config.t() | nil} + defp maybe_get_agent_config(nil), do: {:ok, nil} + + defp maybe_get_agent_config(agent_name) when is_atom(agent_name), + do: Agent.get_config(agent_name) +end diff --git a/lib/agens/serving.ex b/lib/agens/serving.ex index d3ca871..6b18c1f 100644 --- a/lib/agens/serving.ex +++ b/lib/agens/serving.ex @@ -18,77 +18,175 @@ defmodule Agens.Serving do ## Fields - `:name` - The name of the `Agens.Serving` process. - `:serving` - The `Nx.Serving` struct or `GenServer` module for the `Agens.Serving`. + - `:prompts` - A map of custom prompt prefixes. If `nil`, default prompt prefixes will be used instead. Default prompt prefixes can also be overridden by using the `prompts` options in `Agens.Supervisor`. """ @type t :: %__MODULE__{ name: atom(), - serving: Nx.Serving.t() | module() + serving: Nx.Serving.t() | module(), + prompts: map() | nil } @enforce_keys [:name, :serving] - defstruct [:name, :serving] + defstruct [:name, :serving, :prompts] end + defmodule State do + @moduledoc false + + @type t :: %__MODULE__{ + registry: atom(), + config: Config.t() + } + + @enforce_keys [:registry, :config] + defstruct [:registry, :config] + end + + use GenServer + alias Agens.Message - @registry Application.compile_env(:agens, :registry) + @suffix "Supervisor" + @parent "Wrapper" + + # =========================================================================== + # Public API + # =========================================================================== @doc """ Starts an `Agens.Serving` process """ - @spec start(Config.t()) :: {:ok, pid()} + @spec start(Config.t()) :: {:ok, pid()} | {:error, term} def start(%Config{} = config) do - spec = %{ - id: config.name, - start: start_function(config) - } - - Agens - |> DynamicSupervisor.start_child(spec) - |> case do - {:ok, pid} when is_pid(pid) -> - Registry.register(@registry, config.name, {pid, config}) - {:ok, pid} - - {:error, {:already_started, pid}} = error when is_pid(pid) -> - error - end + DynamicSupervisor.start_child(Agens, {__MODULE__, config}) end @doc """ Stops an `Agens.Serving` process """ @spec stop(atom()) :: :ok | {:error, :serving_not_found} - def stop(serving_name) do - serving_name - |> Module.concat("Supervisor") - |> Process.whereis() - |> case do - nil -> - {:error, :serving_not_found} + def stop(name) when is_atom(name) do + name + |> parent_name() + |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> + GenServer.call(pid, {:stop, name}) + :ok = DynamicSupervisor.terminate_child(Agens, pid) + end) + end - pid -> - :ok = DynamicSupervisor.terminate_child(Agens, pid) - Registry.unregister(@registry, serving_name) - end + @doc """ + Retrieves the Serving configuration by Serving name or `pid`. + """ + @spec get_config(atom() | pid()) :: {:ok, Config.t()} | {:error, :serving_not_found} + def get_config(name) when is_atom(name) do + name + |> parent_name() + |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> get_config(pid) end) + end + + def get_config(pid) when is_pid(pid) do + {:ok, GenServer.call(pid, :get_config)} end @doc """ Executes an `Agens.Message` against an `Agens.Serving` """ - @spec run(Message.t()) :: String.t() | {:error, :serving_not_running} - def run(%Message{} = message) do - case Registry.lookup(@registry, message.serving_name) do - [{_, {serving_pid, config}}] when is_pid(serving_pid) -> - do_run({serving_pid, config}, message) - - [] -> - {:error, :serving_not_running} + @spec run(Message.t()) :: String.t() | {:error, :serving_not_found} + def run(%Message{serving_name: name} = message) when is_atom(name) do + name + |> parent_name() + |> Agens.name_to_pid({:error, :serving_not_found}, fn pid -> + GenServer.call(pid, {:run, message}) + end) + end + + # =========================================================================== + # Setup + # =========================================================================== + + @doc false + @spec child_spec(Config.t()) :: Supervisor.child_spec() + def child_spec(%Config{} = config) do + name = parent_name(config.name) + + %{ + id: name, + start: {__MODULE__, :start_link, [config]}, + type: :worker, + restart: :transient + } + end + + @doc false + @spec start_link(keyword(), Config.t()) :: GenServer.on_start() + def start_link(extra, config) do + name = parent_name(config.name) + opts = Keyword.put(extra, :config, config) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @doc false + @impl true + @spec init(keyword()) :: {:ok, State.t()} | {:stop, term(), State.t()} + def init(opts) do + registry = Keyword.fetch!(opts, :registry) + prompts = Keyword.fetch!(opts, :prompts) + config = Keyword.fetch!(opts, :config) + config = if is_nil(config.prompts), do: Map.put(config, :prompts, prompts), else: config + state = %State{config: config, registry: registry} + {m, f, a} = start_function(config) + + m + |> apply(f, a) + |> case do + {:ok, pid} when is_pid(pid) -> + name = serving_name(config.name) + {:ok, _} = Registry.register(registry, name, {pid, config}) + + {:ok, state} + + {:error, reason} -> + {:stop, reason, state} end end - @spec do_run({pid(), Config.t()}, Message.t()) :: String.t() - defp do_run({_, %Config{serving: %Nx.Serving{}}}, %Message{} = message) do + # =========================================================================== + # Callbacks + # =========================================================================== + + @doc false + @impl true + @spec handle_call({:stop, atom()}, {pid, term}, State.t()) :: {:reply, :ok, State.t()} + def handle_call({:stop, serving_name}, _from, state) do + serving_name = serving_name(serving_name) + Registry.unregister(state.registry, serving_name) + {:reply, :ok, state} + end + + @doc false + @impl true + @spec handle_call(:get_config, {pid, term}, State.t()) :: {:reply, Config.t(), State.t()} + def handle_call(:get_config, _from, state) do + {:reply, state.config, state} + end + + @doc false + @impl true + @spec handle_call({:run, Message.t()}, {pid, term}, State.t()) :: + {:reply, String.t(), State.t()} + def handle_call({:run, %Message{} = message}, _, state) do + result = do_run(state.config, message) + {:reply, result, state} + end + + # =========================================================================== + # Private + # =========================================================================== + + @doc false + @spec do_run(Config.t(), Message.t()) :: String.t() + defp do_run(%Config{serving: %Nx.Serving{}}, %Message{} = message) do message.serving_name |> Nx.Serving.batched_run(message.prompt) |> case do @@ -97,19 +195,31 @@ defmodule Agens.Serving do end end - defp do_run({serving_pid, _}, %Message{} = message) do - # GenServer.call(serving_name, {:run, message}) - GenServer.call(serving_pid, {:run, message}) + defp do_run(_, %Message{} = message) do + serving_name = serving_name(message.serving_name) + # need to get pid? + GenServer.call(serving_name, {:run, message}) end + @doc false @spec start_function(Config.t()) :: tuple() defp start_function(%Config{serving: %Nx.Serving{} = serving} = config) do {Nx.Serving, :start_link, [[serving: serving, name: config.name]]} end + @doc false + @spec start_function(Config.t()) :: tuple() # Module.concat with "Supervisor" for Nx.Serving parity defp start_function(%Config{serving: serving} = config) when is_atom(serving) do - name = Module.concat(config.name, "Supervisor") + name = serving_name(config.name) {serving, :start_link, [[name: name, config: config]]} end + + @doc false + @spec serving_name(atom) :: atom + defp serving_name(name) when is_atom(name), do: Module.concat(name, @suffix) + + @doc false + @spec parent_name(atom) :: atom + defp parent_name(name) when is_atom(name), do: Module.concat(name, @parent) end diff --git a/lib/agens/supervisor.ex b/lib/agens/supervisor.ex index f132b43..a5d34ca 100644 --- a/lib/agens/supervisor.ex +++ b/lib/agens/supervisor.ex @@ -4,12 +4,6 @@ defmodule Agens.Supervisor do `Agens.Supervisor` starts a `DynamicSupervisor` for managing `Agens.Agent`, `Agens.Serving`, and `Agens.Job` processes. It also starts a `Registry` for keeping track of these processes. - The Registry module can be overriden by your application config: - - ``` - config :agens, registry: MyApp.Registry - ``` - In order to use `Agens` simply add `Agens.Supervisor` to your application supervision tree: ``` @@ -20,28 +14,58 @@ defmodule Agens.Supervisor do strategy: :one_for_one ) ``` + + ### Options + * `:registry` (`atom`) - The default registry can be overriden with this option. Default is `Agens.Registry`. + * `:prompts` (`map`) - The default prompt prefixes can be overriden with this option. Each `Agens.Serving.Config` can also override the defaults on a per-serving basis. + + See the [README.md](README.md#configuration) for more info. """ use Supervisor - @registry Application.compile_env(:agens, :registry) + @default_registry Agens.Registry + @default_prompts %{ + prompt: + {"Agent", "You are a specialized agent with the following capabilities and expertise"}, + identity: + {"Identity", "You are a specialized agent with the following capabilities and expertise"}, + context: {"Context", "The purpose or goal behind your tasks are to"}, + constraints: + {"Constraints", "You must operate with the following constraints or limitations"}, + examples: {"Examples", "You should consider the following examples before returning results"}, + reflection: + {"Reflection", "You should reflect on the following factors before returning results"}, + instructions: + {"Tool Instructions", + "You should provide structured output for function calling based on the following instructions"}, + objective: {"Step Objective", "The objective of this step is to"}, + description: {"Job Description", "This is part of multi-step job to achieve the following"}, + input: {"Input", "The following is the actual input from the user, system or another agent"} + } + @default_opts [registry: @default_registry, prompts: @default_prompts] @doc false - @spec start_link(any()) :: Supervisor.on_start() - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + @spec start_link(keyword()) :: Supervisor.on_start() + def start_link(args) do + override = Keyword.get(args, :opts, []) + opts = Keyword.merge(@default_opts, override) + + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) end @doc false @impl true - @spec init(any()) :: + @spec init(keyword()) :: {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec() | (old_erlang_child_spec :: :supervisor.child_spec())]}} | :ignore - def init(_init_arg) do + def init(opts) do + registry = Keyword.fetch!(opts, :registry) + children = [ - {Agens, name: Agens}, - {Registry, keys: :unique, name: @registry} + {Agens, name: Agens, opts: opts}, + {Registry, keys: :unique, name: registry} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/mix.exs b/mix.exs index d200faa..dfea68c 100644 --- a/mix.exs +++ b/mix.exs @@ -76,7 +76,7 @@ defmodule Agens.MixProject do defp docs do [ main: "Agens", - extras: [{"README.md", [title: "Agens"]}, "LICENSE"], + extras: [{"README.md", [title: "Agens"]}, {"CHANGELOG.md", [title: "Changelog"]}, "LICENSE"], source_url: "https://github.com/jessedrelick/agens", groups_for_modules: [ Agent: [ diff --git a/test/agens/agent_test.exs b/test/agens/agent_test.exs index 286ef81..34639ba 100644 --- a/test/agens/agent_test.exs +++ b/test/agens/agent_test.exs @@ -61,7 +61,7 @@ defmodule Agens.AgentTest do assert Agent.stop(:test_stop_agent) == :ok result = Message.send(message) - assert result == {:error, :agent_not_running} + assert result == {:error, :agent_not_found} end end @@ -114,7 +114,7 @@ defmodule Agens.AgentTest do } result = Message.send(message) - assert result == {:error, :agent_not_running} + assert result == {:error, :agent_not_found} end end diff --git a/test/agens/job_test.exs b/test/agens/job_test.exs index 4360b19..fa659d5 100644 --- a/test/agens/job_test.exs +++ b/test/agens/job_test.exs @@ -82,8 +82,8 @@ defmodule Agens.JobTest do {:ok, pid} = Job.start(job) assert is_pid(pid) - assert job == Job.get_config(pid) - assert job == Job.get_config(:job_config) + assert {:ok, job} == Job.get_config(pid) + assert {:ok, job} == Job.get_config(:job_config) assert {:error, :job_not_found} == Job.get_config(:missing_job) end end diff --git a/test/agens/message_test.exs b/test/agens/message_test.exs new file mode 100644 index 0000000..08c7f75 --- /dev/null +++ b/test/agens/message_test.exs @@ -0,0 +1,42 @@ +defmodule Agens.MessageTest do + use Test.Support.AgentCase, async: false + + alias Agens.Message + + defp start_agens(_ctx) do + {:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor}) + :ok + end + + defp start_serving(_ctx) do + %Agens.Serving.Config{ + name: :text_generation, + serving: Test.Support.Serving.Stub + } + |> Agens.Serving.start() + + :ok + end + + describe "no agent or serving" do + test "returns error" do + assert {:error, :no_agent_or_serving_name} == Message.send(%Message{}) + end + end + + describe "no agent" do + setup [:start_agens, :start_serving] + + test "works with explicit serving" do + serving_name = :text_generation + + assert %Message{ + input: "test", + serving_name: serving_name, + result: "sent 'test' to: ", + prompt: + "[INST]## Input\nThe following is the actual input from the user, system or another agent: test\n[/INST]" + } == Message.send(%Message{serving_name: serving_name, input: "test"}) + end + end +end diff --git a/test/agens/serving_test.exs b/test/agens/serving_test.exs index cae6ce6..0bcf107 100644 --- a/test/agens/serving_test.exs +++ b/test/agens/serving_test.exs @@ -4,7 +4,8 @@ defmodule Agens.ServingTest do alias Agens.{Message, Serving} defp start_agens(_ctx) do - {:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor}) + opts = [registry: Agens.CustomRegistry] + {:ok, _pid} = start_supervised({Agens.Supervisor, name: Agens.Supervisor, opts: opts}) :ok end @@ -51,7 +52,7 @@ defmodule Agens.ServingTest do end test "not running" do - assert {:error, :serving_not_running} == + assert {:error, :serving_not_found} == Serving.run(%Message{serving_name: :serving_missing, input: "input"}) end end @@ -69,6 +70,21 @@ defmodule Agens.ServingTest do end end + describe "config" do + setup [:start_agens, :start_serving] + + test "get" do + serving_name = :serving_test + {:ok, %Serving.Config{name: name, serving: serving}} = Serving.get_config(serving_name) + assert name == serving_name + assert serving == Test.Support.Serving.Stub + end + + test "not found" do + assert {:error, :serving_not_found} == Serving.get_config(:serving_missing) + end + end + describe "nx" do setup :start_agens