
added orchestrator framework
dudizimber committed Jul 8, 2024
1 parent f119a72 commit 5ab9eda
Showing 6 changed files with 333 additions and 0 deletions.
13 changes: 13 additions & 0 deletions falkordb_gemini_kg/__init__.py
@@ -3,6 +3,19 @@
from .kg import KnowledgeGraph
from .classes.model_config import KnowledgeGraphModelConfig
from .steps.create_ontology_step import CreateOntologyStep
from .classes.agent import Agent
from .classes.orchestrator import Orchestrator
from .classes.orchestrator_runner import OrchestratorRunner
from .models.model import (
    GenerativeModel,
    GenerationResponse,
    GenerativeModelChatSession,
    GenerativeModelConfig,
    FinishReason,
)
from .classes.node import Node
from .classes.edge import Edge
from .classes.attribute import Attribute, AttributeType

# Setup Null handler
import logging
21 changes: 21 additions & 0 deletions falkordb_gemini_kg/classes/agent.py
@@ -0,0 +1,21 @@
from falkordb_gemini_kg.kg import KnowledgeGraph


class Agent:

    def __init__(self, id: str, kg: KnowledgeGraph, introduction: str):
        self.id = id
        self._kg = kg
        self._introduction = introduction

    def ask(self, question: str):
        return self._kg.ask(question)

    def to_orchestrator(self):
        return f"""
---
Agent ID: {self.id}
Knowledge Graph Name: {self._kg.name}
Introduction: {self._introduction}
"""
94 changes: 94 additions & 0 deletions falkordb_gemini_kg/classes/execution_plan.py
@@ -0,0 +1,94 @@
from dataclasses import dataclass
from json import loads


class StepBlockType:
    PARALLEL = "parallel"
    PROMPT_AGENT = "prompt_agent"
    SUMMARY = "summary"

    @staticmethod
    def from_str(text: str) -> "StepBlockType":
        if text == StepBlockType.PARALLEL:
            return StepBlockType.PARALLEL
        elif text == StepBlockType.PROMPT_AGENT:
            return StepBlockType.PROMPT_AGENT
        elif text == StepBlockType.SUMMARY:
            return StepBlockType.SUMMARY
        raise ValueError(f"Unknown step block type: {text}")


@dataclass
class PromptAgentProperties:
    agent_id: str
    prompt: str
    response: str | None = None

    @staticmethod
    def from_json(json: dict) -> "PromptAgentProperties":
        return PromptAgentProperties(
            json["agent_id"], json["prompt"], json.get("response", None)
        )

    def to_json(self) -> dict:
        return {
            "agent_id": self.agent_id,
            "prompt": self.prompt,
            "response": self.response,
        }


@dataclass
class ParallelProperties:
    steps: list["PlanStep"]

    @staticmethod
    def from_json(json: dict) -> "ParallelProperties":
        return ParallelProperties([PlanStep.from_json(step) for step in json["steps"]])

    def to_json(self) -> dict:
        return {"steps": [step.to_json() for step in self.steps]}


@dataclass
class PlanStep:
    id: str
    block: StepBlockType
    properties: PromptAgentProperties | ParallelProperties

    @staticmethod
    def from_json(json: dict) -> "PlanStep":
        block = StepBlockType.from_str(json["block"])
        if block == StepBlockType.PROMPT_AGENT:
            properties = PromptAgentProperties.from_json(json["properties"])
        elif block == StepBlockType.PARALLEL:
            properties = ParallelProperties.from_json(json["properties"])
        else:
            raise ValueError(f"Unknown block type: {block}")
        return PlanStep(json["id"], block, properties)

    def to_json(self) -> dict:
        return {
            "id": self.id,
            "block": self.block,
            "properties": self.properties.to_json(),
        }


class ExecutionPlan:

    def __init__(self, steps: list[PlanStep]):
        self.steps = steps

    @staticmethod
    def from_json(json: str | dict) -> "ExecutionPlan":
        if isinstance(json, str):
            json = loads(json)
        return ExecutionPlan([PlanStep.from_json(step) for step in json["steps"]])

    def find_step(self, step_id: str) -> PlanStep:
        for step in self.steps:
            if step.id == step_id:
                return step
            if step.block == StepBlockType.PARALLEL:
                for sub_step in step.properties.steps:
                    if sub_step.id == step_id:
                        return sub_step
        raise ValueError(f"Step with id {step_id} not found")

    def to_json(self) -> dict:
        return {"steps": [step.to_json() for step in self.steps]}
48 changes: 48 additions & 0 deletions falkordb_gemini_kg/classes/orchestrator.py
@@ -0,0 +1,48 @@
from falkordb_gemini_kg.models import GenerativeModel
from falkordb_gemini_kg.classes.agent import Agent
from falkordb_gemini_kg.classes.orchestrator_runner import OrchestratorRunner
from falkordb_gemini_kg.fixtures.prompts import (
    ORCHESTRATOR_SYSTEM,
    ORCHESTRATOR_EXECUTION_PLAN_PROMPT,
)
from falkordb_gemini_kg.helpers import extract_json
from falkordb_gemini_kg.classes.execution_plan import (
    ExecutionPlan,
    PlanStep,
    StepBlockType,
)


class Orchestrator:

    def __init__(self, model: GenerativeModel):
        self._model = model
        self._agents = []
        self._chat = None

    def register_agent(self, agent: Agent):
        self._agents.append(agent)

    def ask(self, question: str):
        self._chat = self._model.with_system_instruction(
            ORCHESTRATOR_SYSTEM.replace(
                "#AGENTS", ",".join([agent.to_orchestrator() for agent in self._agents])
            )
        ).start_chat({"response_validation": False})

        plan = self._create_execution_plan(question)

        runner = OrchestratorRunner(self._chat, self._agents, plan)

        return runner

    def _create_execution_plan(self, question: str):
        response = self._chat.send_message(
            ORCHESTRATOR_EXECUTION_PLAN_PROMPT.replace("#QUESTION", question)
        )

        plan = ExecutionPlan.from_json(extract_json(response))

        return plan
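
Putting the pieces together, the intended wiring looks roughly like this. It is only a sketch: the generative model and the knowledge graphs are placeholders (their construction is outside this commit), so the snippet is not runnable as-is.

```python
from falkordb_gemini_kg import Agent, Orchestrator

model = ...      # placeholder: a GenerativeModel implementation (e.g. the Gemini wrapper)
movies_kg = ...  # placeholder: existing KnowledgeGraph instances
actors_kg = ...

orchestrator = Orchestrator(model)
orchestrator.register_agent(Agent("movies", movies_kg, "I know about movies."))
orchestrator.register_agent(Agent("actors", actors_kg, "I know about actors."))

# ask() builds the system prompt from the registered agents, asks the model for
# an execution plan, and returns an OrchestratorRunner bound to that plan.
runner = orchestrator.ask("Which directors worked with actors born in 1970?")
```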
64 changes: 64 additions & 0 deletions falkordb_gemini_kg/classes/orchestrator_runner.py
@@ -0,0 +1,64 @@
from json import dumps
from concurrent.futures import ThreadPoolExecutor, wait

from falkordb_gemini_kg.classes.agent import Agent
from falkordb_gemini_kg.models import GenerativeModelChatSession
from falkordb_gemini_kg.classes.execution_plan import (
    ExecutionPlan,
    PlanStep,
    StepBlockType,
)
from falkordb_gemini_kg.fixtures.prompts import ORCHESTRATOR_SUMMARY_PROMPT


class OrchestratorRunner:

    def __init__(
        self,
        chat: GenerativeModelChatSession,
        agents: list[Agent],
        plan: ExecutionPlan,
        config: dict | None = None,
    ):
        self._chat = chat
        self._agents = agents
        self._plan = plan
        self._config = config or {"max_workers": 16}

    def _run(self):
        for step in self._plan.steps:
            self._run_step(step)

        return self._run_summary()

    def _run_summary(self):
        return self._chat.send_message(
            ORCHESTRATOR_SUMMARY_PROMPT.replace(
                "#EXECUTION_PLAN", dumps(self._plan.to_json())
            )
        )

    def _run_step(self, step: PlanStep):
        if step.block == StepBlockType.PROMPT_AGENT:
            return self._run_prompt_agent(step)
        elif step.block == StepBlockType.PARALLEL:
            return self._run_parallel(step)
        else:
            raise ValueError(f"Unknown block type: {step.block}")

    def _run_prompt_agent(self, step: PlanStep):
        agent = next(
            agent for agent in self._agents if agent.id == step.properties.agent_id
        )
        response = agent.ask(step.properties.prompt)
        step.properties.response = response
        return response

    def _run_parallel(self, step: PlanStep):
        tasks = []
        with ThreadPoolExecutor(
            max_workers=min(self._config["max_workers"], len(step.properties.steps))
        ) as executor:
            for sub_step in step.properties.steps:
                tasks.append(executor.submit(self._run_step, sub_step))

        wait(tasks)

        return [task.result() for task in tasks]
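
A self-contained sketch of the runner mechanics, using invented stand-ins for the chat session and the agent (neither is part of this commit) so the flow can be traced end to end. Note that `_run` is still a private method here; the commit does not expose a public entry point yet.

```python
from falkordb_gemini_kg.classes.execution_plan import ExecutionPlan
from falkordb_gemini_kg.classes.orchestrator_runner import OrchestratorRunner


class _EchoChat:
    """Stand-in for a GenerativeModelChatSession (illustration only)."""

    def send_message(self, message: str) -> str:
        return f"[summary of]\n{message}"


class _StubAgent:
    """Stand-in for Agent: answers every prompt with a canned string."""

    def __init__(self, id: str):
        self.id = id

    def ask(self, prompt: str) -> str:
        return f"{self.id} says: {prompt}"


plan = ExecutionPlan.from_json(
    '{"steps": [{"id": "s1", "block": "prompt_agent",'
    ' "properties": {"agent_id": "a1", "prompt": "hello"}}]}'
)
runner = OrchestratorRunner(_EchoChat(), [_StubAgent("a1")], plan)

# _run executes the plan steps in order, stores each agent response on the plan,
# then asks the chat session for a summary of the filled-in plan.
print(runner._run())
```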
93 changes: 93 additions & 0 deletions falkordb_gemini_kg/fixtures/prompts.py
@@ -441,3 +441,96 @@
Question: {question}
Helpful Answer:"""



ORCHESTRATOR_SYSTEM = """
You are an orchestrator agent that manages the flow of information between different agents in order to provide a complete and accurate answer to the user's question.
You will receive a question that requires information from several agents to answer.
You will need to interact with these agents to gather the necessary information.
For that to happen in the most efficient way, you create an execution plan whose steps are carried out by the agents.
Once all the steps are completed, you will receive a summary of the execution plan, from which you generate the final answer to the user's question.
Do not include any explanations or apologies in your responses.
Do not respond to any questions that ask for anything other than orchestrating the information flow.
#AGENTS
"""

ORCHESTRATOR_EXECUTION_PLAN_PROMPT = """
Considering the provided list of agents, create an execution plan to answer the following question:
#QUESTION
The execution plan must be valid against the following JSON schema.
Do not include any explanations or apologies in your responses.
Do not respond to any questions that ask for anything other than orchestrating the information flow.
Only return the execution plan, enclosed in triple backticks.
Do not skip lines in order to save tokens.
```json
{
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "id": {
        "type": "string"
      },
      "block": {
        "type": "string",
        "enum": ["parallel", "prompt_agent", "summary"]
      },
      "properties": {
        "type": "object",
        "properties": {
          "steps": {
            "type": "array",
            "description": "Steps to execute in parallel. Required if block is 'parallel'",
            "items": {
              "type": "object",
              "properties": {
                "id": {
                  "type": "string",
                  "description": "Step ID"
                },
                "block": {
                  "type": "string",
                  "enum": ["prompt_agent"]
                },
                "agent": {
                  "type": "string",
                  "description": "Agent ID to prompt"
                },
                "prompt": {
                  "type": "string",
                  "description": "Text to prompt the agent"
                }
              },
              "required": ["id", "block", "agent", "prompt"]
            }
          },
          "agent": {
            "type": "string",
            "description": "Agent ID to prompt. Required if block is 'prompt_agent'"
          },
          "prompt": {
            "type": "string",
            "description": "Text to prompt the agent. Required if block is 'prompt_agent'"
          }
        }
      }
    },
    "required": ["id", "block"]
  }
}
```
"""


ORCHESTRATOR_SUMMARY_PROMPT = """
Given the following execution plan and responses, generate the final answer to the user's question.
#EXECUTION_PLAN
"""
