Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Commit

Permalink
demo orchestrator
Browse files Browse the repository at this point in the history
agent abstraction [wip]
  • Loading branch information
dudizimber committed Jul 10, 2024
1 parent 0d32391 commit d2ba29f
Show file tree
Hide file tree
Showing 16 changed files with 970 additions and 55 deletions.
1 change: 0 additions & 1 deletion falkordb_gemini_kg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from .kg import KnowledgeGraph
from .classes.model_config import KnowledgeGraphModelConfig
from .steps.create_ontology_step import CreateOntologyStep
from .classes.agent import Agent
from .classes.orchestrator import Orchestrator
from .classes.orchestrator_runner import OrchestratorRunner
from .models.model import (
Expand Down
1 change: 1 addition & 0 deletions falkordb_gemini_kg/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .agent import Agent
22 changes: 22 additions & 0 deletions falkordb_gemini_kg/agents/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from falkordb_gemini_kg.kg import KnowledgeGraph


class Agent(object):

@property
def agent_id(self) -> str:
pass

@property
def introduction(self) -> str:
pass

@property
def _schema(self) -> list[dict]:
pass

def run(self, params: dict):
pass

def to_orchestrator(self):
pass
60 changes: 60 additions & 0 deletions falkordb_gemini_kg/agents/kg_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from falkordb_gemini_kg.kg import KnowledgeGraph
from .agent import Agent


class KGAgent(Agent):

_schema = [
{
"name": "prompt",
"type": "string",
"required": True,
"description": "The prompt to ask the agent.",
}
]

def __init__(self, agent_id: str, kg: KnowledgeGraph, introduction: str):
super().__init__()
self.agent_id = agent_id
self.introduction = introduction
self.kg = kg

@property
def agent_id(self) -> str:
return self._agent_id

@agent_id.setter
def agent_id(self, value):
self._agent_id = value

@property
def introduction(self) -> str:
return self._introduction

@introduction.setter
def introduction(self, value):
self._introduction = value

@property
def _schema(self) -> list[dict]:
return self._schema

@property
def kg(self) -> KnowledgeGraph:
return self._kg

@kg.setter
def kg(self, value):
self._kg = value

def run(self, params: dict):
return self._kg.ask(params["prompt"])

def to_orchestrator(self):
return f"""
---
Agent ID: {self.agent_id}
Knowledge Graph Name: {self._kg.name}
Introduction: {self.introduction}
"""
21 changes: 0 additions & 21 deletions falkordb_gemini_kg/classes/agent.py

This file was deleted.

42 changes: 34 additions & 8 deletions falkordb_gemini_kg/classes/execution_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@ def from_str(text: str) -> "StepBlockType":


class PromptAgentProperties:
agent_id: str
agent: str
prompt: str
response: str | None = None

def __init__(self, agent: str, prompt: str, response: str | None = None):
self.agent = agent
self.prompt = prompt
self.response = response

@staticmethod
def from_json(json: dict) -> "PromptAgentProperties":
return PromptAgentProperties(
json["agent_id"], json["prompt"], json.get("response", None)
json["agent"], json["prompt"], json.get("response", None)
)

def to_json(self) -> dict:
return {
"agent_id": self.agent_id,
"agent": self.agent,
"prompt": self.prompt,
"response": self.response,
}
Expand All @@ -38,6 +43,9 @@ def to_json(self) -> dict:
class ParallelProperties:
steps: list["PlanStep"]

def __init__(self, steps: list["PlanStep"]):
self.steps = steps

@staticmethod
def from_json(json: dict) -> "ParallelProperties":
return ParallelProperties([PlanStep.from_json(step) for step in json["steps"]])
Expand All @@ -51,13 +59,25 @@ class PlanStep:
block: StepBlockType
properties: PromptAgentProperties | ParallelProperties

def __init__(
self,
id: str,
block: StepBlockType,
properties: PromptAgentProperties | ParallelProperties,
):
self.id = id
self.block = block
self.properties = properties

@staticmethod
def from_json(json: dict) -> "PlanStep":
block = StepBlockType.from_str(json["block"])
if block == StepBlockType.PROMPT_AGENT:
properties = PromptAgentProperties.from_json(json["properties"])
elif block == StepBlockType.PARALLEL:
properties = ParallelProperties.from_json(json["properties"])
elif block == StepBlockType.SUMMARY:
properties = None
else:
raise ValueError(f"Unknown block type: {block}")
return PlanStep(json["id"], block, properties)
Expand All @@ -66,22 +86,25 @@ def to_json(self) -> dict:
return {
"id": self.id,
"block": self.block,
"properties": self.properties.to_json(),
"properties": self.properties.to_json() if self.properties else None,
}


class ExecutionPlan:

_steps = []
steps = []

def __init__(self, steps: list[PlanStep]):
self.steps = steps

@staticmethod
def from_json(json: str | dict) -> "ExecutionPlan":
if isinstance(json, str):
json = loads(json)
return ExecutionPlan([PlanStep.from_json(step) for step in json["steps"]])
return ExecutionPlan([PlanStep.from_json(step) for step in json])

def find_step(self, step_id: str) -> PlanStep:
for step in self._steps:
for step in self.steps:
if step.id == step_id:
return step
if step.block == StepBlockType.PARALLEL:
Expand All @@ -91,4 +114,7 @@ def find_step(self, step_id: str) -> PlanStep:
raise ValueError(f"Step with id {step_id} not found")

def to_json(self) -> dict:
return {"steps": [step.to_json() for step in self._steps]}
return {"steps": [step.to_json() for step in self.steps]}

def __str__(self) -> str:
return f"ExecutionPlan(steps={self.to_json()})"
11 changes: 9 additions & 2 deletions falkordb_gemini_kg/classes/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from falkordb_gemini_kg.models import GenerativeModel
from falkordb_gemini_kg.classes.agent import Agent
from falkordb_gemini_kg.agents import Agent
from falkordb_gemini_kg.classes.orchestrator_runner import OrchestratorRunner
from falkordb_gemini_kg.fixtures.prompts import (
ORCHESTRATOR_SYSTEM,
Expand All @@ -11,6 +11,9 @@
PlanStep,
StepBlockType,
)
import logging

logger = logging.getLogger(__name__)


class Orchestrator:
Expand Down Expand Up @@ -43,6 +46,10 @@ def _create_execution_plan(self, question: str):
ORCHESTRATOR_EXECUTION_PLAN_PROMPT.replace("#QUESTION", question)
)

plan = ExecutionPlan.from_json(extract_json(response))
logger.debug(f"Execution plan response: {response.text}")

plan = ExecutionPlan.from_json(extract_json(response.text))

logger.debug(f"Execution plan: {plan}")

return plan
29 changes: 20 additions & 9 deletions falkordb_gemini_kg/classes/orchestrator_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from falkordb_gemini_kg.classes.agent import Agent
from falkordb_gemini_kg.agents import Agent
from falkordb_gemini_kg.models import GenerativeModelChatSession
from falkordb_gemini_kg.classes.execution_plan import (
ExecutionPlan,
PlanStep,
StepBlockType,
)
from falkordb_gemini_kg.models.model import GenerationResponse
from concurrent.futures import ThreadPoolExecutor, wait
from falkordb_gemini_kg.fixtures.prompts import ORCHESTRATOR_SUMMARY_PROMPT
import logging

logger = logging.getLogger(__name__)


class OrchestratorRunner:
Expand All @@ -25,31 +29,38 @@ def __init__(
self._plan = plan
self._config = config

def _run(self):
for step in self._plan.steps:
def run(self) -> GenerationResponse:
for step in self._plan.steps[:-1]:
self._run_step(step)

return self._run_summary()
return self._run_step(self._plan.steps[-1])

def _run_summary(self):
logger.debug(f"Execution plan summary: {self._plan.to_json()}")
return self._chat.send_message(
ORCHESTRATOR_SUMMARY_PROMPT.replace("#EXECUTION_PLAN", self._plan.to_json())
ORCHESTRATOR_SUMMARY_PROMPT.replace(
"#EXECUTION_PLAN", str(self._plan.to_json())
)
)

def _run_step(self, step: PlanStep):
def _run_step(self, step: PlanStep) -> GenerationResponse:
if step.block == StepBlockType.PROMPT_AGENT:
return self._run_prompt_agent(step)
elif step.block == StepBlockType.PARALLEL:
return self._run_parallel(step)
self._run_parallel(step)
return None
elif step.block == StepBlockType.SUMMARY:
return self._run_summary()
else:
raise ValueError(f"Unknown block type: {step.block}")

def _run_prompt_agent(self, step: PlanStep):
agent = next(
agent for agent in self._agents if agent.id == step.properties.agent_id
agent for agent in self._agents if agent.agent_id == step.properties.agent
)
response = agent.ask(step.properties.prompt)
response = agent.run(step.properties.to_json())
step.properties.response = response
return response

def _run_parallel(self, step: PlanStep):
tasks = []
Expand Down
2 changes: 1 addition & 1 deletion falkordb_gemini_kg/classes/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
label: str,
source: _RelationEntity | str,
target: _RelationEntity | str,
attributes: list[Attribute],
attributes: list[Attribute] = [],
):

if isinstance(source, str):
Expand Down
Loading

0 comments on commit d2ba29f

Please sign in to comment.