From 4e2ce705aaf1b8ade97116f27a6fa476f4fd3c74 Mon Sep 17 00:00:00 2001
From: Kye
Date: Sat, 11 Nov 2023 17:36:37 -0500
Subject: [PATCH] CLEANUP: swarms.agents, removed unused files

Former-commit-id: 07bcd22eef84de5d9157910366aee4547d68d509
---
 code_quality.sh                               |    2 +-
 playground/models/revgpt.py                   |   29 -
 swarms/__init__.py                            |   12 +-
 swarms/agents/__init__.py                     |    5 -
 swarms/agents/agent.py                        |  433 ----
 swarms/agents/aot.py                          |  276 ---
 swarms/agents/companion.py                    |    4 -
 swarms/agents/hf_agents.py                    |  599 -----
 swarms/agents/idea_to_image_agent.py          |  110 -
 swarms/agents/meta_prompter.py                |  158 --
 swarms/agents/multi_modal_visual_agent.py     | 2189 -----------------
 .../neural_architecture_search_worker.py      |   12 -
 swarms/agents/operations_agent.py             |    0
 swarms/agents/refiner_agent.py                |    9 -
 swarms/agents/simple_agent.py                 |   37 -
 swarms/chunkers/omni_chunker.py               |    1 -
 swarms/models/__init__.py                     |   14 +-
 swarms/models/base.py                         |    2 +-
 swarms/models/dalle3.py                       |    2 +-
 swarms/models/distilled_whisperx.py           |    2 +-
 swarms/models/fastvit.py                      |    1 -
 swarms/models/kosmos2.py                      |    1 -
 swarms/models/openai_chat.py                  |    1 -
 swarms/models/timm.py                         |    2 +-
 swarms/models/trocr.py                        |    3 -
 swarms/prompts/__init__.py                    |   11 +
 swarms/prompts/autobloggen.py                 |    4 +-
 swarms/prompts/chat_prompt.py                 |    3 +-
 swarms/structs/flow.py                        |    3 +-
 swarms/swarms/autobloggen.py                  |    2 -
 swarms/swarms/dialogue_simulator.py           |    1 -
 swarms/swarms/multi_agent_debate.py           |    1 -
 swarms/utils/__init__.py                      |    7 +
 swarms/workers/__init__.py                    |    1 -
 tests/models/cohere.py                        |  147 +-
 35 files changed, 108 insertions(+), 3976 deletions(-)
 delete mode 100644 playground/models/revgpt.py
 delete mode 100644 swarms/agents/agent.py
 delete mode 100644 swarms/agents/aot.py
 delete mode 100644 swarms/agents/companion.py
 delete mode 100644 swarms/agents/hf_agents.py
 delete mode 100644 swarms/agents/idea_to_image_agent.py
 delete mode 100644 swarms/agents/meta_prompter.py
 delete mode 100644 swarms/agents/multi_modal_visual_agent.py
 delete mode 100644 swarms/agents/neural_architecture_search_worker.py
 delete mode 100644 swarms/agents/operations_agent.py
 delete mode 100644 swarms/agents/refiner_agent.py
 delete mode 100644 swarms/agents/simple_agent.py

diff --git a/code_quality.sh b/code_quality.sh
index ee7be7d16..6ffc2f50a 100755
--- a/code_quality.sh
+++ b/code_quality.sh
@@ -13,7 +13,7 @@ black --experimental-string-processing swarms/
 # Run ruff on the 'swarms' directory.
 # Add any additional flags if needed according to your version of ruff.
-ruff swarms/ +x #--unsafe_fix # YAPF diff --git a/playground/models/revgpt.py b/playground/models/revgpt.py deleted file mode 100644 index cd5bd2d67..000000000 --- a/playground/models/revgpt.py +++ /dev/null @@ -1,29 +0,0 @@ -import os -import sys -from dotenv import load_dotenv -from swarms.models.revgptV4 import RevChatGPTModelv4 -from swarms.models.revgptV1 import RevChatGPTModelv1 - -root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) -sys.path.append(root_dir) - -load_dotenv() - -config = { - "model": os.getenv("REVGPT_MODEL"), - "plugin_ids": [os.getenv("REVGPT_PLUGIN_IDS")], - "disable_history": os.getenv("REVGPT_DISABLE_HISTORY") == "True", - "PUID": os.getenv("REVGPT_PUID"), - "unverified_plugin_domains": [os.getenv("REVGPT_UNVERIFIED_PLUGIN_DOMAINS")], -} - -# For v1 model -model = RevChatGPTModelv1(access_token=os.getenv("ACCESS_TOKEN"), **config) -# model = RevChatGPTModelv4(access_token=os.getenv("ACCESS_TOKEN"), **config) - -# For v3 model -# model = RevChatGPTModel(access_token=os.getenv("OPENAI_API_KEY"), **config) - -task = "Write a cli snake game" -response = model.run(task) -print(response) diff --git a/swarms/__init__.py b/swarms/__init__.py index f45f876f3..49aa5f531 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -6,9 +6,9 @@ # disable tensorflow warnings os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" -from swarms.agents import * -from swarms.swarms import * -from swarms.structs import * -from swarms.models import * -from swarms.chunkers import * -from swarms.workers import * +from swarms.agents import * # noqa: E402, F403 +from swarms.swarms import * # noqa: E402, F403 +from swarms.structs import * # noqa: E402, F403 +from swarms.models import * # noqa: E402, F403 +from swarms.chunkers import * # noqa: E402, F403 +from swarms.workers import * # noqa: E402, F403 diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index a5b0a9c48..455b95ee6 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -1,5 +1,4 @@ # from swarms.agents.omni_modal_agent import OmniModalAgent -from swarms.agents.hf_agents import HFAgent from swarms.agents.message import Message # from swarms.agents.stream_response import stream @@ -7,16 +6,12 @@ from swarms.agents.registry import Registry # from swarms.agents.idea_to_image_agent import Idea2Image -from swarms.agents.simple_agent import SimpleAgent """Agent Infrastructure, models, memory, utils, tools""" __all__ = [ # "OmniModalAgent", - "HFAgent", "Message", "AbstractAgent", "Registry", - # "Idea2Image", - "SimpleAgent", ] diff --git a/swarms/agents/agent.py b/swarms/agents/agent.py deleted file mode 100644 index bad9d3bb4..000000000 --- a/swarms/agents/agent.py +++ /dev/null @@ -1,433 +0,0 @@ -from __future__ import annotations - -import json -import time -from typing import Any, Callable, List, Optional - -from langchain.chains.llm import LLMChain -from langchain.chat_models.base import BaseChatModel -from langchain.memory import ChatMessageHistory -from langchain.prompts.chat import ( - BaseChatPromptTemplate, -) -from langchain.schema import ( - BaseChatMessageHistory, - Document, -) -from langchain.schema.messages import ( - AIMessage, - BaseMessage, - HumanMessage, - SystemMessage, -) -from langchain.schema.vectorstore import VectorStoreRetriever -from langchain.tools.base import BaseTool -from langchain.tools.human.tool import HumanInputRun -from langchain_experimental.autonomous_agents.autogpt.output_parser import ( - AutoGPTOutputParser, - BaseAutoGPTOutputParser, -) -from 
langchain_experimental.autonomous_agents.autogpt.prompt import AutoGPTPrompt -from langchain_experimental.autonomous_agents.autogpt.prompt_generator import ( - FINISH_NAME, - get_prompt, -) -from langchain_experimental.pydantic_v1 import BaseModel, ValidationError - -# PROMPT -FINISH_NAME = "finish" - - -# This class has a metaclass conflict: both `BaseChatPromptTemplate` and `BaseModel` -# define a metaclass to use, and the two metaclasses attempt to define -# the same functions but in mutually-incompatible ways. -# It isn't clear how to resolve this, and this code predates mypy -# beginning to perform that check. -# -# Mypy errors: -# ``` -# Definition of "__private_attributes__" in base class "BaseModel" is -# incompatible with definition in base class "BaseModel" [misc] -# Definition of "__repr_name__" in base class "Representation" is -# incompatible with definition in base class "BaseModel" [misc] -# Definition of "__pretty__" in base class "Representation" is -# incompatible with definition in base class "BaseModel" [misc] -# Definition of "__repr_str__" in base class "Representation" is -# incompatible with definition in base class "BaseModel" [misc] -# Definition of "__rich_repr__" in base class "Representation" is -# incompatible with definition in base class "BaseModel" [misc] -# Metaclass conflict: the metaclass of a derived class must be -# a (non-strict) subclass of the metaclasses of all its bases [misc] -# ``` -# -# TODO: look into refactoring this class in a way that avoids the mypy type errors -class AutoGPTPrompt(BaseChatPromptTemplate, BaseModel): # type: ignore[misc] - """Prompt for AutoGPT.""" - - ai_name: str - ai_role: str - tools: List[BaseTool] - token_counter: Callable[[str], int] - send_token_limit: int = 4196 - - def construct_full_prompt(self, goals: List[str]) -> str: - prompt_start = ( - "Your decisions must always be made independently " - "without seeking user assistance.\n" - "Play to your strengths as an LLM and pursue simple " - "strategies with no legal complications.\n" - "If you have completed all your tasks, make sure to " - 'use the "finish" command.' - ) - # Construct full prompt - full_prompt = ( - f"You are {self.ai_name}, {self.ai_role}\n{prompt_start}\n\nGOALS:\n\n" - ) - for i, goal in enumerate(goals): - full_prompt += f"{i+1}. 
{goal}\n" - - full_prompt += f"\n\n{get_prompt(self.tools)}" - return full_prompt - - def format_messages(self, **kwargs: Any) -> List[BaseMessage]: - base_prompt = SystemMessage(content=self.construct_full_prompt(kwargs["goals"])) - time_prompt = SystemMessage( - content=f"The current time and date is {time.strftime('%c')}" - ) - used_tokens = self.token_counter(base_prompt.content) + self.token_counter( - time_prompt.content - ) - memory: VectorStoreRetriever = kwargs["memory"] - previous_messages = kwargs["messages"] - relevant_docs = memory.get_relevant_documents(str(previous_messages[-10:])) - relevant_memory = [d.page_content for d in relevant_docs] - relevant_memory_tokens = sum( - [self.token_counter(doc) for doc in relevant_memory] - ) - while used_tokens + relevant_memory_tokens > 2500: - relevant_memory = relevant_memory[:-1] - relevant_memory_tokens = sum( - [self.token_counter(doc) for doc in relevant_memory] - ) - content_format = ( - f"This reminds you of these events from your past:\n{relevant_memory}\n\n" - ) - memory_message = SystemMessage(content=content_format) - used_tokens += self.token_counter(memory_message.content) - historical_messages: List[BaseMessage] = [] - for message in previous_messages[-10:][::-1]: - message_tokens = self.token_counter(message.content) - if used_tokens + message_tokens > self.send_token_limit - 1000: - break - historical_messages = [message] + historical_messages - used_tokens += message_tokens - input_message = HumanMessage(content=kwargs["user_input"]) - messages: List[BaseMessage] = [base_prompt, time_prompt, memory_message] - messages += historical_messages - messages.append(input_message) - return messages - - -class PromptGenerator: - """A class for generating custom prompt strings. - - Does this based on constraints, commands, resources, and performance evaluations. - """ - - def __init__(self) -> None: - """Initialize the PromptGenerator object. - - Starts with empty lists of constraints, commands, resources, - and performance evaluations. - """ - self.constraints: List[str] = [] - self.commands: List[BaseTool] = [] - self.resources: List[str] = [] - self.performance_evaluation: List[str] = [] - self.response_format = { - "thoughts": { - "text": "thought", - "reasoning": "reasoning", - "plan": "- short bulleted\n- list that conveys\n- long-term plan", - "criticism": "constructive self-criticism", - "speak": "thoughts summary to say to user", - }, - "command": {"name": "command name", "args": {"arg name": "value"}}, - } - - def add_constraint(self, constraint: str) -> None: - """ - Add a constraint to the constraints list. - - Args: - constraint (str): The constraint to be added. - """ - self.constraints.append(constraint) - - def add_tool(self, tool: BaseTool) -> None: - self.commands.append(tool) - - def _generate_command_string(self, tool: BaseTool) -> str: - output = f"{tool.name}: {tool.description}" - output += f", args json schema: {json.dumps(tool.args)}" - return output - - def add_resource(self, resource: str) -> None: - """ - Add a resource to the resources list. - - Args: - resource (str): The resource to be added. - """ - self.resources.append(resource) - - def add_performance_evaluation(self, evaluation: str) -> None: - """ - Add a performance evaluation item to the performance_evaluation list. - - Args: - evaluation (str): The evaluation item to be added. 
- """ - self.performance_evaluation.append(evaluation) - - def _generate_numbered_list(self, items: list, item_type: str = "list") -> str: - """ - Generate a numbered list from given items based on the item_type. - - Args: - items (list): A list of items to be numbered. - item_type (str, optional): The type of items in the list. - Defaults to 'list'. - - Returns: - str: The formatted numbered list. - """ - if item_type == "command": - command_strings = [ - f"{i + 1}. {self._generate_command_string(item)}" - for i, item in enumerate(items) - ] - finish_description = ( - "use this to signal that you have finished all your objectives" - ) - finish_args = ( - '"response": "final response to let ' - 'people know you have finished your objectives"' - ) - finish_string = ( - f"{len(items) + 1}. {FINISH_NAME}: " - f"{finish_description}, args: {finish_args}" - ) - return "\n".join(command_strings + [finish_string]) - else: - return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items)) - - def generate_prompt_string(self) -> str: - """Generate a prompt string. - - Returns: - str: The generated prompt string. - """ - formatted_response_format = json.dumps(self.response_format, indent=4) - prompt_string = ( - f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\n" - "Commands:\n" - f"{self._generate_numbered_list(self.commands, item_type='command')}\n\n" - f"Resources:\n{self._generate_numbered_list(self.resources)}\n\n" - "Performance Evaluation:\n" - f"{self._generate_numbered_list(self.performance_evaluation)}\n\n" - "You should only respond in JSON format as described below " - f"\nResponse Format: \n{formatted_response_format} " - "\nEnsure the response can be parsed by Python json.loads" - ) - - return prompt_string - - -def get_prompt(tools: List[BaseTool]) -> str: - """Generates a prompt string. - - It includes various constraints, commands, resources, and performance evaluations. - - Returns: - str: The generated prompt string. - """ - - # Initialize the PromptGenerator object - prompt_generator = PromptGenerator() - - # Add constraints to the PromptGenerator object - prompt_generator.add_constraint( - "~16000 word limit for short term memory. " - "Your short term memory is short, " - "so immediately save important information to files." - ) - prompt_generator.add_constraint( - "If you are unsure how you previously did something " - "or want to recall past events, " - "thinking about similar events will help you remember." - ) - prompt_generator.add_constraint("No user assistance") - prompt_generator.add_constraint( - 'Exclusively use the commands listed in double quotes e.g. "command name"' - ) - - # Add commands to the PromptGenerator object - for tool in tools: - prompt_generator.add_tool(tool) - - # Add resources to the PromptGenerator object - prompt_generator.add_resource( - "Internet access for searches and information gathering." - ) - prompt_generator.add_resource("Long Term memory management.") - prompt_generator.add_resource( - "GPT-3.5 powered Agents for delegation of simple tasks." - ) - prompt_generator.add_resource("File output.") - - # Add performance evaluations to the PromptGenerator object - prompt_generator.add_performance_evaluation( - "Continuously review and analyze your actions " - "to ensure you are performing to the best of your abilities." - ) - prompt_generator.add_performance_evaluation( - "Constructively self-criticize your big-picture behavior constantly." 
- ) - prompt_generator.add_performance_evaluation( - "Reflect on past decisions and strategies to refine your approach." - ) - prompt_generator.add_performance_evaluation( - "Every command has a cost, so be smart and efficient. " - "Aim to complete tasks in the least number of steps." - ) - - # Generate the prompt string - prompt_string = prompt_generator.generate_prompt_string() - - return prompt_string - - -class AutoGPT: - """ - AutoAgent: - - - Args: - - - - - """ - - def __init__( - self, - ai_name: str, - memory: VectorStoreRetriever, - chain: LLMChain, - output_parser: BaseAutoGPTOutputParser, - tools: List[BaseTool], - feedback_tool: Optional[HumanInputRun] = None, - chat_history_memory: Optional[BaseChatMessageHistory] = None, - ): - self.ai_name = ai_name - self.memory = memory - self.next_action_count = 0 - self.chain = chain - self.output_parser = output_parser - self.tools = tools - self.feedback_tool = feedback_tool - self.chat_history_memory = chat_history_memory or ChatMessageHistory() - - @classmethod - def from_llm_and_tools( - cls, - ai_name: str, - ai_role: str, - memory: VectorStoreRetriever, - tools: List[BaseTool], - llm: BaseChatModel, - human_in_the_loop: bool = False, - output_parser: Optional[BaseAutoGPTOutputParser] = None, - chat_history_memory: Optional[BaseChatMessageHistory] = None, - ) -> AutoGPT: - prompt = AutoGPTPrompt( - ai_name=ai_name, - ai_role=ai_role, - tools=tools, - input_variables=["memory", "messages", "goals", "user_input"], - token_counter=llm.get_num_tokens, - ) - human_feedback_tool = HumanInputRun() if human_in_the_loop else None - chain = LLMChain(llm=llm, prompt=prompt) - return cls( - ai_name, - memory, - chain, - output_parser or AutoGPTOutputParser(), - tools, - feedback_tool=human_feedback_tool, - chat_history_memory=chat_history_memory, - ) - - def run(self, goals: List[str]) -> str: - user_input = ( - "Determine which next command to use, " - "and respond using the format specified above:" - ) - # Interaction Loop - loop_count = 0 - while True: - # Discontinue if continuous limit is reached - loop_count += 1 - - # Send message to AI, get response - assistant_reply = self.chain.run( - goals=goals, - messages=self.chat_history_memory.messages, - memory=self.memory, - user_input=user_input, - ) - - # Print Assistant thoughts - print(assistant_reply) - self.chat_history_memory.add_message(HumanMessage(content=user_input)) - self.chat_history_memory.add_message(AIMessage(content=assistant_reply)) - - # Get command name and arguments - action = self.output_parser.parse(assistant_reply) - tools = {t.name: t for t in self.tools} - if action.name == FINISH_NAME: - return action.args["response"] - if action.name in tools: - tool = tools[action.name] - try: - observation = tool.run(action.args) - except ValidationError as e: - observation = ( - f"Validation Error in args: {str(e)}, args: {action.args}" - ) - except Exception as e: - observation = ( - f"Error: {str(e)}, {type(e).__name__}, args: {action.args}" - ) - result = f"Command {tool.name} returned: {observation}" - elif action.name == "ERROR": - result = f"Error: {action.args}. " - else: - result = ( - f"Unknown command '{action.name}'. " - "Please refer to the 'COMMANDS' list for available " - "commands and only respond in the specified JSON format." 
- ) - - memory_to_add = f"Assistant Reply: {assistant_reply} \nResult: {result} " - if self.feedback_tool is not None: - feedback = f"\n{self.feedback_tool.run('Input: ')}" - if feedback in {"q", "stop"}: - print("EXITING") - return "EXITING" - memory_to_add += feedback - - self.memory.add_documents([Document(page_content=memory_to_add)]) - self.chat_history_memory.add_message(SystemMessage(content=result)) diff --git a/swarms/agents/aot.py b/swarms/agents/aot.py deleted file mode 100644 index b36fb43c5..000000000 --- a/swarms/agents/aot.py +++ /dev/null @@ -1,276 +0,0 @@ -import logging -import os -import time - -import openai_model - -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -class OpenAI: - def __init__( - self, - api_key, - strategy="cot", - evaluation_strategy="value", - api_base="", - api_model="", - ): - if api_key == "" or api_key is None: - api_key = os.environ.get("OPENAI_API_KEY", "") - if api_key != "": - openai_model.api_key = api_key - else: - raise Exception("Please provide OpenAI API key") - - if api_base == "" or api_base is None: - api_base = os.environ.get( - "OPENAI_API_BASE", "" - ) # if not set, use the default base path of "https://api.openai.com/v1" - if api_base != "": - # e.g. https://api.openai.com/v1/ or your custom url - openai_model.api_base = api_base - print(f"Using custom api_base {api_base}") - - if api_model == "" or api_model is None: - api_model = os.environ.get("OPENAI_API_MODEL", "") - if api_model != "": - self.api_model = api_model - else: - self.api_model = "text-davinci-003" - print(f"Using api_model {self.api_model}") - - self.use_chat_api = "gpt" in self.api_model - self.strategy = strategy - self.evaluation_strategy = evaluation_strategy - - def run(self, prompt, max_tokens, temperature, k=1, stop=None): - while True: - try: - if self.use_chat_api: - messages = [{"role": "user", "content": prompt}] - response = openai_model.ChatCompletion.create( - model=self.api_model, - messages=messages, - max_tokens=max_tokens, - temperature=temperature, - ) - else: - response = openai_model.Completion.create( - engine=self.api_model, - prompt=prompt, - n=k, - max_tokens=max_tokens, - stop=stop, - temperature=temperature, - ) - with open("openai.logs", "a") as log_file: - log_file.write( - "\n" + "-----------" + "\n" + "Prompt : " + prompt + "\n" - ) - return response - except openai_model.error.RateLimitError as e: - sleep_duratoin = os.environ.get("OPENAI_RATE_TIMEOUT", 30) - print( - f"{str(e)}, sleep for {sleep_duratoin}s, set it by env" - " OPENAI_RATE_TIMEOUT" - ) - time.sleep(sleep_duratoin) - - def openai_choice2text_handler(self, choice): - if self.use_chat_api: - text = choice["message"]["content"] - else: - text = choice.text.strip() - return text - - def generate_text(self, prompt, k): - if self.use_chat_api: - thoughts = [] - for _ in range(k): - response = self.run(prompt, 400, 0.5, k) - text = self.openai_choice2text_handler(response.choices[0]) - thoughts += [text] - # print(f'thoughts: {thoughts}') - return thoughts - - else: - response = self.run(prompt, 300, 0.5, k) - thoughts = [ - self.openai_choice2text_handler(choice) for choice in response.choices - ] - return thoughts - - def generate_thoughts(self, state, k, initial_prompt, rejected_solutions=None): - if isinstance(state, str): - pass - else: - "\n".join(state) - print("New state generating thought:", state, "\n\n") - prompt = f""" - Accomplish the task below by decomposing it as many 
very explicit subtasks as possible, be very explicit and thorough denoted by - a search process, highlighted by markers ‘1’,..., ‘3’ as “first operations” guiding subtree exploration for the OBJECTIVE, - focus on the third subtree exploration. Produce prospective search steps (e.g., the subtree exploration ‘5. 11 + 1’) - and evaluates potential subsequent steps to either progress - towards a solution or retrace to another viable subtree then be very thorough - and think atomically then provide solutions for those subtasks, - then return the definitive end result and then summarize it - - - ########## OBJECTIVE - {initial_prompt} - ################### - """ - thoughts = self.generate_text(prompt, k) - # print(f"Generated thoughts: {thoughts}") - return thoughts - - def generate_solution(self, initial_prompt, state, rejected_solutions=None): - try: - if isinstance(state, list): - state_text = "\n".join(state) - else: - state_text = state - - prompt = f""" - Generate a series of solutions to comply with the user's instructions, - you must generate solutions on the basis of determining the most reliable solution in the shortest amount of time, - while taking rejected solutions into account and learning from them. - Considering the reasoning provided:\n\n - ###'{state_text}'\n\n### - Devise the best possible solution for the task: {initial_prompt}, Here are evaluated solutions that were rejected: - ###{rejected_solutions}###, - complete the {initial_prompt} without making the same mistakes you did with the evaluated rejected solutions. Be simple. Be direct. Provide intuitive solutions as soon as you think of them.""" - answer = self.generate_text(prompt, 1) - print(f"Generated Solution Summary {answer}") - return answer - except Exception as e: - logger.error(f"Error in generate_solutions: {e}") - return None - - def evaluate_states(self, states, initial_prompt): - if not states: - return {} - - if self.evaluation_strategy == "value": - state_values = {} - for state in states: - if isinstance(state, str): - state_text = state - else: - state_text = "\n".join(state) - print( - "We receive a state of type", - type(state), - "For state: ", - state, - "\n\n", - ) - prompt = f""" To achieve the following goal: '{initial_prompt}', pessimistically value the context of the past solutions and more importantly the latest generated solution you had AS A FLOAT BETWEEN 0 AND 1\n - Past solutions:\n\n - {state_text}\n - If the solutions is not making fast progress in achieving the goal, give it a lower score. - Evaluate all solutions AS A FLOAT BETWEEN 0 and 1:\n, DO NOT RETURN ANYTHING ELSE - """ - response = self.run(prompt, 10, 1) - try: - value_text = self.openai_choice2text_handler(response.choices[0]) - # print(f'state: {value_text}') - value = float(value_text) - print(f"Evaluated Thought Value: {value}") - except ValueError: - value = 0 - state_values[state] = value - return state_values - - else: - raise ValueError("Invalid evaluation strategy. 
Choose 'value' or 'vote'.") - - -class AoTAgent: - def __init__( - self, - num_thoughts: int = None, - max_steps: int = None, - value_threshold: float = None, - pruning_threshold=0.5, - backtracking_threshold=0.4, - initial_prompt=None, - openai_api_key: str = None, - model=None, - ): - self.num_thoughts = num_thoughts - self.max_steps = max_steps - self.value_threshold = value_threshold - self.backtracking_threshold = backtracking_threshold - self.pruning_threshold = pruning_threshold - self.initial_prompt = initial_prompt - self.output = [] - self.openai_api_key = openai_api_key - self.model = model - self.model = self.model or OpenAI(api_key=self.openai_api_key) - - def solve(self): - try: - self.dfs(self.initial_prompt, 1) - - if not self.output: - logger.error("No valid thoughts were generated during DFS") - return None - - best_state, _ = max(self.output, key=lambda x: x[1]) - solution = self.model.generate_solution(self.initial_prompt, best_state) - print(f"Solution is {solution}") - return solution if solution else best_state - except Exception as error: - logger.error(f"Error in tot_dfs: {error}") - raise error - - def dfs(self, state, step): - if step > self.max_steps: - thought, value = self.evaluate_thought(state) - self.output.append((thought, value)) - return - - thoughts = self.generate_and_filter_thoughts(state) - for next_state in thoughts: - state_value = self.evaluated_thoughts[next_state] - if state_value > self.value_threshold: - child = ( - (state, next_state) - if isinstance(state, str) - else (*state, next_state) - ) - self.dfs(child, step + 1) - - # backtracking - best_value = max([value for _, value in self.output]) - if best_value < self.backtracking_threshold: - self.output.pop() - continue - - def generate_and_filter_thoughts(self, state): - thoughts = self.model.generate_thoughts( - state, self.num_thoughts, self.initial_prompt - ) - - self.evaluated_thoughts = self.model.evaluate_states( - thoughts, self.initial_prompt - ) - - filtered_thoughts = [ - thought - for thought in thoughts - if self.evaluated_thoughts[thought] >= self.pruning_threshold - ] - print(f"filtered_thoughts: {filtered_thoughts}") - return filtered_thoughts - - def evaluate_thought(self, state): - thought = self.model.generate_thoughts(state, 1, self.initial_prompt) - value = self.model.evaluate_states([state], self.initial_prompt)[state] - print(f"Evaluated thought: {value}") - return thought, value diff --git a/swarms/agents/companion.py b/swarms/agents/companion.py deleted file mode 100644 index a630895e7..000000000 --- a/swarms/agents/companion.py +++ /dev/null @@ -1,4 +0,0 @@ -""" -Companion agents converse with the user about the agent the user wants to create then creates the agent with the desired attributes and traits and tools and configurations - -""" diff --git a/swarms/agents/hf_agents.py b/swarms/agents/hf_agents.py deleted file mode 100644 index 4e186e3a2..000000000 --- a/swarms/agents/hf_agents.py +++ /dev/null @@ -1,599 +0,0 @@ -import importlib.util -import json -import os -from dataclasses import dataclass -from typing import Dict - -from huggingface_hub import hf_hub_download, list_spaces -from transformers.tools.base import ( - TASK_MAPPING, - TOOL_CONFIG_FILE, - Tool, - load_tool, - supports_remote, -) -from transformers.tools.prompts import CHAT_MESSAGE_PROMPT, download_prompt -from transformers.tools.python_interpreter import evaluate -from transformers.utils import is_offline_mode, is_openai_available, logging - -# utils -logger = logging.get_logger(__name__) - -if 
is_openai_available(): - import openai - -else: - StoppingCriteria = object - -_tools_are_initialized = False - -BASE_PYTHON_TOOLS = { - "print": print, - "range": range, - "float": float, - "int": int, - "bool": bool, - "str": str, -} - - -@dataclass -class PreTool: - task: str - description: str - repo_id: str - - -HUGGINGFACE_DEFAULT_TOOLS = {} - -HUGGINGFACE_DEFAULT_TOOLS_FROM_HUB = [ - "image-transformation", - "text-download", - "text-to-image", - "text-to-video", -] - - -def get_remote_tools(organization="huggingface-tools"): - if is_offline_mode(): - logger.info("You are in offline mode, so remote tools are not available.") - return {} - - spaces = list_spaces(author=organization) - tools = {} - for space_info in spaces: - repo_id = space_info.id - resolved_config_file = hf_hub_download( - repo_id, TOOL_CONFIG_FILE, repo_type="space" - ) - with open(resolved_config_file, encoding="utf-8") as reader: - config = json.load(reader) - - task = repo_id.split("/")[-1] - tools[config["name"]] = PreTool( - task=task, description=config["description"], repo_id=repo_id - ) - - return tools - - -def _setup_default_tools(): - global HUGGINGFACE_DEFAULT_TOOLS - global _tools_are_initialized - - if _tools_are_initialized: - return - - main_module = importlib.import_module("transformers") - tools_module = main_module.tools - - remote_tools = get_remote_tools() - for task_name, tool_class_name in TASK_MAPPING.items(): - tool_class = getattr(tools_module, tool_class_name) - description = tool_class.description - HUGGINGFACE_DEFAULT_TOOLS[tool_class.name] = PreTool( - task=task_name, description=description, repo_id=None - ) - - if not is_offline_mode(): - for task_name in HUGGINGFACE_DEFAULT_TOOLS_FROM_HUB: - found = False - for tool_name, tool in remote_tools.items(): - if tool.task == task_name: - HUGGINGFACE_DEFAULT_TOOLS[tool_name] = tool - found = True - break - - if not found: - raise ValueError(f"{task_name} is not implemented on the Hub.") - - _tools_are_initialized = True - - -def resolve_tools(code, toolbox, remote=False, cached_tools=None): - if cached_tools is None: - resolved_tools = BASE_PYTHON_TOOLS.copy() - else: - resolved_tools = cached_tools - for name, tool in toolbox.items(): - if name not in code or name in resolved_tools: - continue - - if isinstance(tool, Tool): - resolved_tools[name] = tool - else: - task_or_repo_id = tool.task if tool.repo_id is None else tool.repo_id - _remote = remote and supports_remote(task_or_repo_id) - resolved_tools[name] = load_tool(task_or_repo_id, remote=_remote) - - return resolved_tools - - -def get_tool_creation_code(code, toolbox, remote=False): - code_lines = ["from transformers import load_tool", ""] - for name, tool in toolbox.items(): - if name not in code or isinstance(tool, Tool): - continue - - task_or_repo_id = tool.task if tool.repo_id is None else tool.repo_id - line = f'{name} = load_tool("{task_or_repo_id}"' - if remote: - line += ", remote=True" - line += ")" - code_lines.append(line) - - return "\n".join(code_lines) + "\n" - - -def clean_code_for_chat(result): - lines = result.split("\n") - idx = 0 - while idx < len(lines) and not lines[idx].lstrip().startswith("```"): - idx += 1 - explanation = "\n".join(lines[:idx]).strip() - if idx == len(lines): - return explanation, None - - idx += 1 - start_idx = idx - while not lines[idx].lstrip().startswith("```"): - idx += 1 - code = "\n".join(lines[start_idx:idx]).strip() - - return explanation, code - - -def clean_code_for_run(result): - result = f"I will use the following {result}" - 
explanation, code = result.split("Answer:") - explanation = explanation.strip() - code = code.strip() - - code_lines = code.split("\n") - if code_lines[0] in ["```", "```py", "```python"]: - code_lines = code_lines[1:] - if code_lines[-1] == "```": - code_lines = code_lines[:-1] - code = "\n".join(code_lines) - - return explanation, code - - -class Agent: - """ - Base class for all agents which contains the main API methods. - - Args: - chat_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `chat` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `chat_prompt_template.txt` in this repo in this case. - run_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `run` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `run_prompt_template.txt` in this repo in this case. - additional_tools ([`Tool`], list of tools or dictionary with tool values, *optional*): - Any additional tools to include on top of the default ones. If you pass along a tool with the same name as - one of the default tools, that default tool will be overridden. - """ - - def __init__( - self, chat_prompt_template=None, run_prompt_template=None, additional_tools=None - ): - _setup_default_tools() - - agent_name = self.__class__.__name__ - self.chat_prompt_template = download_prompt( - chat_prompt_template, agent_name, mode="chat" - ) - self.run_prompt_template = download_prompt( - run_prompt_template, agent_name, mode="run" - ) - self._toolbox = HUGGINGFACE_DEFAULT_TOOLS.copy() - self.log = print - if additional_tools is not None: - if isinstance(additional_tools, (list, tuple)): - additional_tools = {t.name: t for t in additional_tools} - elif not isinstance(additional_tools, dict): - additional_tools = {additional_tools.name: additional_tools} - - replacements = { - name: tool - for name, tool in additional_tools.items() - if name in HUGGINGFACE_DEFAULT_TOOLS - } - self._toolbox.update(additional_tools) - if len(replacements) > 1: - names = "\n".join([f"- {n}: {t}" for n, t in replacements.items()]) - logger.warning( - "The following tools have been replaced by the ones provided in" - f" `additional_tools`:\n{names}." - ) - elif len(replacements) == 1: - name = list(replacements.keys())[0] - logger.warning( - f"{name} has been replaced by {replacements[name]} as provided in" - " `additional_tools`." - ) - - self.prepare_for_new_chat() - - @property - def toolbox(self) -> Dict[str, Tool]: - """Get all tool currently available to the agent""" - return self._toolbox - - def format_prompt(self, task, chat_mode=False): - description = "\n".join( - [f"- {name}: {tool.description}" for name, tool in self.toolbox.items()] - ) - if chat_mode: - if self.chat_history is None: - prompt = self.chat_prompt_template.replace("<>", description) - else: - prompt = self.chat_history - prompt += CHAT_MESSAGE_PROMPT.replace("<>", task) - else: - prompt = self.run_prompt_template.replace("<>", description) - prompt = prompt.replace("<>", task) - return prompt - - def set_stream(self, streamer): - """ - Set the function use to stream results (which is `print` by default). - - Args: - streamer (`callable`): The function to call when streaming results from the LLM. 
- """ - self.log = streamer - - def chat(self, task, *, return_code=False, remote=False, **kwargs): - """ - Sends a new request to the agent in a chat. Will use the previous ones in its history. - - Args: - task (`str`): The task to perform - return_code (`bool`, *optional*, defaults to `False`): - Whether to just return code and not evaluate it. - remote (`bool`, *optional*, defaults to `False`): - Whether or not to use remote tools (inference endpoints) instead of local ones. - kwargs (additional keyword arguments, *optional*): - Any keyword argument to send to the agent when evaluating the code. - - Example: - - ```py - from transformers import HfAgent - - agent = HfAgent("https://api-inference.huggingface.co/models/bigcode/starcoder") - agent.chat("Draw me a picture of rivers and lakes") - - agent.chat("Transform the picture so that there is a rock in there") - ``` - """ - prompt = self.format_prompt(task, chat_mode=True) - result = self.generate_one(prompt, stop=["Human:", "====="]) - self.chat_history = prompt + result.strip() + "\n" - explanation, code = clean_code_for_chat(result) - - self.log(f"==Explanation from the agent==\n{explanation}") - - if code is not None: - self.log(f"\n\n==Code generated by the agent==\n{code}") - if not return_code: - self.log("\n\n==Result==") - self.cached_tools = resolve_tools( - code, self.toolbox, remote=remote, cached_tools=self.cached_tools - ) - self.chat_state.update(kwargs) - return evaluate( - code, self.cached_tools, self.chat_state, chat_mode=True - ) - else: - tool_code = get_tool_creation_code(code, self.toolbox, remote=remote) - return f"{tool_code}\n{code}" - - def prepare_for_new_chat(self): - """ - Clears the history of prior calls to [`~Agent.chat`]. - """ - self.chat_history = None - self.chat_state = {} - self.cached_tools = None - - def run(self, task, *, return_code=False, remote=False, **kwargs): - """ - Sends a request to the agent. - - Args: - task (`str`): The task to perform - return_code (`bool`, *optional*, defaults to `False`): - Whether to just return code and not evaluate it. - remote (`bool`, *optional*, defaults to `False`): - Whether or not to use remote tools (inference endpoints) instead of local ones. - kwargs (additional keyword arguments, *optional*): - Any keyword argument to send to the agent when evaluating the code. - - Example: - - ```py - from transformers import HfAgent - - agent = HfAgent("https://api-inference.huggingface.co/models/bigcode/starcoder") - agent.run("Draw me a picture of rivers and lakes") - ``` - """ - prompt = self.format_prompt(task) - result = self.generate_one(prompt, stop=["Task:"]) - explanation, code = clean_code_for_run(result) - - self.log(f"==Explanation from the agent==\n{explanation}") - - self.log(f"\n\n==Code generated by the agent==\n{code}") - if not return_code: - self.log("\n\n==Result==") - self.cached_tools = resolve_tools( - code, self.toolbox, remote=remote, cached_tools=self.cached_tools - ) - return evaluate(code, self.cached_tools, state=kwargs.copy()) - else: - tool_code = get_tool_creation_code(code, self.toolbox, remote=remote) - return f"{tool_code}\n{code}" - - def generate_one(self, prompt, stop): - # This is the method to implement in your custom agent. - raise NotImplementedError - - def generate_many(self, prompts, stop): - # Override if you have a way to do batch generation faster than one by one - return [self.generate_one(prompt, stop) for prompt in prompts] - - -class HFAgent(Agent): - """ - Agent that uses the openai API to generate code. 
- - - - The openAI models are used in generation mode, so even for the `chat()` API, it's better to use models like - `"text-davinci-003"` over the chat-GPT variant. Proper support for chat-GPT models will come in a next version. - - - - Args: - model (`str`, *optional*, defaults to `"text-davinci-003"`): - The name of the OpenAI model to use. - api_key (`str`, *optional*): - The API key to use. If unset, will look for the environment variable `"OPENAI_API_KEY"`. - chat_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `chat` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `chat_prompt_template.txt` in this repo in this case. - run_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `run` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `run_prompt_template.txt` in this repo in this case. - additional_tools ([`Tool`], list of tools or dictionary with tool values, *optional*): - Any additional tools to include on top of the default ones. If you pass along a tool with the same name as - one of the default tools, that default tool will be overridden. - - Example: - - ```py - from swarms.agents.hf_agents import HFAgent - - agent = OpenAiAgent(model="text-davinci-003", api_key=xxx) - agent.run("Is the following `text` (in Spanish) positive or negative?", text="¡Este es un API muy agradable!") - ``` - """ - - def __init__( - self, - model="text-davinci-003", - api_key=None, - chat_prompt_template=None, - run_prompt_template=None, - additional_tools=None, - ): - if not is_openai_available(): - raise ImportError( - "Using `OpenAiAgent` requires `openai`: `pip install openai`." - ) - - if api_key is None: - api_key = os.environ.get("OPENAI_API_KEY", None) - if api_key is None: - raise ValueError( - "You need an openai key to use `OpenAIAgent`. You can get one here: Get" - " one here https://openai.com/api/`. If you have one, set it in your" - " env with `os.environ['OPENAI_API_KEY'] = xxx." - ) - else: - openai.api_key = api_key - self.model = model - super().__init__( - chat_prompt_template=chat_prompt_template, - run_prompt_template=run_prompt_template, - additional_tools=additional_tools, - ) - - def generate_many(self, prompts, stop): - if "gpt" in self.model: - return [self._chat_generate(prompt, stop) for prompt in prompts] - else: - return self._completion_generate(prompts, stop) - - def generate_one(self, prompt, stop): - if "gpt" in self.model: - return self._chat_generate(prompt, stop) - else: - return self._completion_generate([prompt], stop)[0] - - def _chat_generate(self, prompt, stop): - result = openai.ChatCompletion.create( - model=self.model, - messages=[{"role": "user", "content": prompt}], - temperature=0, - stop=stop, - ) - return result["choices"][0]["message"]["content"] - - def _completion_generate(self, prompts, stop): - result = openai.Completion.create( - model=self.model, - prompt=prompts, - temperature=0, - stop=stop, - max_tokens=200, - ) - return [answer["text"] for answer in result["choices"]] - - -class AzureOpenAI(Agent): - """ - Agent that uses Azure OpenAI to generate code. 
See the [official - documentation](https://learn.microsoft.com/en-us/azure/cognitive-services/openai/) to learn how to deploy an openAI - model on Azure - - - - The openAI models are used in generation mode, so even for the `chat()` API, it's better to use models like - `"text-davinci-003"` over the chat-GPT variant. Proper support for chat-GPT models will come in a next version. - - - - Args: - deployment_id (`str`): - The name of the deployed Azure openAI model to use. - api_key (`str`, *optional*): - The API key to use. If unset, will look for the environment variable `"AZURE_OPENAI_API_KEY"`. - resource_name (`str`, *optional*): - The name of your Azure OpenAI Resource. If unset, will look for the environment variable - `"AZURE_OPENAI_RESOURCE_NAME"`. - api_version (`str`, *optional*, default to `"2022-12-01"`): - The API version to use for this agent. - is_chat_mode (`bool`, *optional*): - Whether you are using a completion model or a chat model (see note above, chat models won't be as - efficient). Will default to `gpt` being in the `deployment_id` or not. - chat_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `chat` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `chat_prompt_template.txt` in this repo in this case. - run_prompt_template (`str`, *optional*): - Pass along your own prompt if you want to override the default template for the `run` method. Can be the - actual prompt template or a repo ID (on the Hugging Face Hub). The prompt should be in a file named - `run_prompt_template.txt` in this repo in this case. - additional_tools ([`Tool`], list of tools or dictionary with tool values, *optional*): - Any additional tools to include on top of the default ones. If you pass along a tool with the same name as - one of the default tools, that default tool will be overridden. - - Example: - - ```py - from transformers import AzureOpenAiAgent - - agent = AzureAiAgent(deployment_id="Davinci-003", api_key=xxx, resource_name=yyy) - agent.run("Is the following `text` (in Spanish) positive or negative?", text="¡Este es un API muy agradable!") - ``` - """ - - def __init__( - self, - deployment_id, - api_key=None, - resource_name=None, - api_version="2022-12-01", - is_chat_model=None, - chat_prompt_template=None, - run_prompt_template=None, - additional_tools=None, - ): - if not is_openai_available(): - raise ImportError( - "Using `OpenAiAgent` requires `openai`: `pip install openai`." - ) - - self.deployment_id = deployment_id - openai.api_type = "azure" - if api_key is None: - api_key = os.environ.get("AZURE_OPENAI_API_KEY", None) - if api_key is None: - raise ValueError( - "You need an Azure openAI key to use `AzureOpenAIAgent`. If you have" - " one, set it in your env with `os.environ['AZURE_OPENAI_API_KEY'] =" - " xxx." - ) - else: - openai.api_key = api_key - if resource_name is None: - resource_name = os.environ.get("AZURE_OPENAI_RESOURCE_NAME", None) - if resource_name is None: - raise ValueError( - "You need a resource_name to use `AzureOpenAIAgent`. If you have one," - " set it in your env with `os.environ['AZURE_OPENAI_RESOURCE_NAME'] =" - " xxx." 
- ) - else: - openai.api_base = f"https://{resource_name}.openai.azure.com" - openai.api_version = api_version - - if is_chat_model is None: - is_chat_model = "gpt" in deployment_id.lower() - self.is_chat_model = is_chat_model - - super().__init__( - chat_prompt_template=chat_prompt_template, - run_prompt_template=run_prompt_template, - additional_tools=additional_tools, - ) - - def generate_many(self, prompts, stop): - if self.is_chat_model: - return [self._chat_generate(prompt, stop) for prompt in prompts] - else: - return self._completion_generate(prompts, stop) - - def generate_one(self, prompt, stop): - if self.is_chat_model: - return self._chat_generate(prompt, stop) - else: - return self._completion_generate([prompt], stop)[0] - - def _chat_generate(self, prompt, stop): - result = openai.ChatCompletion.create( - engine=self.deployment_id, - messages=[{"role": "user", "content": prompt}], - temperature=0, - stop=stop, - ) - return result["choices"][0]["message"]["content"] - - def _completion_generate(self, prompts, stop): - result = openai.Completion.create( - engine=self.deployment_id, - prompt=prompts, - temperature=0, - stop=stop, - max_tokens=200, - ) - return [answer["text"] for answer in result["choices"]] diff --git a/swarms/agents/idea_to_image_agent.py b/swarms/agents/idea_to_image_agent.py deleted file mode 100644 index ce3654e05..000000000 --- a/swarms/agents/idea_to_image_agent.py +++ /dev/null @@ -1,110 +0,0 @@ -import os -import logging -from dataclasses import dataclass -from swarms.models.dalle3 import Dalle -from swarms.models import OpenAIChat - - -@dataclass -class Idea2Image: - """ - A class used to generate images from text prompts using DALLE-3. - - ... - - Attributes - ---------- - image : str - Text prompt for the image to generate - openai_api_key : str - OpenAI API key - cookie : str - Cookie value for DALLE-3 - output_folder : str - Folder to save the generated images - - Methods - ------- - llm_prompt(): - Returns a prompt for refining the image generation - generate_image(): - Generates and downloads the image based on the prompt - - - Usage: - ------ - from dalle3 import Idea2Image - - idea2image = Idea2Image( - image="Fish hivemind swarm in light blue avatar anime in zen garden pond concept art anime art, happy fish, anime scenery" - ) - idea2image.run() - """ - - image: str - openai_api_key: str = os.getenv("OPENAI_API_KEY") or None - cookie: str = os.getenv("BING_COOKIE") or None - output_folder: str = "images/" - - def __post_init__(self): - self.llm = OpenAIChat(openai_api_key=self.openai_api_key) - self.dalle = Dalle(self.cookie) - - def llm_prompt(self): - LLM_PROMPT = f""" - Refine the USER prompt to create a more precise image tailored to the user's needs using - an image generator like DALLE-3. - - ###### FOLLOW THE GUIDE BELOW TO REFINE THE PROMPT ###### - - - Use natural language prompts up to 400 characters to describe the image you want to generate. Be as specific or vague as needed. - - - Frame your photographic prompts like camera position, lighting, film type, year, usage context. This implicitly suggests image qualities. - - - For illustrations, you can borrow photographic terms like "close up" and prompt for media, style, artist, animation style, etc. - - - Prompt hack: name a film/TV show genre + year to "steal the look" for costumes, lighting, etc without knowing technical details. - - - Try variations of a prompt, make edits, and do recursive uncropping to create interesting journeys and zoom-out effects. 
- - - Use an image editor like Photopea to uncrop DALL-E outputs and prompt again to extend the image. - - - Combine separate DALL-E outputs into panoramas and murals with careful positioning/editing. - - - Browse communities like Reddit r/dalle2 to get inspired and share your creations. See tools, free image resources, articles. - - - Focus prompts on size, structure, shape, mood, aesthetics to influence the overall vibe and composition. - - - Be more vague or detailed as needed - DALL-E has studied over 400M images and can riff creatively or replicate specific styles. - - - Be descriptive, describe the art style at the end like fusing concept art with anime art or game art or product design art. - - ###### END OF GUIDE ###### - - Prompt to refine: {self.image} - """ - return LLM_PROMPT - - def run(self): - """ - Generates and downloads the image based on the prompt. - - This method refines the prompt using the llm, opens the website with the query, - gets the image URLs, and downloads the images to the specified folder. - """ - # Set up logging - logging.basicConfig(level=logging.INFO) - - # Refine the prompt using the llm - image = self.llm_prompt() - refined_prompt = self.llm(image) - print(f"Refined prompt: {refined_prompt}") - - # Open the website with your query - self.dalle.create(refined_prompt) - - # Get the image URLs - urls = self.dalle.get_urls() - - # Download the images to your specified folder - self.dalle.download(urls, self.output_folder) diff --git a/swarms/agents/meta_prompter.py b/swarms/agents/meta_prompter.py deleted file mode 100644 index aeee98788..000000000 --- a/swarms/agents/meta_prompter.py +++ /dev/null @@ -1,158 +0,0 @@ -from langchain.chains import LLMChain -from langchain.prompts import PromptTemplate -from langchain.memory import ConversationBufferWindowMemory - - -class MetaPrompterAgent: - """ - Meta Prompting Agent - The Meta Prompting Agent has 1 purpose: to create better prompts for an agent. - - The meta prompting agent would be used in this flow: - user task -> MetaPrompterAgent -> Agent - - Args: - llm (BaseLanguageModel): Language Model - max_iters (int, optional): Maximum number of iterations. Defaults to 3. - max_meta_iters (int, optional): Maximum number of meta iterations. Defaults to 5. - failed_phrase (str, optional): Phrase to indicate failure. Defaults to "task failed". - success_phrase (str, optional): Phrase to indicate success. Defaults to "task succeeded". - instructions (str, optional): Instructions to be used in the meta prompt. Defaults to "None". - template (str, optional): Template to be used in the meta prompt. Defaults to None. - memory (ConversationBufferWindowMemory, optional): Memory to be used in the meta prompt. Defaults to None. - meta_template (str, optional): Template to be used in the meta prompt. Defaults to None. - human_input (bool, optional): Whether to use human input. Defaults to False. 
- - Returns: - str: Response from the agent - - Usage: - -------------- - from swarms.workers import Worker - from swarms.agents.meta_prompter import MetaPrompterAgent - from langchain.llms import OpenAI - - #init llm - llm = OpenAI() - - #init the meta prompter agent that optimized prompts - meta_optimizer = MetaPrompterAgent(llm=llm) - - #init the worker agent - worker = Worker(llm) - - #broad task to complete - task = "Create a feedforward in pytorch" - - #optimize the prompt - optimized_prompt = meta_optimizer.run(task) - - #run the optimized prompt with detailed instructions - result = worker.run(optimized_prompt) - - print(result) - """ - - def __init__( - self, - llm, - max_iters: int = 3, - max_meta_iters: int = 5, - failed_phrase: str = "task failed", - success_phrase: str = "task succeeded", - instructions: str = "None", - template: str = None, - memory=None, - meta_template: str = None, - human_input: bool = False, - ): - self.llm = llm - self.max_iters = max_iters - self.max_meta_iters = max_meta_iters - self.failed_phrase = failed_phrase - self.success_phrase = success_phrase - self.instructions = instructions - self.template = template - self.memory = memory - self.meta_template = meta_template - self.human_input = human_input - - if memory is None: - memory = ConversationBufferWindowMemory() - memory.ai_prefix = "Assistant:" - - template = f""" - Instructions: {self.instructions} - {{{memory.memory_key}}} - Human: {{human_input}} - Assistant: - """ - - prompt = PromptTemplate( - input_variables=["history", "human_input"], template=template - ) - - self.chain = LLMChain( - llm=self.llm(), - prompt=prompt, - verbose=True, - memory=ConversationBufferWindowMemory(), - ) - - def get_chat_history(self, chain_memory): - """Get Chat History from the memory""" - memory_key = chain_memory.memory_key - chat_history = chain_memory.load_memory_variables(memory_key)[memory_key] - return chat_history - - def get_new_instructions(self, meta_output): - """Get New Instructions from the meta_output""" - delimiter = "Instructions: " - new_instructions = meta_output[meta_output.find(delimiter) + len(delimiter) :] - return new_instructions - - def run(self, task: str): - """ - Run the MetaPrompterAgent - - Args: - task (str): The task to be completed - - Returns: - str: The response from the agent - """ - key_phrases = [self.success_phrase, self.failed_phrase] - - for i in range(self.max_meta_iters): - print(f"[Epsisode: {i+1}/{self.max_meta_iters}]") - - chain = self.chain(memory=None) - - output = chain.predict(human_input=task) - - for j in range(self.max_iters): - print(f"(Step {j+1}/{self.max_iters})") - print(f"Assistant: {output}") - print("Human: ") - - if self.human_input: - human_input = input() - - if any(phrase in human_input.lower() for phrase in key_phrases): - break - - output = chain.predict(human_input.lower) - - if self.success_phrase in human_input.lower(): - print("You succeed! 
Thanks for using!") - return - - meta_chain = self.initialize_meta_chain() - meta_output = meta_chain.predict( - chat_history=self.get_chat_history(chain.memory) - ) - print(f"Feedback: {meta_output}") - - self.instructions = self.get_new_instructions(meta_output) - print(f"New Instruction: {self.instructions}") - print("\n" + "#" * 80 + "\n") diff --git a/swarms/agents/multi_modal_visual_agent.py b/swarms/agents/multi_modal_visual_agent.py deleted file mode 100644 index 347805945..000000000 --- a/swarms/agents/multi_modal_visual_agent.py +++ /dev/null @@ -1,2189 +0,0 @@ -import inspect -import math -import os -import random -import re -import uuid - -import cv2 - -# Grounding DINO -import groundingdino.datasets.transforms as T -import matplotlib.pyplot as plt -import numpy as np -import torch -import wget -from controlnet_aux import HEDdetector, MLSDdetector, OpenposeDetector -from diffusers import ( - ControlNetModel, - EulerAncestralDiscreteScheduler, - StableDiffusionControlNetPipeline, - StableDiffusionInpaintPipeline, - StableDiffusionInstructPix2PixPipeline, - StableDiffusionPipeline, - UniPCMultistepScheduler, -) -from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker -from groundingdino.models import build_model -from groundingdino.util.slconfig import SLConfig -from groundingdino.util.utils import clean_state_dict, get_phrases_from_posmap -from langchain.agents.initialize import initialize_agent -from langchain.agents.tools import Tool -from langchain.chains.conversation.memory import ConversationBufferMemory -from langchain.llms.openai import OpenAI -from PIL import Image, ImageDraw, ImageFont, ImageOps - -# segment anything -from segment_anything import SamAutomaticMaskGenerator, SamPredictor, build_sam -from transformers import ( - BlipForConditionalGeneration, - BlipForQuestionAnswering, - BlipProcessor, - pipeline, -) - -from swarms.agents.message import Message - -# prompts -VISUAL_AGENT_PREFIX = """ -Worker Multi-Modal Agent is designed to be able to assist with -a wide range of text and visual related tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics. -Worker Multi-Modal Agent is able to generate human-like text based on the input it receives, allowing it to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand. - -Worker Multi-Modal Agent is able to process and understand large amounts of text and images. As a language model, Worker Multi-Modal Agent can not directly read images, but it has a list of tools to finish different visual tasks. Each image will have a file name formed as "image/xxx.png", and Worker Multi-Modal Agent can invoke different tools to indirectly understand pictures. When talking about images, Worker Multi-Modal Agent is very strict to the file name and will never fabricate nonexistent files. When using tools to generate new image files, Worker Multi-Modal Agent is also known that the image may not be the same as the user's demand, and will use other visual question answering tools or description tools to observe the real image. Worker Multi-Modal Agent is able to use tools in a sequence, and is loyal to the tool observation outputs rather than faking the image content and image file name. It will remember to provide the file name from the last tool observation, if a new image is generated. - -Human may provide new figures to Worker Multi-Modal Agent with a description. 
The description helps Worker Multi-Modal Agent to understand this image, but Worker Multi-Modal Agent should use tools to finish following tasks, rather than directly imagine from the description. - -Overall, Worker Multi-Modal Agent is a powerful visual dialogue assistant tool that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. - - -TOOLS: ------- - -Worker Multi-Modal Agent has access to the following tools:""" - -VISUAL_AGENT_FORMAT_INSTRUCTIONS = """To use a tool, please use the following format: - -``` -Thought: Do I need to use a tool? Yes -Action: the action to take, should be one of [{tool_names}] -Action Input: the input to the action -Observation: the result of the action -``` - -When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format: - -``` -Thought: Do I need to use a tool? No -{ai_prefix}: [your response here] -``` -""" - -VISUAL_AGENT_SUFFIX = """You are very strict to the filename correctness and will never fake a file name if it does not exist. -You will remember to provide the image file name loyally if it's provided in the last tool observation. - -Begin! - -Previous conversation history: -{chat_history} - -New input: {input} -Since Worker Multi-Modal Agent is a text language model, Worker Multi-Modal Agent must use tools to observe images rather than imagination. -The thoughts and observations are only visible for Worker Multi-Modal Agent, Worker Multi-Modal Agent should remember to repeat important information in the final response for Human. -Thought: Do I need to use a tool? {agent_scratchpad} Let's think step by step. -""" - -VISUAL_AGENT_PREFIX_CN = """Worker Multi-Modal Agent 旨在能够协助完成范围广泛的文本和视觉相关任务,从回答简单的问题到提供对广泛主题的深入解释和讨论。 Worker Multi-Modal Agent 能够根据收到的输入生成类似人类的文本,使其能够进行听起来自然的对话,并提供连贯且与手头主题相关的响应。 - -Worker Multi-Modal Agent 能够处理和理解大量文本和图像。作为一种语言模型,Worker Multi-Modal Agent 不能直接读取图像,但它有一系列工具来完成不同的视觉任务。每张图片都会有一个文件名,格式为“image/xxx.png”,Worker Multi-Modal Agent可以调用不同的工具来间接理解图片。在谈论图片时,Worker Multi-Modal Agent 对文件名的要求非常严格,绝不会伪造不存在的文件。在使用工具生成新的图像文件时,Worker Multi-Modal Agent也知道图像可能与用户需求不一样,会使用其他视觉问答工具或描述工具来观察真实图像。 Worker Multi-Modal Agent 能够按顺序使用工具,并且忠于工具观察输出,而不是伪造图像内容和图像文件名。如果生成新图像,它将记得提供上次工具观察的文件名。 - -Human 可能会向 Worker Multi-Modal Agent 提供带有描述的新图形。描述帮助 Worker Multi-Modal Agent 理解这个图像,但 Worker Multi-Modal Agent 应该使用工具来完成以下任务,而不是直接从描述中想象。有些工具将会返回英文描述,但你对用户的聊天应当采用中文。 - -总的来说,Worker Multi-Modal Agent 是一个强大的可视化对话辅助工具,可以帮助处理范围广泛的任务,并提供关于范围广泛的主题的有价值的见解和信息。 - -工具列表: ------- - -Worker Multi-Modal Agent 可以使用这些工具:""" - -VISUAL_AGENT_FORMAT_INSTRUCTIONS_CN = """用户使用中文和你进行聊天,但是工具的参数应当使用英文。如果要调用工具,你必须遵循如下格式: - -``` -Thought: Do I need to use a tool? Yes -Action: the action to take, should be one of [{tool_names}] -Action Input: the input to the action -Observation: the result of the action -``` - -当你不再需要继续调用工具,而是对观察结果进行总结回复时,你必须使用如下格式: - - -``` -Thought: Do I need to use a tool? No -{ai_prefix}: [your response here] -``` -""" - -VISUAL_AGENT_SUFFIX_CN = """你对文件名的正确性非常严格,而且永远不会伪造不存在的文件。 - -开始! - -因为Worker Multi-Modal Agent是一个文本语言模型,必须使用工具去观察图片而不是依靠想象。 -推理想法和观察结果只对Worker Multi-Modal Agent可见,需要记得在最终回复时把重要的信息重复给用户,你只能给用户返回中文句子。我们一步一步思考。在你使用工具时,工具的参数只能是英文。 - -聊天历史: -{chat_history} - -新输入: {input} -Thought: Do I need to use a tool? 
{agent_scratchpad} -""" - -os.makedirs("image", exist_ok=True) - - -def seed_everything(seed): - random.seed(seed) - np.random.seed(seed) - torch.manual_seed(seed) - torch.cuda.manual_seed_all(seed) - return seed - - -def prompts(name, description): - def decorator(func): - func.name = name - func.description = description - return func - - return decorator - - -def blend_gt2pt(old_image, new_image, sigma=0.15, steps=100): - new_size = new_image.size - old_size = old_image.size - easy_img = np.array(new_image) - gt_img_array = np.array(old_image) - pos_w = (new_size[0] - old_size[0]) // 2 - pos_h = (new_size[1] - old_size[1]) // 2 - - kernel_h = cv2.getGaussianKernel(old_size[1], old_size[1] * sigma) - kernel_w = cv2.getGaussianKernel(old_size[0], old_size[0] * sigma) - kernel = np.multiply(kernel_h, np.transpose(kernel_w)) - - kernel[steps:-steps, steps:-steps] = 1 - kernel[:steps, :steps] = kernel[:steps, :steps] / kernel[steps - 1, steps - 1] - kernel[:steps, -steps:] = kernel[:steps, -steps:] / kernel[steps - 1, -(steps)] - kernel[-steps:, :steps] = kernel[-steps:, :steps] / kernel[-steps, steps - 1] - kernel[-steps:, -steps:] = kernel[-steps:, -steps:] / kernel[-steps, -steps] - kernel = np.expand_dims(kernel, 2) - kernel = np.repeat(kernel, 3, 2) - - weight = np.linspace(0, 1, steps) - top = np.expand_dims(weight, 1) - top = np.repeat(top, old_size[0] - 2 * steps, 1) - top = np.expand_dims(top, 2) - top = np.repeat(top, 3, 2) - - weight = np.linspace(1, 0, steps) - down = np.expand_dims(weight, 1) - down = np.repeat(down, old_size[0] - 2 * steps, 1) - down = np.expand_dims(down, 2) - down = np.repeat(down, 3, 2) - - weight = np.linspace(0, 1, steps) - left = np.expand_dims(weight, 0) - left = np.repeat(left, old_size[1] - 2 * steps, 0) - left = np.expand_dims(left, 2) - left = np.repeat(left, 3, 2) - - weight = np.linspace(1, 0, steps) - right = np.expand_dims(weight, 0) - right = np.repeat(right, old_size[1] - 2 * steps, 0) - right = np.expand_dims(right, 2) - right = np.repeat(right, 3, 2) - - kernel[:steps, steps:-steps] = top - kernel[-steps:, steps:-steps] = down - kernel[steps:-steps, :steps] = left - kernel[steps:-steps, -steps:] = right - - pt_gt_img = easy_img[pos_h : pos_h + old_size[1], pos_w : pos_w + old_size[0]] - gaussian_gt_img = ( - kernel * gt_img_array + (1 - kernel) * pt_gt_img - ) # gt img with blur img - gaussian_gt_img = gaussian_gt_img.astype(np.int64) - easy_img[pos_h : pos_h + old_size[1], pos_w : pos_w + old_size[0]] = gaussian_gt_img - gaussian_img = Image.fromarray(easy_img) - return gaussian_img - - -def cut_dialogue_history(history_memory, keep_last_n_words=500): - if history_memory is None or len(history_memory) == 0: - return history_memory - tokens = history_memory.split() - n_tokens = len(tokens) - print(f"history_memory:{history_memory}, n_tokens: {n_tokens}") - if n_tokens < keep_last_n_words: - return history_memory - paragraphs = history_memory.split("\n") - last_n_tokens = n_tokens - while last_n_tokens >= keep_last_n_words: - last_n_tokens -= len(paragraphs[0].split(" ")) - paragraphs = paragraphs[1:] - return "\n" + "\n".join(paragraphs) - - -def get_new_image_name(org_img_name, func_name="update"): - head_tail = os.path.split(org_img_name) - head = head_tail[0] - tail = head_tail[1] - name_split = tail.split(".")[0].split("_") - this_new_uuid = str(uuid.uuid4())[:4] - if len(name_split) == 1: - most_org_file_name = name_split[0] - else: - assert len(name_split) == 4 - most_org_file_name = name_split[3] - recent_prev_file_name = name_split[0] 
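        # Hedged note on the naming convention assembled just below (file names are
        # illustrative, not taken from the repository): a first edit of "image/cat.png"
        # with func_name="pix2pix" could yield "image/1a2b_pix2pix_cat_cat.png", and a
        # further edit of that result with func_name="edge" could yield
        # "image/3c4d_edge_1a2b_cat.png". The last segment always keeps the stem of the
        # very first image and the third segment keeps the short UUID of the most recent
        # predecessor, so provenance survives repeated tool calls.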
- new_file_name = ( - f"{this_new_uuid}_{func_name}_{recent_prev_file_name}_{most_org_file_name}.png" - ) - return os.path.join(head, new_file_name) - - -class InstructPix2Pix: - def __init__(self, device): - print(f"Initializing InstructPix2Pix to {device}") - self.device = device - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - - self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained( - "timbrooks/instruct-pix2pix", - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ).to(device) - self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config( - self.pipe.scheduler.config - ) - - @prompts( - name="Instruct Image Using Text", - description=( - "useful when you want to the style of the image to be like the text. " - "like: make it look like a painting. or make it like a robot. " - "The input to this tool should be a comma separated string of two, " - "representing the image_path and the text. " - ), - ) - def inference(self, inputs): - """Change style of image.""" - print("===>Starting InstructPix2Pix Inference") - image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:]) - original_image = Image.open(image_path) - image = self.pipe( - text, image=original_image, num_inference_steps=40, image_guidance_scale=1.2 - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="pix2pix") - image.save(updated_image_path) - print( - f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct Text:" - f" {text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Text2Image: - def __init__(self, device): - print(f"Initializing Text2Image to {device}") - self.device = device - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.pipe = StableDiffusionPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype - ) - self.pipe.to(device) - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image From User Input Text", - description=( - "useful when you want to generate an image from a user input text and save" - " it to a file. like: generate an image of an object or something, or" - " generate an image that includes some objects. The input to this tool" - " should be a string, representing the text used to generate image. " - ), - ) - def inference(self, text): - image_filename = os.path.join("image", f"{str(uuid.uuid4())[:8]}.png") - prompt = text + ", " + self.a_prompt - image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0] - image.save(image_filename) - print( - f"\nProcessed Text2Image, Input Text: {text}, Output Image:" - f" {image_filename}" - ) - return image_filename - - -class ImageCaptioning: - def __init__(self, device): - print(f"Initializing ImageCaptioning to {device}") - self.device = device - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.processor = BlipProcessor.from_pretrained( - "Salesforce/blip-image-captioning-base" - ) - self.model = BlipForConditionalGeneration.from_pretrained( - "Salesforce/blip-image-captioning-base", torch_dtype=self.torch_dtype - ).to(self.device) - - @prompts( - name="Get Photo Description", - description=( - "useful when you want to know what is inside the photo. 
receives image_path" - " as input. The input to this tool should be a string, representing the" - " image_path. " - ), - ) - def inference(self, image_path): - inputs = self.processor(Image.open(image_path), return_tensors="pt").to( - self.device, self.torch_dtype - ) - out = self.model.generate(**inputs) - captions = self.processor.decode(out[0], skip_special_tokens=True) - print( - f"\nProcessed ImageCaptioning, Input Image: {image_path}, Output Text:" - f" {captions}" - ) - return captions - - -class Image2Canny: - def __init__(self, device): - print("Initializing Image2Canny") - self.low_threshold = 100 - self.high_threshold = 200 - - @prompts( - name="Edge Detection On Image", - description=( - "useful when you want to detect the edge of the image. like: detect the" - " edges of this image, or canny detection on image, or perform edge" - " detection on this image, or detect the canny image of this image. The" - " input to this tool should be a string, representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - image = np.array(image) - canny = cv2.Canny(image, self.low_threshold, self.high_threshold) - canny = canny[:, :, None] - canny = np.concatenate([canny, canny, canny], axis=2) - canny = Image.fromarray(canny) - updated_image_path = get_new_image_name(inputs, func_name="edge") - canny.save(updated_image_path) - print( - f"\nProcessed Image2Canny, Input Image: {inputs}, Output Text:" - f" {updated_image_path}" - ) - return updated_image_path - - -class CannyText2Image: - def __init__(self, device): - print(f"Initializing CannyText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-canny", - torch_dtype=self.torch_dtype, - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Canny Image", - description=( - "useful when you want to generate a new real image from both the user" - " description and a canny image. like: generate a real image of a object or" - " something from this canny image, or generate a new real image of a object" - " or something from this edge image. The input to this tool should be a" - " comma separated string of two, representing the image_path and the user" - " description. 
" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="canny2image") - image.save(updated_image_path) - print( - f"\nProcessed CannyText2Image, Input Canny: {image_path}, Input Text:" - f" {instruct_text}, Output Text: {updated_image_path}" - ) - return updated_image_path - - -class Image2Line: - def __init__(self, device): - print("Initializing Image2Line") - self.detector = MLSDdetector.from_pretrained("lllyasviel/ControlNet") - - @prompts( - name="Line Detection On Image", - description=( - "useful when you want to detect the straight line of the image. like:" - " detect the straight lines of this image, or straight line detection on" - " image, or perform straight line detection on this image, or detect the" - " straight line image of this image. The input to this tool should be a" - " string, representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - mlsd = self.detector(image) - updated_image_path = get_new_image_name(inputs, func_name="line-of") - mlsd.save(updated_image_path) - print( - f"\nProcessed Image2Line, Input Image: {inputs}, Output Line:" - f" {updated_image_path}" - ) - return updated_image_path - - -class LineText2Image: - def __init__(self, device): - print(f"Initializing LineText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-mlsd", torch_dtype=self.torch_dtype - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Line Image", - description=( - "useful when you want to generate a new real image from both the user" - " description and a straight line image. like: generate a real image of a" - " object or something from this straight line image, or generate a new real" - " image of a object or something from this straight lines. The input to" - " this tool should be a comma separated string of two, representing the" - " image_path and the user description. 
" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="line2image") - image.save(updated_image_path) - print( - f"\nProcessed LineText2Image, Input Line: {image_path}, Input Text:" - f" {instruct_text}, Output Text: {updated_image_path}" - ) - return updated_image_path - - -class Image2Hed: - def __init__(self, device): - print("Initializing Image2Hed") - self.detector = HEDdetector.from_pretrained("lllyasviel/ControlNet") - - @prompts( - name="Hed Detection On Image", - description=( - "useful when you want to detect the soft hed boundary of the image. like:" - " detect the soft hed boundary of this image, or hed boundary detection on" - " image, or perform hed boundary detection on this image, or detect soft" - " hed boundary image of this image. The input to this tool should be a" - " string, representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - hed = self.detector(image) - updated_image_path = get_new_image_name(inputs, func_name="hed-boundary") - hed.save(updated_image_path) - print( - f"\nProcessed Image2Hed, Input Image: {inputs}, Output Hed:" - f" {updated_image_path}" - ) - return updated_image_path - - -class HedText2Image: - def __init__(self, device): - print(f"Initializing HedText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-hed", torch_dtype=self.torch_dtype - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Soft Hed Boundary Image", - description=( - "useful when you want to generate a new real image from both the user" - " description and a soft hed boundary image. like: generate a real image of" - " a object or something from this soft hed boundary image, or generate a" - " new real image of a object or something from this hed boundary. 
The input" - " to this tool should be a comma separated string of two, representing the" - " image_path and the user description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="hed2image") - image.save(updated_image_path) - print( - f"\nProcessed HedText2Image, Input Hed: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Image2Scribble: - def __init__(self, device): - print("Initializing Image2Scribble") - self.detector = HEDdetector.from_pretrained("lllyasviel/ControlNet") - - @prompts( - name="Sketch Detection On Image", - description=( - "useful when you want to generate a scribble of the image. like: generate a" - " scribble of this image, or generate a sketch from this image, detect the" - " sketch from this image. The input to this tool should be a string," - " representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - scribble = self.detector(image, scribble=True) - updated_image_path = get_new_image_name(inputs, func_name="scribble") - scribble.save(updated_image_path) - print( - f"\nProcessed Image2Scribble, Input Image: {inputs}, Output Scribble:" - f" {updated_image_path}" - ) - return updated_image_path - - -class ScribbleText2Image: - def __init__(self, device): - print(f"Initializing ScribbleText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-scribble", - torch_dtype=self.torch_dtype, - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Sketch Image", - description=( - "useful when you want to generate a new real image from both the user" - " description and a scribble image or a sketch image. 
The input to this" - " tool should be a comma separated string of two, representing the" - " image_path and the user description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="scribble2image") - image.save(updated_image_path) - print( - f"\nProcessed ScribbleText2Image, Input Scribble: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Image2Pose: - def __init__(self, device): - print("Initializing Image2Pose") - self.detector = OpenposeDetector.from_pretrained("lllyasviel/ControlNet") - - @prompts( - name="Pose Detection On Image", - description=( - "useful when you want to detect the human pose of the image. like: generate" - " human poses of this image, or generate a pose image from this image. The" - " input to this tool should be a string, representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - pose = self.detector(image) - updated_image_path = get_new_image_name(inputs, func_name="human-pose") - pose.save(updated_image_path) - print( - f"\nProcessed Image2Pose, Input Image: {inputs}, Output Pose:" - f" {updated_image_path}" - ) - return updated_image_path - - -class PoseText2Image: - def __init__(self, device): - print(f"Initializing PoseText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-openpose", - torch_dtype=self.torch_dtype, - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.num_inference_steps = 20 - self.seed = -1 - self.unconditional_guidance_scale = 9.0 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit," - " fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Pose Image", - description=( - "useful when you want to generate a new real image from both the user" - " description and a human pose image. like: generate a real image of a" - " human from this human pose image, or generate a new real image of a human" - " from this pose. 
The input to this tool should be a comma separated string" - " of two, representing the image_path and the user description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="pose2image") - image.save(updated_image_path) - print( - f"\nProcessed PoseText2Image, Input Pose: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class SegText2Image: - def __init__(self, device): - print(f"Initializing SegText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-seg", torch_dtype=self.torch_dtype - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit," - " fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Segmentations", - description=( - "useful when you want to generate a new real image from both the user" - " description and segmentations. like: generate a real image of a object or" - " something from this segmentation image, or generate a new real image of a" - " object or something from these segmentations. The input to this tool" - " should be a comma separated string of two, representing the image_path" - " and the user description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="segment2image") - image.save(updated_image_path) - print( - f"\nProcessed SegText2Image, Input Seg: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Image2Depth: - def __init__(self, device): - print("Initializing Image2Depth") - self.depth_estimator = pipeline("depth-estimation") - - @prompts( - name="Predict Depth On Image", - description=( - "useful when you want to detect depth of the image. like: generate the" - " depth from this image, or detect the depth map on this image, or predict" - " the depth for this image. 
The input to this tool should be a string," - " representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - depth = self.depth_estimator(image)["depth"] - depth = np.array(depth) - depth = depth[:, :, None] - depth = np.concatenate([depth, depth, depth], axis=2) - depth = Image.fromarray(depth) - updated_image_path = get_new_image_name(inputs, func_name="depth") - depth.save(updated_image_path) - print( - f"\nProcessed Image2Depth, Input Image: {inputs}, Output Depth:" - f" {updated_image_path}" - ) - return updated_image_path - - -class DepthText2Image: - def __init__(self, device): - print(f"Initializing DepthText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-depth", - torch_dtype=self.torch_dtype, - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit," - " fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Depth", - description=( - "useful when you want to generate a new real image from both the user" - " description and depth image. like: generate a real image of a object or" - " something from this depth image, or generate a new real image of a object" - " or something from the depth map. The input to this tool should be a comma" - " separated string of two, representing the image_path and the user" - " description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="depth2image") - image.save(updated_image_path) - print( - f"\nProcessed DepthText2Image, Input Depth: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Image2Normal: - def __init__(self, device): - print("Initializing Image2Normal") - self.depth_estimator = pipeline( - "depth-estimation", model="Intel/dpt-hybrid-midas" - ) - self.bg_threhold = 0.4 - - @prompts( - name="Predict Normal Map On Image", - description=( - "useful when you want to detect norm map of the image. like: generate" - " normal map from this image, or predict normal map of this image. 
The" - " input to this tool should be a string, representing the image_path" - ), - ) - def inference(self, inputs): - image = Image.open(inputs) - original_size = image.size - image = self.depth_estimator(image)["predicted_depth"][0] - image = image.numpy() - image_depth = image.copy() - image_depth -= np.min(image_depth) - image_depth /= np.max(image_depth) - x = cv2.Sobel(image, cv2.CV_32F, 1, 0, ksize=3) - x[image_depth < self.bg_threhold] = 0 - y = cv2.Sobel(image, cv2.CV_32F, 0, 1, ksize=3) - y[image_depth < self.bg_threhold] = 0 - z = np.ones_like(x) * np.pi * 2.0 - image = np.stack([x, y, z], axis=2) - image /= np.sum(image**2.0, axis=2, keepdims=True) ** 0.5 - image = (image * 127.5 + 127.5).clip(0, 255).astype(np.uint8) - image = Image.fromarray(image) - image = image.resize(original_size) - updated_image_path = get_new_image_name(inputs, func_name="normal-map") - image.save(updated_image_path) - print( - f"\nProcessed Image2Normal, Input Image: {inputs}, Output Depth:" - f" {updated_image_path}" - ) - return updated_image_path - - -class NormalText2Image: - def __init__(self, device): - print(f"Initializing NormalText2Image to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.controlnet = ControlNetModel.from_pretrained( - "fusing/stable-diffusion-v1-5-controlnet-normal", - torch_dtype=self.torch_dtype, - ) - self.pipe = StableDiffusionControlNetPipeline.from_pretrained( - "runwayml/stable-diffusion-v1-5", - controlnet=self.controlnet, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - torch_dtype=self.torch_dtype, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config( - self.pipe.scheduler.config - ) - self.pipe.to(device) - self.seed = -1 - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit," - " fewer digits, cropped, worst quality, low quality" - ) - - @prompts( - name="Generate Image Condition On Normal Map", - description=( - "useful when you want to generate a new real image from both the user" - " description and normal map. like: generate a real image of a object or" - " something from this normal map, or generate a new real image of a object" - " or something from the normal map. 
The input to this tool should be a" - " comma separated string of two, representing the image_path and the user" - " description" - ), - ) - def inference(self, inputs): - image_path, instruct_text = inputs.split(",")[0], ",".join( - inputs.split(",")[1:] - ) - image = Image.open(image_path) - self.seed = random.randint(0, 65535) - seed_everything(self.seed) - prompt = f"{instruct_text}, {self.a_prompt}" - image = self.pipe( - prompt, - image, - num_inference_steps=20, - eta=0.0, - negative_prompt=self.n_prompt, - guidance_scale=9.0, - ).images[0] - updated_image_path = get_new_image_name(image_path, func_name="normal2image") - image.save(updated_image_path) - print( - f"\nProcessed NormalText2Image, Input Normal: {image_path}, Input Text:" - f" {instruct_text}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class VisualQuestionAnswering: - def __init__(self, device): - print(f"Initializing VisualQuestionAnswering to {device}") - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.device = device - self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base") - self.model = BlipForQuestionAnswering.from_pretrained( - "Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype - ).to(self.device) - - @prompts( - name="Answer Question About The Image", - description=( - "useful when you need an answer for a question based on an image. like:" - " what is the background color of the last image, how many cats in this" - " figure, what is in this figure. The input to this tool should be a comma" - " separated string of two, representing the image_path and the question" - ), - ) - def inference(self, inputs): - image_path, question = inputs.split(",")[0], ",".join(inputs.split(",")[1:]) - raw_image = Image.open(image_path).convert("RGB") - inputs = self.processor(raw_image, question, return_tensors="pt").to( - self.device, self.torch_dtype - ) - out = self.model.generate(**inputs) - answer = self.processor.decode(out[0], skip_special_tokens=True) - print( - f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input" - f" Question: {question}, Output Answer: {answer}" - ) - return answer - - -class Segmenting: - def __init__(self, device): - print(f"Inintializing Segmentation to {device}") - self.device = device - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.model_checkpoint_path = os.path.join("checkpoints", "sam") - - self.download_parameters() - self.sam = build_sam(checkpoint=self.model_checkpoint_path).to(device) - self.sam_predictor = SamPredictor(self.sam) - self.mask_generator = SamAutomaticMaskGenerator(self.sam) - - self.saved_points = [] - self.saved_labels = [] - - def download_parameters(self): - url = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth" - if not os.path.exists(self.model_checkpoint_path): - wget.download(url, out=self.model_checkpoint_path) - - def show_mask( - self, - mask: np.ndarray, - image: np.ndarray, - random_color: bool = False, - transparency=1, - ) -> np.ndarray: - """Visualize a mask on top of an image. - Args: - mask (np.ndarray): A 2D array of shape (H, W). - image (np.ndarray): A 3D array of shape (H, W, 3). - random_color (bool): Whether to use a random color for the mask. - Outputs: - np.ndarray: A 3D array of shape (H, W, 3) with the mask - visualized on top of the image. 
- transparenccy: the transparency of the segmentation mask - """ - - if random_color: - color = np.concatenate([np.random.random(3)], axis=0) - else: - color = np.array([30 / 255, 144 / 255, 255 / 255]) - h, w = mask.shape[-2:] - mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1) * 255 - - image = cv2.addWeighted(image, 0.7, mask_image.astype("uint8"), transparency, 0) - - return image - - def show_box(self, box, ax, label): - x0, y0 = box[0], box[1] - w, h = box[2] - box[0], box[3] - box[1] - ax.add_patch( - plt.Rectangle( - (x0, y0), w, h, edgecolor="green", facecolor=(0, 0, 0, 0), lw=2 - ) - ) - ax.text(x0, y0, label) - - def get_mask_with_boxes(self, image_pil, image, boxes_filt): - size = image_pil.size - H, W = size[1], size[0] - for i in range(boxes_filt.size(0)): - boxes_filt[i] = boxes_filt[i] * torch.Tensor([W, H, W, H]) - boxes_filt[i][:2] -= boxes_filt[i][2:] / 2 - boxes_filt[i][2:] += boxes_filt[i][:2] - - boxes_filt = boxes_filt.cpu() - transformed_boxes = self.sam_predictor.transform.apply_boxes_torch( - boxes_filt, image.shape[:2] - ).to(self.device) - - masks, _, _ = self.sam_predictor.predict_torch( - point_coords=None, - point_labels=None, - boxes=transformed_boxes.to(self.device), - multimask_output=False, - ) - return masks - - def segment_image_with_boxes(self, image_pil, image_path, boxes_filt, pred_phrases): - image = cv2.imread(image_path) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - self.sam_predictor.set_image(image) - - masks = self.get_mask_with_boxes(image_pil, image, boxes_filt) - - # draw output image - - for mask in masks: - image = self.show_mask( - mask[0].cpu().numpy(), image, random_color=True, transparency=0.3 - ) - - updated_image_path = get_new_image_name(image_path, func_name="segmentation") - - new_image = Image.fromarray(image) - new_image.save(updated_image_path) - - return updated_image_path - - def set_image(self, img) -> None: - """Set the image for the predictor.""" - with torch.cuda.amp.autocast(): - self.sam_predictor.set_image(img) - - def show_points( - self, coords: np.ndarray, labels: np.ndarray, image: np.ndarray - ) -> np.ndarray: - """Visualize points on top of an image. - - Args: - coords (np.ndarray): A 2D array of shape (N, 2). - labels (np.ndarray): A 1D array of shape (N,). - image (np.ndarray): A 3D array of shape (H, W, 3). - Returns: - np.ndarray: A 3D array of shape (H, W, 3) with the points - visualized on top of the image. 
- """ - pos_points = coords[labels == 1] - neg_points = coords[labels == 0] - for p in pos_points: - image = cv2.circle( - image, p.astype(int), radius=3, color=(0, 255, 0), thickness=-1 - ) - for p in neg_points: - image = cv2.circle( - image, p.astype(int), radius=3, color=(255, 0, 0), thickness=-1 - ) - return image - - def segment_image_with_click(self, img, is_positive: bool): - self.sam_predictor.set_image(img) - # self.saved_points.append([evt.index[0], evt.index[1]]) - self.saved_labels.append(1 if is_positive else 0) - input_point = np.array(self.saved_points) - input_label = np.array(self.saved_labels) - - # Predict the mask - with torch.cuda.amp.autocast(): - masks, scores, logits = self.sam_predictor.predict( - point_coords=input_point, - point_labels=input_label, - multimask_output=False, - ) - - img = self.show_mask(masks[0], img, random_color=False, transparency=0.3) - - img = self.show_points(input_point, input_label, img) - - return img - - def segment_image_with_coordinate(self, img, is_positive: bool, coordinate: tuple): - """ - Args: - img (numpy.ndarray): the given image, shape: H x W x 3. - is_positive: whether the click is positive, if want to add mask use True else False. - coordinate: the position of the click - If the position is (x,y), means click at the x-th column and y-th row of the pixel matrix. - So x correspond to W, and y correspond to H. - Output: - img (PLI.Image.Image): the result image - result_mask (numpy.ndarray): the result mask, shape: H x W - - Other parameters: - transparency (float): the transparenccy of the mask - to control he degree of transparency after the mask is superimposed. - if transparency=1, then the masked part will be completely replaced with other colors. - """ - self.sam_predictor.set_image(img) - self.saved_points.append([coordinate[0], coordinate[1]]) - self.saved_labels.append(1 if is_positive else 0) - input_point = np.array(self.saved_points) - input_label = np.array(self.saved_labels) - - # Predict the mask - with torch.cuda.amp.autocast(): - masks, scores, logits = self.sam_predictor.predict( - point_coords=input_point, - point_labels=input_label, - multimask_output=False, - ) - - img = self.show_mask(masks[0], img, random_color=False, transparency=0.3) - - img = self.show_points(input_point, input_label, img) - - img = Image.fromarray(img) - - result_mask = masks[0] - - return img, result_mask - - @prompts( - name="Segment the Image", - description=( - "useful when you want to segment all the part of the image, but not segment" - " a certain object.like: segment all the object in this image, or generate" - " segmentations on this image, or segment the image,or perform segmentation" - " on this image, or segment all the object in this image.The input to this" - " tool should be a string, representing the image_path" - ), - ) - def inference_all(self, image_path): - image = cv2.imread(image_path) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - masks = self.mask_generator.generate(image) - plt.figure(figsize=(20, 20)) - plt.imshow(image) - if len(masks) == 0: - return - sorted_anns = sorted(masks, key=(lambda x: x["area"]), reverse=True) - ax = plt.gca() - ax.set_autoscale_on(False) - for ann in sorted_anns: - m = ann["segmentation"] - img = np.ones((m.shape[0], m.shape[1], 3)) - color_mask = np.random.random((1, 3)).tolist()[0] - for i in range(3): - img[:, :, i] = color_mask[i] - ax.imshow(np.dstack((img, m))) - - updated_image_path = get_new_image_name(image_path, func_name="segment-image") - plt.axis("off") - 
plt.savefig(updated_image_path, bbox_inches="tight", dpi=300, pad_inches=0.0) - return updated_image_path - - -class Text2Box: - def __init__(self, device): - print(f"Initializing ObjectDetection to {device}") - self.device = device - self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 - self.model_checkpoint_path = os.path.join("checkpoints", "groundingdino") - self.model_config_path = os.path.join("checkpoints", "grounding_config.py") - self.download_parameters() - self.box_threshold = 0.3 - self.text_threshold = 0.25 - self.grounding = (self.load_model()).to(self.device) - - def download_parameters(self): - url = "https://github.com/IDEA-Research/GroundingDINO/releases/download/v0.1.0-alpha/groundingdino_swint_ogc.pth" - if not os.path.exists(self.model_checkpoint_path): - wget.download(url, out=self.model_checkpoint_path) - config_url = "https://raw.githubusercontent.com/IDEA-Research/GroundingDINO/main/groundingdino/config/GroundingDINO_SwinT_OGC.py" - if not os.path.exists(self.model_config_path): - wget.download(config_url, out=self.model_config_path) - - def load_image(self, image_path): - # load image - image_pil = Image.open(image_path).convert("RGB") # load image - - transform = T.Compose( - [ - T.RandomResize([512], max_size=1333), - T.ToTensor(), - T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), - ] - ) - image, _ = transform(image_pil, None) # 3, h, w - return image_pil, image - - def load_model(self): - args = SLConfig.fromfile(self.model_config_path) - args.device = self.device - model = build_model(args) - checkpoint = torch.load(self.model_checkpoint_path, map_location="cpu") - load_res = model.load_state_dict( - clean_state_dict(checkpoint["model"]), strict=False - ) - print(load_res) - _ = model.eval() - return model - - def get_grounding_boxes(self, image, caption, with_logits=True): - caption = caption.lower() - caption = caption.strip() - if not caption.endswith("."): - caption = caption + "." 
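        # The caption normalisation above follows the usual GroundingDINO convention
        # (lower-cased, stripped, terminated with "."); for example, "A cat, a remote
        # control" becomes "a cat, a remote control." before it is passed to the
        # grounding model together with the transformed image tensor below.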
- image = image.to(self.device) - with torch.no_grad(): - outputs = self.grounding(image[None], captions=[caption]) - logits = outputs["pred_logits"].cpu().sigmoid()[0] # (nq, 256) - boxes = outputs["pred_boxes"].cpu()[0] # (nq, 4) - logits.shape[0] - - # filter output - logits_filt = logits.clone() - boxes_filt = boxes.clone() - filt_mask = logits_filt.max(dim=1)[0] > self.box_threshold - logits_filt = logits_filt[filt_mask] # num_filt, 256 - boxes_filt = boxes_filt[filt_mask] # num_filt, 4 - logits_filt.shape[0] - - # get phrase - tokenlizer = self.grounding.tokenizer - tokenized = tokenlizer(caption) - # build pred - pred_phrases = [] - for logit, box in zip(logits_filt, boxes_filt): - pred_phrase = get_phrases_from_posmap( - logit > self.text_threshold, tokenized, tokenlizer - ) - if with_logits: - pred_phrases.append(pred_phrase + f"({str(logit.max().item())[:4]})") - else: - pred_phrases.append(pred_phrase) - - return boxes_filt, pred_phrases - - def plot_boxes_to_image(self, image_pil, tgt): - H, W = tgt["size"] - boxes = tgt["boxes"] - labels = tgt["labels"] - assert len(boxes) == len(labels), "boxes and labels must have same length" - - draw = ImageDraw.Draw(image_pil) - mask = Image.new("L", image_pil.size, 0) - mask_draw = ImageDraw.Draw(mask) - - # draw boxes and masks - for box, label in zip(boxes, labels): - # from 0..1 to 0..W, 0..H - box = box * torch.Tensor([W, H, W, H]) - # from xywh to xyxy - box[:2] -= box[2:] / 2 - box[2:] += box[:2] - # random color - color = tuple(np.random.randint(0, 255, size=3).tolist()) - # draw - x0, y0, x1, y1 = box - x0, y0, x1, y1 = int(x0), int(y0), int(x1), int(y1) - - draw.rectangle([x0, y0, x1, y1], outline=color, width=6) - # draw.text((x0, y0), str(label), fill=color) - - font = ImageFont.load_default() - if hasattr(font, "getbbox"): - bbox = draw.textbbox((x0, y0), str(label), font) - else: - w, h = draw.textsize(str(label), font) - bbox = (x0, y0, w + x0, y0 + h) - # bbox = draw.textbbox((x0, y0), str(label)) - draw.rectangle(bbox, fill=color) - draw.text((x0, y0), str(label), fill="white") - - mask_draw.rectangle([x0, y0, x1, y1], fill=255, width=2) - - return image_pil, mask - - @prompts( - name="Detect the Give Object", - description=( - "useful when you only want to detect or find out given objects in the" - " pictureThe input to this tool should be a comma separated string of two," - " representing the image_path, the text description of the object to be" - " found" - ), - ) - def inference(self, inputs): - image_path, det_prompt = inputs.split(",") - print(f"image_path={image_path}, text_prompt={det_prompt}") - image_pil, image = self.load_image(image_path) - - boxes_filt, pred_phrases = self.get_grounding_boxes(image, det_prompt) - - size = image_pil.size - pred_dict = { - "boxes": boxes_filt, - "size": [size[1], size[0]], # H,W - "labels": pred_phrases, - } - - image_with_box = self.plot_boxes_to_image(image_pil, pred_dict)[0] - - updated_image_path = get_new_image_name( - image_path, func_name="detect-something" - ) - updated_image = image_with_box.resize(size) - updated_image.save(updated_image_path) - print( - f"\nProcessed ObejectDetecting, Input Image: {image_path}, Object to be" - f" Detect {det_prompt}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class Inpainting: - def __init__(self, device): - self.device = device - self.revision = "fp16" if "cuda" in self.device else None - self.torch_dtype = torch.float16 if "cuda" in self.device else torch.float32 - - self.inpaint = 
StableDiffusionInpaintPipeline.from_pretrained( - "runwayml/stable-diffusion-inpainting", - revision=self.revision, - torch_dtype=self.torch_dtype, - safety_checker=StableDiffusionSafetyChecker.from_pretrained( - "CompVis/stable-diffusion-safety-checker" - ), - ).to(device) - - def __call__( - self, prompt, image, mask_image, height=512, width=512, num_inference_steps=50 - ): - update_image = self.inpaint( - prompt=prompt, - image=image.resize((width, height)), - mask_image=mask_image.resize((width, height)), - height=height, - width=width, - num_inference_steps=num_inference_steps, - ).images[0] - return update_image - - -class InfinityOutPainting: - template_model = True # Add this line to show this is a template model. - - def __init__(self, ImageCaptioning, Inpainting, VisualQuestionAnswering): - self.llm = OpenAI(temperature=0) - self.ImageCaption = ImageCaptioning - self.inpaint = Inpainting - self.ImageVQA = VisualQuestionAnswering - self.a_prompt = "best quality, extremely detailed" - self.n_prompt = ( - "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " - "fewer digits, cropped, worst quality, low quality" - ) - - def get_BLIP_vqa(self, image, question): - inputs = self.ImageVQA.processor(image, question, return_tensors="pt").to( - self.ImageVQA.device, self.ImageVQA.torch_dtype - ) - out = self.ImageVQA.model.generate(**inputs) - answer = self.ImageVQA.processor.decode(out[0], skip_special_tokens=True) - print( - f"\nProcessed VisualQuestionAnswering, Input Question: {question}, Output" - f" Answer: {answer}" - ) - return answer - - def get_BLIP_caption(self, image): - inputs = self.ImageCaption.processor(image, return_tensors="pt").to( - self.ImageCaption.device, self.ImageCaption.torch_dtype - ) - out = self.ImageCaption.model.generate(**inputs) - BLIP_caption = self.ImageCaption.processor.decode( - out[0], skip_special_tokens=True - ) - return BLIP_caption - - def check_prompt(self, prompt): - check = ( - "Here is a paragraph with adjectives. " - f"{prompt} " - "Please change all plural forms in the adjectives to singular forms. 
" - ) - return self.llm(check) - - def get_imagine_caption(self, image, imagine): - BLIP_caption = self.get_BLIP_caption(image) - background_color = self.get_BLIP_vqa( - image, "what is the background color of this image" - ) - style = self.get_BLIP_vqa(image, "what is the style of this image") - imagine_prompt = ( - "let's pretend you are an excellent painter and now there is an incomplete" - f" painting with {BLIP_caption} in the center, please imagine the complete" - " painting and describe ityou should consider the background color is" - f" {background_color}, the style is {style}You should make the painting as" - " vivid and realistic as possibleYou can not use words like painting or" - " pictureand you should use no more than 50 words to describe it" - ) - caption = self.llm(imagine_prompt) if imagine else BLIP_caption - caption = self.check_prompt(caption) - print( - f"BLIP observation: {BLIP_caption}, ChatGPT imagine to {caption}" - ) if imagine else print(f"Prompt: {caption}") - return caption - - def resize_image(self, image, max_size=1000000, multiple=8): - aspect_ratio = image.size[0] / image.size[1] - new_width = int(math.sqrt(max_size * aspect_ratio)) - new_height = int(new_width / aspect_ratio) - new_width, new_height = new_width - (new_width % multiple), new_height - ( - new_height % multiple - ) - return image.resize((new_width, new_height)) - - def dowhile(self, original_img, tosize, expand_ratio, imagine, usr_prompt): - old_img = original_img - while old_img.size != tosize: - prompt = ( - self.check_prompt(usr_prompt) - if usr_prompt - else self.get_imagine_caption(old_img, imagine) - ) - crop_w = 15 if old_img.size[0] != tosize[0] else 0 - crop_h = 15 if old_img.size[1] != tosize[1] else 0 - old_img = ImageOps.crop(old_img, (crop_w, crop_h, crop_w, crop_h)) - temp_canvas_size = ( - expand_ratio * old_img.width - if expand_ratio * old_img.width < tosize[0] - else tosize[0], - expand_ratio * old_img.height - if expand_ratio * old_img.height < tosize[1] - else tosize[1], - ) - temp_canvas, temp_mask = Image.new( - "RGB", temp_canvas_size, color="white" - ), Image.new("L", temp_canvas_size, color="white") - x, y = (temp_canvas.width - old_img.width) // 2, ( - temp_canvas.height - old_img.height - ) // 2 - temp_canvas.paste(old_img, (x, y)) - temp_mask.paste(0, (x, y, x + old_img.width, y + old_img.height)) - resized_temp_canvas, resized_temp_mask = self.resize_image( - temp_canvas - ), self.resize_image(temp_mask) - image = self.inpaint( - prompt=prompt, - image=resized_temp_canvas, - mask_image=resized_temp_mask, - height=resized_temp_canvas.height, - width=resized_temp_canvas.width, - num_inference_steps=50, - ).resize((temp_canvas.width, temp_canvas.height), Image.ANTIALIAS) - image = blend_gt2pt(old_img, image) - old_img = image - return old_img - - @prompts( - name="Extend An Image", - description=( - "useful when you need to extend an image into a larger image.like: extend" - " the image into a resolution of 2048x1024, extend the image into" - " 2048x1024. 
The input to this tool should be a comma separated string of" - " two, representing the image_path and the resolution of widthxheight" - ), - ) - def inference(self, inputs): - image_path, resolution = inputs.split(",") - width, height = resolution.split("x") - tosize = (int(width), int(height)) - image = Image.open(image_path) - image = ImageOps.crop(image, (10, 10, 10, 10)) - out_painted_image = self.dowhile(image, tosize, 4, True, False) - updated_image_path = get_new_image_name(image_path, func_name="outpainting") - out_painted_image.save(updated_image_path) - print( - f"\nProcessed InfinityOutPainting, Input Image: {image_path}, Input" - f" Resolution: {resolution}, Output Image: {updated_image_path}" - ) - return updated_image_path - - -class ObjectSegmenting: - template_model = True # Add this line to show this is a template model. - - def __init__(self, Text2Box: Text2Box, Segmenting: Segmenting): - # self.llm = OpenAI(temperature=0) - self.grounding = Text2Box - self.sam = Segmenting - - @prompts( - name="Segment the given object", - description=( - "useful when you only want to segment the certain objects in the" - " pictureaccording to the given textlike: segment the cat,or can you" - " segment an obeject for meThe input to this tool should be a comma" - " separated string of two, representing the image_path, the text" - " description of the object to be found" - ), - ) - def inference(self, inputs): - image_path, det_prompt = inputs.split(",") - print(f"image_path={image_path}, text_prompt={det_prompt}") - image_pil, image = self.grounding.load_image(image_path) - - boxes_filt, pred_phrases = self.grounding.get_grounding_boxes(image, det_prompt) - updated_image_path = self.sam.segment_image_with_boxes( - image_pil, image_path, boxes_filt, pred_phrases - ) - print( - f"\nProcessed ObejectSegmenting, Input Image: {image_path}, Object to be" - f" Segment {det_prompt}, Output Image: {updated_image_path}" - ) - return updated_image_path - - def merge_masks(self, masks): - """ - Args: - mask (numpy.ndarray): shape N x 1 x H x W - Outputs: - new_mask (numpy.ndarray): shape H x W - """ - if type(masks) == torch.Tensor: - x = masks - elif type(masks) == np.ndarray: - x = torch.tensor(masks, dtype=int) - else: - raise TypeError( - "the type of the input masks must be numpy.ndarray or torch.tensor" - ) - x = x.squeeze(dim=1) - value, _ = x.max(dim=0) - new_mask = value.cpu().numpy() - new_mask.astype(np.uint8) - return new_mask - - def get_mask(self, image_path, text_prompt): - print(f"image_path={image_path}, text_prompt={text_prompt}") - # image_pil (PIL.Image.Image) -> size: W x H - # image (numpy.ndarray) -> H x W x 3 - image_pil, image = self.grounding.load_image(image_path) - - boxes_filt, pred_phrases = self.grounding.get_grounding_boxes( - image, text_prompt - ) - image = cv2.imread(image_path) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - self.sam.sam_predictor.set_image(image) - - # masks (torch.tensor) -> N x 1 x H x W - masks = self.sam.get_mask_with_boxes(image_pil, image, boxes_filt) - - # merged_mask -> H x W - merged_mask = self.merge_masks(masks) - # draw output image - - for mask in masks: - image = self.sam.show_mask( - mask[0].cpu().numpy(), image, random_color=True, transparency=0.3 - ) - - Image.fromarray(merged_mask) - - return merged_mask - - -class ImageEditing: - template_model = True - - def __init__( - self, Text2Box: Text2Box, Segmenting: Segmenting, Inpainting: Inpainting - ): - print("Initializing ImageEditing") - self.sam = Segmenting - self.grounding 
= Text2Box
-        self.inpaint = Inpainting
-
-    def pad_edge(self, mask, padding):
-        # mask Tensor [H,W]
-        mask = mask.numpy()
-        true_indices = np.argwhere(mask)
-        mask_array = np.zeros_like(mask, dtype=bool)
-        for idx in true_indices:
-            padded_slice = tuple(
-                slice(max(0, i - padding), i + padding + 1) for i in idx
-            )
-            mask_array[padded_slice] = True
-        new_mask = (mask_array * 255).astype(np.uint8)
-        # new_mask
-        return new_mask
-
-    @prompts(
-        name="Remove Something From The Photo",
-        description=(
-            "useful when you want to remove an object or something from the photo "
-            "from its description or location. "
-            "The input to this tool should be a comma separated string of two, "
-            "representing the image_path and the object that needs to be removed. "
-        ),
-    )
-    def inference_remove(self, inputs):
-        image_path, to_be_removed_txt = inputs.split(",")[0], ",".join(
-            inputs.split(",")[1:]
-        )
-        return self.inference_replace_sam(
-            f"{image_path},{to_be_removed_txt},background"
-        )
-
-    @prompts(
-        name="Replace Something From The Photo",
-        description=(
-            "useful when you want to replace an object from the object description or"
-            " location with another object from its description. The input to this tool"
-            " should be a comma separated string of three, representing the image_path,"
-            " the object to be replaced, and the object to replace it with "
-        ),
-    )
-    def inference_replace_sam(self, inputs):
-        image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",")
-
-        print(f"image_path={image_path}, to_be_replaced_txt={to_be_replaced_txt}")
-        image_pil, image = self.grounding.load_image(image_path)
-        boxes_filt, pred_phrases = self.grounding.get_grounding_boxes(
-            image, to_be_replaced_txt
-        )
-        image = cv2.imread(image_path)
-        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
-        self.sam.sam_predictor.set_image(image)
-        masks = self.sam.get_mask_with_boxes(image_pil, image, boxes_filt)
-        mask = torch.sum(masks, dim=0).unsqueeze(0)
-        mask = torch.where(mask > 0, True, False)
-        mask = mask.squeeze(0).squeeze(0).cpu()  # tensor
-
-        mask = self.pad_edge(mask, padding=20)  # numpy
-        mask_image = Image.fromarray(mask)
-
-        updated_image = self.inpaint(
-            prompt=replace_with_txt, image=image_pil, mask_image=mask_image
-        )
-        updated_image_path = get_new_image_name(
-            image_path, func_name="replace-something"
-        )
-        updated_image = updated_image.resize(image_pil.size)
-        updated_image.save(updated_image_path)
-        print(
-            f"\nProcessed ImageEditing, Input Image: {image_path}, Replace"
-            f" {to_be_replaced_txt} with {replace_with_txt}, Output Image:"
-            f" {updated_image_path}"
-        )
-        return updated_image_path
-
-
-class BackgroundRemoving:
-    """
-    Used to remove the background of the given picture.
-    """
-
-    template_model = True
-
-    def __init__(
-        self,
-        VisualQuestionAnswering: VisualQuestionAnswering,
-        Text2Box: Text2Box,
-        Segmenting: Segmenting,
-    ):
-        self.vqa = VisualQuestionAnswering
-        self.obj_segmenting = ObjectSegmenting(Text2Box, Segmenting)
-
-    @prompts(
-        name="Remove the background",
-        description=(
-            "useful when you want to extract the object or remove the background, "
-            "the input should be a string image_path"
-        ),
-    )
-    def inference(self, image_path):
-        """
-        Given an image, return a picture that contains only the extracted main object.
-        """
-        updated_image_path = None
-
-        mask = self.get_mask(image_path)
-
-        image = Image.open(image_path)
-        mask = Image.fromarray(mask)
-        image.putalpha(mask)
-
-        updated_image_path = get_new_image_name(
-            image_path, func_name="detect-something"
-        )
-        
image.save(updated_image_path) - - return updated_image_path - - def get_mask(self, image_path): - """ - Description: - given an image path, return the mask of the main object. - Args: - image_path (string): the file path of the image - Outputs: - mask (numpy.ndarray): H x W - """ - vqa_input = f"{image_path}, what is the main object in the image?" - text_prompt = self.vqa.inference(vqa_input) - - mask = self.obj_segmenting.get_mask(image_path, text_prompt) - - return mask - - -class MultiModalVisualAgent: - def __init__( - self, - load_dict, - prefix: str = VISUAL_AGENT_PREFIX, - format_instructions: str = VISUAL_AGENT_FORMAT_INSTRUCTIONS, - suffix: str = VISUAL_AGENT_SUFFIX, - ): - print(f"Initializing MultiModalVisualAgent, load_dict={load_dict}") - - if "ImageCaptioning" not in load_dict: - raise ValueError( - "You have to load ImageCaptioning as a basic function for" - " MultiModalVisualAgent" - ) - - self.models = {} - - for class_name, device in load_dict.items(): - self.models[class_name] = globals()[class_name](device=device) - - for class_name, module in globals().items(): - if getattr(module, "template_model", False): - template_required_names = { - k - for k in inspect.signature(module.__init__).parameters.keys() - if k != "self" - } - - loaded_names = set([type(e).__name__ for e in self.models.values()]) - - if template_required_names.issubset(loaded_names): - self.models[class_name] = globals()[class_name]( - **{name: self.models[name] for name in template_required_names} - ) - - print(f"All the Available Functions: {self.models}") - - self.tools = [] - for instance in self.models.values(): - for e in dir(instance): - if e.startswith("inference"): - func = getattr(instance, e) - self.tools.append( - Tool(name=func.name, description=func.description, func=func) - ) - - self.llm = OpenAI(temperature=0) - self.memory = ConversationBufferMemory( - memory_key="chat_history", output_key="output" - ) - - def init_agent(self, lang): - self.memory.clear() - - agent_prefix = self.prefix - agent_suffix = self.suffix - agent_format_instructions = self.format_instructions - - if lang == "English": - PREFIX, FORMAT_INSTRUCTIONS, SUFFIX = ( - agent_prefix, - agent_format_instructions, - agent_suffix, - ) - else: - PREFIX, FORMAT_INSTRUCTIONS, SUFFIX = ( - VISUAL_AGENT_PREFIX_CN, - VISUAL_AGENT_FORMAT_INSTRUCTIONS_CN, - VISUAL_AGENT_SUFFIX_CN, - ) - - self.agent = initialize_agent( - self.tools, - self.llm, - agent="conversational-react-description", - verbose=True, - memory=self.memory, - return_intermediate_steps=True, - agent_kwargs={ - "prefix": PREFIX, - "format_instructions": FORMAT_INSTRUCTIONS, - "suffix": SUFFIX, - }, - ) - - def run_text(self, text): - self.agent.memory.buffer = cut_dialogue_history( - self.agent.memory.buffer, keep_last_n_words=500 - ) - - res = self.agent({"input": text.strip()}) - res["output"] = res["output"].replace("\\", "/") - response = re.sub( - "(image/[-\w]*.png)", - lambda m: f"![](file={m.group(0)})*{m.group(0)}*", - res["output"], - ) - - print( - f"\nProcessed run_text, Input text: {text}\n" - f"Current Memory: {self.agent.memory.buffer}" - ) - - return response - - def run_image(self, image, lang): - image_filename = os.path.join("image", f"{str(uuid.uuid4())[:8]}.png") - - img = Image.open(image) - width, height = img.size - ratio = min(512 / width, 512 / height) - - width_new, height_new = (round(width * ratio), round(height * ratio)) - width_new = int(np.round(width_new / 64.0)) * 64 - height_new = int(np.round(height_new / 64.0)) * 64 - - img = 
img.resize((width_new, height_new))
-        img = img.convert("RGB")
-        img.save(image_filename, "PNG")
-
-        description = self.models["ImageCaptioning"].inference(image_filename)
-
-        if lang == "Chinese":
-            Human_prompt = (
-                f"\nHuman: 提供一张名为 {image_filename}的图片。它的描述是:"
-                f" {description}。 这些信息帮助你理解这个图像,"
-                "但是你应该使用工具来完成下面的任务,而不是直接从我的描述中想象。"
-                ' 如果你明白了, 说 "收到". \n'
-            )
-            AI_prompt = "收到。 "
-        else:
-            Human_prompt = (
-                f"\nHuman: provide a figure named {image_filename}. The description is:"
-                f" {description}. This information helps you to understand this image,"
-                " but you should use tools to finish following tasks, rather than"
-                " directly imagine from my description. If you understand, say"
-                ' "Received". \n'
-            )
-            AI_prompt = "Received. "
-
-        self.agent.memory.buffer = (
-            self.agent.memory.buffer + Human_prompt + "AI: " + AI_prompt
-        )
-
-        print(
-            f"\nProcessed run_image, Input image: {image_filename}\n"
-            f"Current Memory: {self.agent.memory.buffer}"
-        )
-
-        return AI_prompt
-
-    def clear_memory(self):
-        self.memory.clear()
-
-
-# usage
-
-
-class MultiModalAgent:
-    """
-    A user-friendly abstraction over the MultiModalVisualAgent that provides a simple interface
-    to process both text and images.
-
-    Initializes the MultiModalAgent.
-
-    Architecture:
-
-
-    Parameters:
-        load_dict (dict, optional): Dictionary of class names and devices to load.
-            Defaults to a basic configuration.
-
-        temperature (float, optional): Temperature for the OpenAI model. Defaults to 0.1.
-
-        language (str, optional): Default language for the agent.
-            Defaults to "english".
-
-    Usage
-    --------------
-    For chats:
-    ------------
-    agent = MultiModalAgent(load_dict)
-    agent.chat("Hello")
-
-    -----------
-
-    Or just with text
-    ------------
-    agent = MultiModalAgent(load_dict)
-    agent.run_text("Hello")
-
-
-    """
-
-    def __init__(self, load_dict, temperature: float = 0.1, language: str = "english"):
-        self.load_dict = load_dict
-        self.temperature = temperature
-        self.language = language
-
-        # if load_dict is None:
-        #     self.load_dict = {
-        #         "ImageCaptioning": "default_device"
-        #     }
-
-        self.agent = MultiModalVisualAgent(load_dict)
-        self.history = []
-
-    def run_text(self, text: str = None, language="english"):
-        """Run text through the model"""
-
-        if language is None:
-            language = self.language
-
-        try:
-            self.agent.init_agent(language)
-            return self.agent.run_text(text)
-        except Exception as e:
-            return f"Error processing text: {str(e)}"
-
-    def run_img(self, image_path: str, language="english"):
-        """Run an image through the model"""
-        if language is None:
-            language = self.language
-
-        try:
-            return self.agent.run_image(image_path, language)
-        except Exception as error:
-            return f"Error processing image: {str(error)}"
-
-    def chat(self, msg: str = None, language: str = "english", streaming: bool = False):
-        """
-        Run chat with the multi-modal agent
-
-        Args:
-            msg (str, optional): Message to send to the agent. Defaults to None.
-            language (str, optional): Language to use. Defaults to "english".
-            streaming (bool, optional): Whether to stream the response. Defaults to False.
-
-        Returns:
-            str: Response from the agent
-
-        Usage:
-        --------------
-        agent = MultiModalAgent(load_dict)
-        agent.chat("Hello")
-
-        """
-        if language is None:
-            language = self.language
-
-        # add the user's message to the history
-        self.history.append(Message("User", msg))
-
-        # process the message
-        try:
-            self.agent.init_agent(language)
-            response = self.agent.run_text(msg)
-
-            # add the agent's response to the history
-            self.history.append(Message("Agent", response))
-
-            # if streaming is enabled, yield the response token by token
-            if streaming:
-                return self._stream_response(response)
-            else:
-                return response
-
-        except Exception as error:
-            error_message = f"Error processing message: {str(error)}"
-
-            # add the error to the history
-            self.history.append(Message("Agent", error_message))
-            return error_message
-
-    def _stream_response(self, response: str = None):
-        """
-        Yield the response token by token (word by word)
-
-        Usage:
-        --------------
-        for token in _stream_response(response):
-            print(token)
-
-        """
-        for token in response.split():
-            yield token
-
-    def clear(self):
-        """Clear the agent's memory"""
-        try:
-            self.agent.clear_memory()
-        except Exception as e:
-            return f"Error clearing memory: {str(e)}"
diff --git a/swarms/agents/neural_architecture_search_worker.py b/swarms/agents/neural_architecture_search_worker.py
deleted file mode 100644
index fd253b951..000000000
--- a/swarms/agents/neural_architecture_search_worker.py
+++ /dev/null
@@ -1,12 +0,0 @@
-"""The Replicator"""
-
-
-class Replicator:
-    def __init__(
-        self,
-        model_name,
-    ):
-        pass
-
-    def run(self, task):
-        pass
diff --git a/swarms/agents/operations_agent.py b/swarms/agents/operations_agent.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/swarms/agents/refiner_agent.py b/swarms/agents/refiner_agent.py
deleted file mode 100644
index 2a1383e99..000000000
--- a/swarms/agents/refiner_agent.py
+++ /dev/null
@@ -1,9 +0,0 @@
-class PromptRefiner:
-    def __init__(self, system_prompt: str, llm):
-        super().__init__()
-        self.system_prompt = system_prompt
-        self.llm = llm
-
-    def run(self, task: str):
-        refine = self.llm(f"System Prompt: {self.system_prompt} Current task: {task}")
-        return refine
diff --git a/swarms/agents/simple_agent.py b/swarms/agents/simple_agent.py
deleted file mode 100644
index 88327095b..000000000
--- a/swarms/agents/simple_agent.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from termcolor import colored
-
-
-class SimpleAgent:
-    """
-    Simple Agent is a simple agent that runs a flow. 
- - Args: - name (str): Name of the agent - flow (Flow): Flow to run - - Example: - >>> from swarms.agents.simple_agent import SimpleAgent - >>> from swarms.structs import Flow - >>> from swarms.models import OpenAIChat - >>> api_key = "" - >>> llm = OpenAIChat() - - """ - - def __init__( - self, - name: str, - flow, - ): - self.name = name - self.flow = flow - self.message_history = [] - - def run(self, task: str) -> str: - """Run method""" - metrics = print(colored(f"Agent {self.name} is running task: {task}", "red")) - print(metrics) - - response = self.flow.run(task) - self.message_history.append((self.name, response)) - return response diff --git a/swarms/chunkers/omni_chunker.py b/swarms/chunkers/omni_chunker.py index 70a113804..a858a9e8e 100644 --- a/swarms/chunkers/omni_chunker.py +++ b/swarms/chunkers/omni_chunker.py @@ -17,7 +17,6 @@ from typing import List, Optional, Callable from termcolor import colored import os -import sys @dataclass diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index 04872e066..539e88322 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -3,7 +3,7 @@ from swarms.models.petals import Petals from swarms.models.mistral import Mistral -# from swarms.models.openai_models import OpenAI, AzureOpenAI, OpenAIChat +from swarms.models.openai_models import OpenAI, AzureOpenAI, OpenAIChat from swarms.models.zephyr import Zephyr from swarms.models.biogpt import BioGPT from swarms.models.huggingface import HuggingfaceLLM @@ -31,9 +31,9 @@ "Anthropic", "Petals", "Mistral", - # "OpenAI", - # "AzureOpenAI", - # "OpenAIChat", + "OpenAI", + "AzureOpenAI", + "OpenAIChat", "Zephyr", "Idefics", "Kosmos", @@ -44,7 +44,7 @@ "HuggingfaceLLM", "MPT7B", "WizardLLMStoryTeller", - # "GPT4Vision", - # "Dalle3", - # "Fuyu", + "GPT4Vision", + "Dalle3", + "Fuyu", ] diff --git a/swarms/models/base.py b/swarms/models/base.py index 32a45c438..4e92ae458 100644 --- a/swarms/models/base.py +++ b/swarms/models/base.py @@ -52,7 +52,7 @@ def _num_tokens(self, text: str) -> int: def _time_for_generation(self, task: str) -> float: """Time for Generation""" self.start_time = time.time() - output = self.run(task) + self.run(task) self.end_time = time.time() return self.end_time - self.start_time diff --git a/swarms/models/dalle3.py b/swarms/models/dalle3.py index dba633968..45b80480b 100644 --- a/swarms/models/dalle3.py +++ b/swarms/models/dalle3.py @@ -4,7 +4,7 @@ import uuid from dataclasses import dataclass from io import BytesIO -from typing import List, Optional +from typing import List import backoff import openai diff --git a/swarms/models/distilled_whisperx.py b/swarms/models/distilled_whisperx.py index 0a60aaacb..98b3660a9 100644 --- a/swarms/models/distilled_whisperx.py +++ b/swarms/models/distilled_whisperx.py @@ -121,7 +121,7 @@ def real_time_transcribe(self, audio_file_path, chunk_duration=5): # Load the whole audio file, but process and transcribe it in chunks audio_input = self.processor.audio_file_to_array(audio_file_path) sample_rate = audio_input.sampling_rate - total_duration = len(audio_input.array) / sample_rate + len(audio_input.array) / sample_rate chunks = [ audio_input.array[i : i + sample_rate * chunk_duration] for i in range( diff --git a/swarms/models/fastvit.py b/swarms/models/fastvit.py index a2d6bc0a6..d04787773 100644 --- a/swarms/models/fastvit.py +++ b/swarms/models/fastvit.py @@ -2,7 +2,6 @@ import os from typing import List -import numpy as np import timm import torch from PIL import Image diff --git 
a/swarms/models/kosmos2.py b/swarms/models/kosmos2.py index 12d5638ab..2178c4833 100644 --- a/swarms/models/kosmos2.py +++ b/swarms/models/kosmos2.py @@ -1,6 +1,5 @@ from typing import List, Tuple -import numpy as np from PIL import Image from pydantic import BaseModel, root_validator, validator from transformers import AutoModelForVision2Seq, AutoProcessor diff --git a/swarms/models/openai_chat.py b/swarms/models/openai_chat.py index 3933d8a79..546f3509b 100644 --- a/swarms/models/openai_chat.py +++ b/swarms/models/openai_chat.py @@ -1,4 +1,3 @@ -"""OpenAI chat wrapper.""" from __future__ import annotations import logging diff --git a/swarms/models/timm.py b/swarms/models/timm.py index 5d9b965a7..9947ec7bb 100644 --- a/swarms/models/timm.py +++ b/swarms/models/timm.py @@ -2,7 +2,7 @@ import timm import torch -from pydantic import BaseModel, conlist +from pydantic import BaseModel class TimmModelInfo(BaseModel): diff --git a/swarms/models/trocr.py b/swarms/models/trocr.py index f4a4156dc..0aab29991 100644 --- a/swarms/models/trocr.py +++ b/swarms/models/trocr.py @@ -4,9 +4,6 @@ """ -from transformers import TrOCRProcessor, VisionEncoderDecoderModel -from PIL import Image -import requests class TrOCR: diff --git a/swarms/prompts/__init__.py b/swarms/prompts/__init__.py index 511cf2112..b087a1a40 100644 --- a/swarms/prompts/__init__.py +++ b/swarms/prompts/__init__.py @@ -4,3 +4,14 @@ from swarms.prompts.legal_agent_prompt import LEGAL_AGENT_PROMPT from swarms.prompts.operations_agent_prompt import OPERATIONS_AGENT_PROMPT from swarms.prompts.product_agent_prompt import PRODUCT_AGENT_PROMPT + + + +__all__ = [ + "CODE_INTERPRETER", + "FINANCE_AGENT_PROMPT", + "GROWTH_AGENT_PROMPT", + "LEGAL_AGENT_PROMPT", + "OPERATIONS_AGENT_PROMPT", + "PRODUCT_AGENT_PROMPT", +] diff --git a/swarms/prompts/autobloggen.py b/swarms/prompts/autobloggen.py index 121c2cf8e..64001d1da 100644 --- a/swarms/prompts/autobloggen.py +++ b/swarms/prompts/autobloggen.py @@ -1,4 +1,4 @@ -AUTOBLOG_GEN_GENERATOR = f""" +AUTOBLOG_GEN_GENERATOR = """ First search for a list of topics on the web based their relevance to Positive Med's long term vision then rank than based on the goals this month, then output a single headline title for a blog for the next autonomous agent to write the blog, utilize the SOP below to help you strategically select topics. Output a single topic that will be the foundation for a blog. 
@@ -238,7 +238,7 @@ # Agent that generates blogs -DRAFT_AGENT_SYSTEM_PROMPT = f""" +DRAFT_AGENT_SYSTEM_PROMPT = """ Write a 5,000+ word long narrative essay on the highest rated topic from a list of topics for positivemed.com, their vision is: to democratize health wisdom to modern young professionals in a healthy and conversational and friendly manner, diff --git a/swarms/prompts/chat_prompt.py b/swarms/prompts/chat_prompt.py index b0330e249..01f66a5b0 100644 --- a/swarms/prompts/chat_prompt.py +++ b/swarms/prompts/chat_prompt.py @@ -1,9 +1,8 @@ from __future__ import annotations from abc import abstractmethod -from typing import Any, Dict, List, Sequence +from typing import Dict, List, Sequence -from pydantic import Field class Message: diff --git a/swarms/structs/flow.py b/swarms/structs/flow.py index 17a3fe2c9..8fcd1d4da 100644 --- a/swarms/structs/flow.py +++ b/swarms/structs/flow.py @@ -19,7 +19,6 @@ from termcolor import colored import inspect import random -from swarms.tools.tool import BaseTool # Prompts DYNAMIC_STOP_PROMPT = """ @@ -299,7 +298,7 @@ def print_dashboard(self, task: str): model_config = self.get_llm_init_params() print(colored("Initializing Agent Dashboard...", "yellow")) - dashboard = print( + print( colored( f""" Flow Dashboard diff --git a/swarms/swarms/autobloggen.py b/swarms/swarms/autobloggen.py index 6c3c6bf1c..5a8702695 100644 --- a/swarms/swarms/autobloggen.py +++ b/swarms/swarms/autobloggen.py @@ -1,5 +1,3 @@ -import concurrent.futures -import os from termcolor import colored diff --git a/swarms/swarms/dialogue_simulator.py b/swarms/swarms/dialogue_simulator.py index 6eec2aa97..8ceddef4b 100644 --- a/swarms/swarms/dialogue_simulator.py +++ b/swarms/swarms/dialogue_simulator.py @@ -1,4 +1,3 @@ -from typing import List class DialogueSimulator: diff --git a/swarms/swarms/multi_agent_debate.py b/swarms/swarms/multi_agent_debate.py index 4bba3619e..45b25f595 100644 --- a/swarms/swarms/multi_agent_debate.py +++ b/swarms/swarms/multi_agent_debate.py @@ -1,4 +1,3 @@ -from typing import List, Callable # Define a selection function diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 92073d67e..da323121f 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -2,3 +2,10 @@ from swarms.utils.futures import execute_futures_dict from swarms.utils.code_interpreter import SubprocessCodeInterpreter from swarms.utils.parse_code import extract_code_in_backticks_in_string + +__all__ = [ + "display_markdown_message", + "execute_futures_dict", + "SubprocessCodeInterpreter", + "extract_code_in_backticks_in_string", +] diff --git a/swarms/workers/__init__.py b/swarms/workers/__init__.py index 9dabe94de..dc72e5656 100644 --- a/swarms/workers/__init__.py +++ b/swarms/workers/__init__.py @@ -1,2 +1 @@ # from swarms.workers.worker import Worker -from swarms.workers.base import AbstractWorker diff --git a/tests/models/cohere.py b/tests/models/cohere.py index 17bc2ddc0..9c85d795b 100644 --- a/tests/models/cohere.py +++ b/tests/models/cohere.py @@ -2,9 +2,7 @@ from unittest.mock import Mock, patch import pytest -from cohere.models.response import GenerationChunk from dotenv import load_dotenv - from swarms.models.cohere_chat import BaseCohere, Cohere # Load the environment variables @@ -17,6 +15,66 @@ def cohere_instance(): return Cohere(cohere_api_key=api_key) + +def test_cohere_custom_configuration(cohere_instance): + # Test customizing Cohere configurations + cohere_instance.model = "base" + cohere_instance.temperature = 0.5 + 
cohere_instance.max_tokens = 100 + cohere_instance.k = 1 + cohere_instance.p = 0.8 + cohere_instance.frequency_penalty = 0.2 + cohere_instance.presence_penalty = 0.4 + response = cohere_instance("Customize configurations.") + assert isinstance(response, str) + + +def test_cohere_api_error_handling(cohere_instance): + # Test error handling when the API key is invalid + cohere_instance.model = "base" + cohere_instance.cohere_api_key = "invalid-api-key" + with pytest.raises(Exception): + cohere_instance("Error handling with invalid API key.") + + +def test_cohere_async_api_error_handling(cohere_instance): + # Test async error handling when the API key is invalid + cohere_instance.model = "base" + cohere_instance.cohere_api_key = "invalid-api-key" + with pytest.raises(Exception): + cohere_instance.async_call("Error handling with invalid API key.") + + +def test_cohere_stream_api_error_handling(cohere_instance): + # Test error handling in streaming mode when the API key is invalid + cohere_instance.model = "base" + cohere_instance.cohere_api_key = "invalid-api-key" + with pytest.raises(Exception): + generator = cohere_instance.stream("Error handling with invalid API key.") + for token in generator: + pass + + +def test_cohere_streaming_mode(cohere_instance): + # Test the streaming mode for large text generation + cohere_instance.model = "base" + cohere_instance.streaming = True + prompt = "Generate a lengthy text using streaming mode." + generator = cohere_instance.stream(prompt) + for token in generator: + assert isinstance(token, str) + + +def test_cohere_streaming_mode_async(cohere_instance): + # Test the async streaming mode for large text generation + cohere_instance.model = "base" + cohere_instance.streaming = True + prompt = "Generate a lengthy text using async streaming mode." + async_generator = cohere_instance.async_stream(prompt) + for token in async_generator: + assert isinstance(token, str) + + def test_cohere_wrap_prompt(cohere_instance): prompt = "What is the meaning of life?" wrapped_prompt = cohere_instance._wrap_prompt(prompt) @@ -210,7 +268,7 @@ def test_cohere_call_with_embed_multilingual_v3_model(cohere_instance): def test_cohere_call_with_invalid_model(cohere_instance): cohere_instance.model = "invalid-model" with pytest.raises(ValueError): - response = cohere_instance("Translate to French.") + cohere_instance("Translate to French.") def test_cohere_call_with_long_prompt(cohere_instance): @@ -223,7 +281,7 @@ def test_cohere_call_with_max_tokens_limit_exceeded(cohere_instance): cohere_instance.max_tokens = 10 prompt = "This is a test prompt that will exceed the max tokens limit." 
with pytest.raises(ValueError): - response = cohere_instance(prompt) + cohere_instance(prompt) def test_cohere_stream_with_command_model(cohere_instance): @@ -346,64 +404,6 @@ def test_cohere_async_stream_with_embed_multilingual_v3_model(cohere_instance): assert isinstance(token, str) -def test_cohere_custom_configuration(cohere_instance): - # Test customizing Cohere configurations - cohere_instance.model = "base" - cohere_instance.temperature = 0.5 - cohere_instance.max_tokens = 100 - cohere_instance.k = 1 - cohere_instance.p = 0.8 - cohere_instance.frequency_penalty = 0.2 - cohere_instance.presence_penalty = 0.4 - response = cohere_instance("Customize configurations.") - assert isinstance(response, str) - - -def test_cohere_api_error_handling(cohere_instance): - # Test error handling when the API key is invalid - cohere_instance.model = "base" - cohere_instance.cohere_api_key = "invalid-api-key" - with pytest.raises(Exception): - response = cohere_instance("Error handling with invalid API key.") - - -def test_cohere_async_api_error_handling(cohere_instance): - # Test async error handling when the API key is invalid - cohere_instance.model = "base" - cohere_instance.cohere_api_key = "invalid-api-key" - with pytest.raises(Exception): - response = cohere_instance.async_call("Error handling with invalid API key.") - - -def test_cohere_stream_api_error_handling(cohere_instance): - # Test error handling in streaming mode when the API key is invalid - cohere_instance.model = "base" - cohere_instance.cohere_api_key = "invalid-api-key" - with pytest.raises(Exception): - generator = cohere_instance.stream("Error handling with invalid API key.") - for token in generator: - pass - - -def test_cohere_streaming_mode(cohere_instance): - # Test the streaming mode for large text generation - cohere_instance.model = "base" - cohere_instance.streaming = True - prompt = "Generate a lengthy text using streaming mode." - generator = cohere_instance.stream(prompt) - for token in generator: - assert isinstance(token, str) - - -def test_cohere_streaming_mode_async(cohere_instance): - # Test the async streaming mode for large text generation - cohere_instance.model = "base" - cohere_instance.streaming = True - prompt = "Generate a lengthy text using async streaming mode." - async_generator = cohere_instance.async_stream(prompt) - for token in async_generator: - assert isinstance(token, str) - def test_cohere_representation_model_embedding(cohere_instance): # Test using the Representation model for text embedding @@ -435,7 +435,7 @@ def test_cohere_representation_model_max_tokens_limit_exceeded(cohere_instance): cohere_instance.max_tokens = 10 prompt = "This is a test prompt that will exceed the max tokens limit." with pytest.raises(ValueError): - embedding = cohere_instance.embed(prompt) + cohere_instance.embed(prompt) # Add more production-grade test cases based on real-world scenarios @@ -475,7 +475,7 @@ def test_cohere_representation_model_multilingual_max_tokens_limit_exceeded( cohere_instance.max_tokens = 10 prompt = "This is a test prompt that will exceed the max tokens limit for multilingual model." 
with pytest.raises(ValueError): - embedding = cohere_instance.embed(prompt) + cohere_instance.embed(prompt) def test_cohere_representation_model_multilingual_light_embedding(cohere_instance): @@ -514,7 +514,7 @@ def test_cohere_representation_model_multilingual_light_max_tokens_limit_exceede cohere_instance.max_tokens = 10 prompt = "This is a test prompt that will exceed the max tokens limit for multilingual light model." with pytest.raises(ValueError): - embedding = cohere_instance.embed(prompt) + cohere_instance.embed(prompt) def test_cohere_command_light_model(cohere_instance): @@ -570,7 +570,7 @@ def test_cohere_representation_model_english_max_tokens_limit_exceeded(cohere_in "This is a test prompt that will exceed the max tokens limit for English model." ) with pytest.raises(ValueError): - embedding = cohere_instance.embed(prompt) + cohere_instance.embed(prompt) def test_cohere_representation_model_english_light_embedding(cohere_instance): @@ -607,7 +607,7 @@ def test_cohere_representation_model_english_light_max_tokens_limit_exceeded( cohere_instance.max_tokens = 10 prompt = "This is a test prompt that will exceed the max tokens limit for English light model." with pytest.raises(ValueError): - embedding = cohere_instance.embed(prompt) + cohere_instance.embed(prompt) def test_cohere_command_model(cohere_instance): @@ -624,18 +624,7 @@ def test_cohere_invalid_model(cohere_instance): # Test using an invalid model name cohere_instance.model = "invalid-model" with pytest.raises(ValueError): - response = cohere_instance("Generate text using an invalid model.") - - -def test_cohere_streaming_generation(cohere_instance): - # Test streaming generation with the Command model - cohere_instance.model = "command" - prompt = "Generate text using streaming." - chunks = list(cohere_instance.stream(prompt)) - assert isinstance(chunks, list) - assert len(chunks) > 0 - assert all(isinstance(chunk, GenerationChunk) for chunk in chunks) - + cohere_instance("Generate text using an invalid model.") def test_cohere_base_model_generation_with_max_tokens(cohere_instance): # Test generating text using the base model with a specified max_tokens limit