From a1e025e599c95bcfcb34d8640c1cde9c8bacb062 Mon Sep 17 00:00:00 2001 From: Arpandeep Khatua Date: Tue, 12 Nov 2024 08:27:15 -0800 Subject: [PATCH 01/15] Adding OpenHands node --- .../group_discussion_agents.py | 2 +- .../openhands_node/openhands_node.toml | 11 + sotopia/experimental/__init__.py | 2 +- .../{agents.py => agents/base_agent.py} | 68 ++++- sotopia/experimental/nodes/openhands_node.py | 245 ++++++++++++++++++ 5 files changed, 320 insertions(+), 8 deletions(-) create mode 100644 examples/experimental/openhands_node/openhands_node.toml rename sotopia/experimental/{agents.py => agents/base_agent.py} (53%) create mode 100644 sotopia/experimental/nodes/openhands_node.py diff --git a/examples/experimental/group_discussion_agents/group_discussion_agents.py b/examples/experimental/group_discussion_agents/group_discussion_agents.py index e4b3c0c2c..8ef55e5c5 100644 --- a/examples/experimental/group_discussion_agents/group_discussion_agents.py +++ b/examples/experimental/group_discussion_agents/group_discussion_agents.py @@ -2,7 +2,7 @@ from aact import Message, NodeFactory from aact.messages import Text, Tick, DataModel, DataModelFactory from sotopia.agents.llm_agent import ainput -from sotopia.experimental.agents import BaseAgent +from sotopia.experimental.agents.base_agent import BaseAgent from sotopia.generation_utils import agenerate from sotopia.generation_utils.generate import StrOutputParser diff --git a/examples/experimental/openhands_node/openhands_node.toml b/examples/experimental/openhands_node/openhands_node.toml new file mode 100644 index 000000000..38ec6ab2e --- /dev/null +++ b/examples/experimental/openhands_node/openhands_node.toml @@ -0,0 +1,11 @@ +redis_url = "redis://localhost:6379/0" +extra_modules = ["sotopia.experimental.nodes.openhands_node"] + + +[[nodes]] +node_name = "runtime" +node_class = "openhands" + +[nodes.node_args] +output_channels = ['Runtime:Agent'] +input_channels = ['Agent:Runtime'] \ No newline at end of file diff --git a/sotopia/experimental/__init__.py b/sotopia/experimental/__init__.py index 21175f409..2f2166320 100644 --- a/sotopia/experimental/__init__.py +++ b/sotopia/experimental/__init__.py @@ -1,3 +1,3 @@ -from .agents import BaseAgent +from .agents.base_agent import BaseAgent __all__ = ["BaseAgent"] diff --git a/sotopia/experimental/agents.py b/sotopia/experimental/agents/base_agent.py similarity index 53% rename from sotopia/experimental/agents.py rename to sotopia/experimental/agents/base_agent.py index 209fc22d8..770a544c7 100644 --- a/sotopia/experimental/agents.py +++ b/sotopia/experimental/agents/base_agent.py @@ -1,19 +1,75 @@ import asyncio import sys +from enum import Enum +from pydantic import Field + if sys.version_info < (3, 11): from typing_extensions import Self else: from typing import Self -from typing import Any, AsyncIterator, TypeVar -from aact import Node -from aact.messages import DataModel, Message +from typing import Any, AsyncIterator, TypeVar, Optional + +from aact import Message, Node +from aact.messages import DataModel +from aact.messages.registry import DataModelFactory T_agent_observation = TypeVar("T_agent_observation", bound=DataModel) T_agent_action = TypeVar("T_agent_action", bound=DataModel) +class ActionType(Enum): + NONE = "none" + SPEAK = "speak" + NON_VERBAL = "non-verbal" + LEAVE = "leave" + THOUGHT = "thought" + BROWSE = "browse" + BROWSE_ACTION = "browse_action" + READ = "read" + WRITE = "write" + RUN = "run" + + def __str__(self) -> str: + return self.value + + def __eq__(self, other: object) -> bool: + if isinstance(other, ActionType): + return self.value == other.value + elif isinstance(other, str): + return self.value == other + else: + return NotImplemented + + +@DataModelFactory.register("agent_action") +class AgentAction(DataModel): + agent_name: str = Field(description="the name of the agent") + action_type: ActionType = Field( + description="whether to speak at this turn or choose to not do anything" + ) + argument: str = Field( + description="the utterance if choose to speak, the expression or gesture if choose non-verbal communication, or the physical action if choose action" + ) + path: Optional[str] = Field(description="path of file") + + def to_natural_language(self) -> str: + action_descriptions = { + ActionType.NONE: "did nothing", + ActionType.SPEAK: f'said: "{self.argument}"', + ActionType.THOUGHT: f'thought: "{self.argument}"', + ActionType.BROWSE: f'browsed: "{self.argument}"', + ActionType.RUN: f'ran: "{self.argument}"', + ActionType.READ: f'read: "{self.argument}"', + ActionType.WRITE: f'wrote: "{self.argument}"', + ActionType.NON_VERBAL: f"[{self.action_type.value}] {self.argument}", + ActionType.LEAVE: "left the conversation", + } + + return action_descriptions.get(self.action_type, "performed an unknown action") + + class BaseAgent(Node[T_agent_observation, T_agent_action]): def __init__( self, @@ -51,13 +107,13 @@ async def event_handler( await self.observation_queue.put(message.data) else: raise ValueError(f"Invalid channel: {channel}") - yield "", self.output_type() # unreachable code + yield "", self.output_type() async def send(self, action: T_agent_action) -> None: for output_channel, output_channel_type in self.output_channel_types.items(): await self.r.publish( output_channel, - Message[output_channel_type](data=action).model_dump_json(), # type: ignore[valid-type] + Message[output_channel_type](data=action).model_dump_json(), ) async def _task_scheduler(self) -> None: @@ -66,4 +122,4 @@ async def _task_scheduler(self) -> None: action_or_none = await self.aact(observation) if action_or_none is not None: await self.send(action_or_none) - self.observation_queue.task_done() + self.observation_queue.task_done() \ No newline at end of file diff --git a/sotopia/experimental/nodes/openhands_node.py b/sotopia/experimental/nodes/openhands_node.py new file mode 100644 index 000000000..eea27032b --- /dev/null +++ b/sotopia/experimental/nodes/openhands_node.py @@ -0,0 +1,245 @@ +import asyncio +import logging +import time +import os +import sys +from typing import Any, AsyncIterator, Optional + +from rich.logging import RichHandler + +from aact import Message, NodeFactory, Node +from aact.messages import Text, DataModel +from aact.messages.commons import DataEntry + +from sotopia.experimental.agents.base_agent import AgentAction, ActionType + +from openhands.core.config import AgentConfig, AppConfig, SandboxConfig +from openhands.core.logger import openhands_logger as logger +from openhands.core.main import create_runtime +from openhands.events.action import ( + BrowseURLAction, + CmdRunAction, + FileWriteAction, + FileReadAction, + BrowseInteractiveAction, +) +from openhands.runtime.base import Runtime +from openhands.utils.async_utils import call_async_from_sync + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + +BASE_CONTAINER_IMAGE = "docker.all-hands.dev/all-hands-ai/runtime:0.11-nikolaik" + +# Configuration for logging +FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" +logging.basicConfig( + level=logging.WARNING, + format=FORMAT, + datefmt="[%X]", + handlers=[RichHandler()], +) + + +@NodeFactory.register("openhands") +class OpenHands(Node[DataModel, Text]): + def __init__( + self, + input_channels: list[str], + output_channels: list[str], + redis_url: str, + ): + super().__init__( + input_channel_types=[ + (input_channel, AgentAction) for input_channel in input_channels + ], + output_channel_types=[ + (output_channel, Text) for output_channel in output_channels + ], + redis_url=redis_url, + ) + self.queue: asyncio.Queue[DataEntry[DataModel]] = asyncio.Queue() + self.task: asyncio.Task[None] | None = None + self.runtime: Optional[Runtime] = None + + async def init_runtime(self) -> None: + """ + Initializes the runtime environment with the specified configuration. + """ + start_time = time.time() + modal_api_token_id = os.environ.get("MODAL_API_TOKEN_ID", "") + modal_api_token_secret = os.environ.get("MODAL_API_TOKEN_SECRET", "") + allhands_api_key = os.environ.get("ALLHANDS_API_KEY", None) + sandbox_remote_runtime_api_url = os.environ.get( + "SANDBOX_REMOTE_RUNTIME_API_URL", "" + ) + + if not modal_api_token_id or not modal_api_token_secret: + logger.warning("Modal API tokens are not set. Check environment variables.") + + config = AppConfig( + default_agent="CodeActAgent", + run_as_openhands=False, + max_iterations=3, + runtime="modal", + modal_api_token_id=modal_api_token_id, + modal_api_token_secret=modal_api_token_secret, + sandbox=SandboxConfig( + base_container_image=BASE_CONTAINER_IMAGE, + enable_auto_lint=True, + use_host_network=False, + timeout=50, + platform="linux/amd64", + api_key=allhands_api_key, + remote_runtime_api_url=sandbox_remote_runtime_api_url, + keep_remote_runtime_alive=False, + ), + workspace_base=None, + workspace_mount_path=None, + ) + + agent_config = AgentConfig( + codeact_enable_jupyter=True, + codeact_enable_browsing=True, + codeact_enable_llm_editor=True, + ) + config.set_agent_config(agent_config) + + self.runtime = create_runtime(config) + if self.runtime: + call_async_from_sync(self.runtime.connect) + logger.info("-" * 20) + logger.info("RUNTIME CONNECTED") + logger.info("-" * 20) + else: + logger.error("Failed to initialize runtime.") + end_time = time.time() # End timing + elapsed_time = end_time - start_time + logger.info(f"Runtime initialization took {elapsed_time:.2f} seconds.") + + async def __aenter__(self) -> Self: + self.runtime_init_task = asyncio.create_task(self.init_runtime()) + self.task = asyncio.create_task(self.run_action()) + return await super().__aenter__() + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + if self.runtime: + self.runtime.close() + return await super().__aexit__(exc_type, exc_value, traceback) + + async def aact(self, action: AgentAction) -> Optional[Text]: + """ + Executes an action based on the observation and returns the result as Text. + + Args: + observation (AgentAction): The action to be executed. + + Returns: + Optional[Text]: The result of the action, or None if the runtime is not available. + """ + if not self.runtime: + logger.warning("Runtime is not initialized.") + return None + + try: + action_obj = self._create_action(action) + logger.info(f"Executing action: {action}", extra={"msg_type": "ACTION"}) + obs = self.runtime.run_action(action_obj) + logger.info( + f"Received observation: {str(obs).splitlines()[:2]}", + extra={"msg_type": "OBSERVATION"}, + ) + return Text(text=str(obs)) + except Exception as e: + logger.error(f"Error executing action: {e}") + return None + + def _create_action(self, observation: AgentAction) -> Any: + """ + Creates an action based on the observation's action type. + + Args: + observation (AgentAction): The observation containing the action type and arguments. + + Returns: + Any: The created action. + """ + action_type = observation.action_type + argument = observation.argument + path = observation.path + + if action_type == ActionType.BROWSE: + return BrowseURLAction(url=argument) + elif action_type == ActionType.BROWSE_ACTION: + return BrowseInteractiveAction(browser_actions=argument) + elif action_type == ActionType.RUN: + return CmdRunAction(command=argument) + elif action_type == ActionType.WRITE: + if path is None: + raise ValueError("Path cannot be None for WRITE action") + return FileWriteAction(path=path, content=argument) + elif action_type == ActionType.READ: + if path is None: + raise ValueError("Path cannot be None for READ action") + return FileReadAction(path=path) + else: + raise ValueError(f"Unsupported action type: {action_type}") + + async def send(self, action: Text) -> None: + """ + Sends the action to all output channels. + + Args: + action (Text): The action to be sent. + """ + try: + for output_channel, _ in self.output_channel_types.items(): + message = Message[Text](data=action).model_dump_json() + await self.r.publish(output_channel, message) + except Exception as e: + logger.error(f"Error sending action: {e}") + + async def run_action(self) -> None: + """ + Continuously processes actions from the queue. + """ + while self.task: + try: + data_entry = await self.queue.get() + if isinstance(data_entry.data, AgentAction): + obs = await self.aact(data_entry.data) + if obs is not None: + await self.send(obs) + else: + logger.error("Data is not of type AgentAction") + self.queue.task_done() + except Exception as e: + logger.error(f"Error processing action: {e}") + + async def event_handler( + self, input_channel: str, input_message: Message[DataModel] + ) -> AsyncIterator[tuple[str, Message[Text]]]: + """ + Handles incoming events and adds them to the processing queue. + + Args: + input_channel (str): The channel from which the message was received. + input_message (Message[DataModel]): The incoming message. + + Yields: + Tuple[str, Message[Zero]]: A tuple containing the channel and a zero message if the channel is not recognized. + """ + try: + if input_channel in self.input_channel_types: + # Create a DataEntry instance with the correct type + data_entry: DataEntry[DataModel] = DataEntry( + channel=input_channel, data=input_message.data + ) + await self.queue.put(data_entry) + else: + logger.warning(f"Unrecognized input channel: {input_channel}") + yield input_channel, Message[Text](data=Text(text="")) + except Exception as e: + logger.error(f"Error handling event: {e}") \ No newline at end of file From ae72e44e6d784f39b8a2b20c97162331372b3a52 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:29:31 +0000 Subject: [PATCH 02/15] [autofix.ci] apply automated fixes --- examples/experimental/openhands_node/openhands_node.toml | 2 +- sotopia/experimental/agents/base_agent.py | 4 ++-- sotopia/experimental/nodes/openhands_node.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/experimental/openhands_node/openhands_node.toml b/examples/experimental/openhands_node/openhands_node.toml index 38ec6ab2e..a3987fea5 100644 --- a/examples/experimental/openhands_node/openhands_node.toml +++ b/examples/experimental/openhands_node/openhands_node.toml @@ -8,4 +8,4 @@ node_class = "openhands" [nodes.node_args] output_channels = ['Runtime:Agent'] -input_channels = ['Agent:Runtime'] \ No newline at end of file +input_channels = ['Agent:Runtime'] diff --git a/sotopia/experimental/agents/base_agent.py b/sotopia/experimental/agents/base_agent.py index 770a544c7..dca34e5e0 100644 --- a/sotopia/experimental/agents/base_agent.py +++ b/sotopia/experimental/agents/base_agent.py @@ -107,7 +107,7 @@ async def event_handler( await self.observation_queue.put(message.data) else: raise ValueError(f"Invalid channel: {channel}") - yield "", self.output_type() + yield "", self.output_type() async def send(self, action: T_agent_action) -> None: for output_channel, output_channel_type in self.output_channel_types.items(): @@ -122,4 +122,4 @@ async def _task_scheduler(self) -> None: action_or_none = await self.aact(observation) if action_or_none is not None: await self.send(action_or_none) - self.observation_queue.task_done() \ No newline at end of file + self.observation_queue.task_done() diff --git a/sotopia/experimental/nodes/openhands_node.py b/sotopia/experimental/nodes/openhands_node.py index eea27032b..59671d7b0 100644 --- a/sotopia/experimental/nodes/openhands_node.py +++ b/sotopia/experimental/nodes/openhands_node.py @@ -242,4 +242,4 @@ async def event_handler( logger.warning(f"Unrecognized input channel: {input_channel}") yield input_channel, Message[Text](data=Text(text="")) except Exception as e: - logger.error(f"Error handling event: {e}") \ No newline at end of file + logger.error(f"Error handling event: {e}") From dfeaa82708df100731d6ace324bab9cf2887c3ef Mon Sep 17 00:00:00 2001 From: Arpandeep Khatua Date: Tue, 12 Nov 2024 09:22:34 -0800 Subject: [PATCH 03/15] Added LLM Agent Node --- .../experimental/llm_agent/llm_agent.toml | 45 ++ sotopia/experimental/agents/llm_agent.py | 390 ++++++++++++++++++ 2 files changed, 435 insertions(+) create mode 100644 examples/experimental/llm_agent/llm_agent.toml create mode 100644 sotopia/experimental/agents/llm_agent.py diff --git a/examples/experimental/llm_agent/llm_agent.toml b/examples/experimental/llm_agent/llm_agent.toml new file mode 100644 index 000000000..a204bccd0 --- /dev/null +++ b/examples/experimental/llm_agent/llm_agent.toml @@ -0,0 +1,45 @@ +redis_url = "redis://localhost:6379/0" +extra_modules = ["sotopia.experimental.agents.llm_agent"] + + +[[nodes]] +node_name = "Jack" +node_class = "llm_agent" + +[nodes.node_args] +query_interval = 5 +output_channel = "Jack:Jane" +input_text_channels = ["Jane:Jack", "Runtime:Agent"] +input_env_channels = [] +input_tick_channel = "tick/secs/1" +goal = "Your goal is to effectively test Jane's technical ability and finally decide if she has passed the interview. Make sure to also evaluate her communication skills, problem-solving approach, and enthusiasm." +model_name = "gpt-4o-mini" +agent_name = "Jack" + +[[nodes]] +node_name = "Jane" +node_class = "llm_agent" + +[nodes.node_args] +query_interval = 7 +output_channel = "Jane:Jack" +input_text_channels = ["Jack:Jane", "Runtime:Agent"] +input_env_channels = [] +input_tick_channel = "tick/secs/1" +goal = "Your goal is to do well in the interview by demonstrating your technical skills, clear communication, and enthusiasm for the position. Stay calm, ask clarifying questions when needed, and confidently explain your thought process." +model_name = "gpt-4o-mini" +agent_name = "Jane" + +[[nodes]] +node_name = "tick" +node_class = "tick" + + +[[nodes]] +node_name = "print" +node_class = "print" + +[nodes.node_args.print_channel_types] +"tick/secs/1" = "tick" +"Jane:Jack" = "agent_action" +"Jack:Jane" = "agent_action" \ No newline at end of file diff --git a/sotopia/experimental/agents/llm_agent.py b/sotopia/experimental/agents/llm_agent.py new file mode 100644 index 000000000..8cb1fc428 --- /dev/null +++ b/sotopia/experimental/agents/llm_agent.py @@ -0,0 +1,390 @@ +import logging +import sys + +from rich.logging import RichHandler + +from aact import Message, NodeFactory +from aact.messages import Text, Tick +from sotopia.experimental.agents.base_agent import BaseAgent, AgentAction, ActionType + +from sotopia.generation_utils import agenerate +from sotopia.generation_utils.generate import StrOutputParser + +import json +import traceback + +# Check Python version +if sys.version_info >= (3, 11): + pass +else: + pass + +# Configure logging +FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" +logging.basicConfig( + level=logging.WARNING, + format=FORMAT, + datefmt="[%X]", + handlers=[RichHandler()], +) + + +@NodeFactory.register("llm_agent") +class LLMAgent(BaseAgent[AgentAction | Tick | Text, AgentAction]): + def __init__( + self, + input_text_channels: list[str], + input_tick_channel: str, + input_env_channels: list[str], + output_channel: str, + query_interval: int, + agent_name: str, + goal: str, + model_name: str, + redis_url: str, + ): + super().__init__( + [ + (input_text_channel, AgentAction) + for input_text_channel in input_text_channels + ] + + [ + (input_tick_channel, Tick), + ] + + [(input_env_channel, Text) for input_env_channel in input_env_channels], + [(output_channel, AgentAction)], + redis_url, + ) + self.output_channel = output_channel + self.query_interval = query_interval + self.count_ticks = 0 + self.message_history: list[tuple[str, str, str]] = [] + self.name = agent_name + self.model_name = model_name + self.goal = goal + + async def send(self, message: AgentAction) -> None: + if message.action_type == "speak": + await self.r.publish( + self.output_channel, + Message[AgentAction](data=message).model_dump_json(), + ) + + elif message.action_type in ("browse", "browse_action", "write", "read", "run"): + await self.r.publish( + "Agent:Runtime", + Message[AgentAction](data=message).model_dump_json(), + ) + + def _format_message_history( + self, message_history: list[tuple[str, str, str]] + ) -> str: + ## TODO: akhatua Fix the mapping of action to be gramatically correct + return "\n".join( + (f"{speaker} {action} {message}") + for speaker, action, message in message_history + ) + + def get_action_template(self, selected_actions: list[ActionType]) -> str: + """ + Returns the action template string with selected actions. + + Args: + selected_actions (list[ActionType]): List of ActionType enum members to include in the template. + + Returns: + str: The action template with the selected actions. + """ + base_template = """ You are talking to another agent. + You are {agent_name}.\n + {message_history}\nand you plan to {goal}. + ## Action + What is your next thought or action? Your response must be in JSON format. + + It must be an object, and it must contain two fields: + * `action`, which is one of the actions below + * `args`, which is a map of key-value pairs, specifying the arguments for that action + """ + + action_descriptions = { + str( + ActionType.SPEAK + ): """`speak` - you can talk to the other agents to share information or ask them something. Arguments: + * `content` - the message to send to the other agents (should be short)""", + str( + ActionType.THOUGHT + ): """`thought` - only use this rarely to make a plan, set a goal, record your thoughts. Arguments: + * `content` - the message you send yourself to organize your thoughts (should be short). You cannot think more than 2 turns.""", + str( + ActionType.NONE + ): """`none` - you can choose not to take an action if you are waiting for some data""", + str( + ActionType.NON_VERBAL + ): """`non-verbal` - you can choose to do a non verbal action + * `content` - the non veral action you want to send to other agents. eg: smile, shrug, thumbs up""", + str(ActionType.BROWSE): """`browse` - opens a web page. Arguments: + * `url` - the URL to open, when you browse the web you must use `none` action until you get some information back. When you get the information back you must summarize the article and explain the article to the other agents.""", + str( + ActionType.BROWSE_ACTION + ): """`browse_action` - actions you can take on a web browser + * `command` - the command to run. You have 15 available commands. These commands must be a single string value of command + Options for `command`: + `command` = goto(url: str) + Description: Navigate to a url. + Examples: + goto('http://www.example.com') + + `command` = go_back() + Description: Navigate to the previous page in history. + Examples: + go_back() + + `command` = go_forward() + Description: Navigate to the next page in history. + Examples: + go_forward() + + `command` = noop(wait_ms: float = 1000) + Description: Do nothing, and optionally wait for the given time (in milliseconds). + You can use this to get the current page content and/or wait for the page to load. + Examples: + noop() + noop(500) + + `command` = scroll(delta_x: float, delta_y: float) + Description: Scroll horizontally and vertically. Amounts in pixels, positive for right or down scrolling, negative for left or up scrolling. Dispatches a wheel event. + Examples: + scroll(0, 200) + scroll(-50.2, -100.5) + + `command` = fill(bid, value) + Description: Fill out a form field. It focuses the element and triggers an input event with the entered text. It works for ,