forked from docker/genai-stack
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
146 lines (112 loc) · 3.51 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
from langchain.graphs import Neo4jGraph
from dotenv import load_dotenv
from utils import (
create_vector_index,
BaseLogger,
)
from chains import (
load_embedding_model,
load_llm,
configure_llm_only_chain,
configure_qa_rag_chain,
)
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from langchain.callbacks.base import BaseCallbackHandler
from threading import Thread
from queue import Queue, Empty
from collections.abc import Generator
from sse_starlette.sse import EventSourceResponse
from fastapi.middleware.cors import CORSMiddleware
import json
# --- Runtime configuration -------------------------------------------------
# Settings are read from the process environment after loading a local
# ".env" file. os.getenv returns None for any variable that is not set.
load_dotenv(".env")

url = os.getenv("NEO4J_URI")
username = os.getenv("NEO4J_USERNAME")
password = os.getenv("NEO4J_PASSWORD")
ollama_base_url = os.getenv("OLLAMA_BASE_URL")
embedding_model_name = os.getenv("EMBEDDING_MODEL")
llm_name = os.getenv("LLM")

# Remapping for Langchain Neo4j integration
# NOTE: this assignment raises TypeError when NEO4J_URI is unset (url is None),
# so a missing variable fails at import time.
os.environ["NEO4J_URL"] = url

# The config key is the literal string "ollama_base_url", matching the
# load_llm call below. (It was previously the *value* of the variable, which
# produced a dict keyed by the URL itself.)
embeddings, dimension = load_embedding_model(
    embedding_model_name,
    config={"ollama_base_url": ollama_base_url},
    logger=BaseLogger(),
)

# if Neo4j is local, you can go to http://localhost:7474/ to browse the database
neo4j_graph = Neo4jGraph(url=url, username=username, password=password)
create_vector_index(neo4j_graph, dimension)

llm = load_llm(
    llm_name, logger=BaseLogger(), config={"ollama_base_url": ollama_base_url}
)

# Two chains share the same LLM: a plain one and a retrieval-backed one that
# also receives the embeddings and the Neo4j connection details.
llm_chain = configure_llm_only_chain(llm)
rag_chain = configure_qa_rag_chain(
    llm, embeddings, embeddings_store_url=url, username=username, password=password
)
class QueueCallback(BaseCallbackHandler):
    """Forward every token produced by the LLM into a queue.

    The queue is supplied by the caller, which reads the tokens back out
    of it while the model is still generating.
    """

    def __init__(self, q):
        # Destination queue for generated tokens.
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Enqueue a freshly generated token."""
        target = self.q
        target.put(token)

    def on_llm_end(self, *args, **kwargs) -> None:
        """Report whether the queue is empty at the end of generation.

        Despite the ``None`` annotation, the queue's emptiness flag is
        returned; nothing is removed from the queue here.
        """
        is_empty = self.q.empty()
        return is_empty
def stream(cb, q) -> Generator:
    """Run ``cb`` in a background thread and yield the tokens it enqueues.

    Args:
        cb: Zero-argument callable that produces tokens by putting them on
            ``q`` (directly or through a callback handler).
        q: Queue shared between ``cb`` and this generator.

    Yields:
        ``(token, content)`` tuples, where ``content`` is the concatenation
        of every token received so far, including the current one.

    Raises:
        Exception: Whatever ``cb`` raised in the worker thread, re-raised
            here after all tokens enqueued before the failure were yielded.
    """
    job_done = object()  # unique sentinel marking the end of the stream
    failures = []

    def task():
        try:
            cb()
        except Exception as exc:
            # Keep the error so the consumer can re-raise it.
            failures.append(exc)
        finally:
            # Always signal completion; without this a failing cb would
            # leave the consumer polling the queue forever.
            q.put(job_done)

    t = Thread(target=task)
    t.start()
    content = ""
    # Get each new token from the queue and yield for our generator
    while True:
        try:
            next_token = q.get(True, timeout=1)
        except Empty:
            # Nothing arrived within the timeout; keep waiting.
            continue
        if next_token is job_done:
            break
        content += next_token
        yield next_token, content
    # The sentinel is enqueued in the worker's finally block, so it is done.
    t.join()
    if failures:
        raise failures[0]
# The ASGI application object that the route decorators below attach to.
app = FastAPI()

# Origins allowed to issue cross-origin requests; "*" is a wildcard entry.
origins = ["*"]

# CORS is configured permissively: wildcard origins, methods and headers,
# with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# worth confirming against the CORSMiddleware documentation and the intended
# deployment; consider listing explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
@app.get("/")
async def root():
    """Return a fixed greeting payload for the index route."""
    greeting = {"message": "Hello World"}
    return greeting
class Question(BaseModel):
    """Request parameters accepted by the query endpoints in this file."""

    # Question text; the handlers pass it to the chain under the "question" key.
    text: str
    # When true, the handlers use rag_chain instead of llm_chain. Defaults to False.
    rag: bool = False
@app.get("/query-stream")
def qstream(question: Question = Depends()):
    """Answer a question and stream the generated tokens as events.

    The first event is ``{"init": true, "model": <llm_name>}``; each later
    event carries one generated token as ``{"token": <token>}``.
    """
    # Pick the retrieval-backed chain only when the request asks for it.
    chain = rag_chain if question.rag else llm_chain
    token_queue = Queue()

    def run_chain():
        # Tokens reach token_queue through the QueueCallback handler.
        chain(
            {"question": question.text, "chat_history": []},
            callbacks=[QueueCallback(token_queue)],
        )

    def event_source():
        yield json.dumps({"init": True, "model": llm_name})
        yield from (
            json.dumps({"token": token})
            for token, _so_far in stream(run_chain, token_queue)
        )

    return EventSourceResponse(event_source(), media_type="text/event-stream")
@app.get("/query")
async def ask(question: Question = Depends()):
    """Answer a question in one shot and return the full result.

    Args:
        question: Request parameters; ``rag`` selects ``rag_chain`` over
            ``llm_chain``.

    Returns:
        A JSON-encoded string with the chain's ``"answer"`` under
        ``"result"`` and the configured model name under ``"model"``.
    """
    import asyncio

    output_function = rag_chain if question.rag else llm_chain

    def run_chain():
        return output_function(
            {"question": question.text, "chat_history": []}, callbacks=[]
        )

    # The chain call is synchronous; running it directly inside this
    # coroutine would block the event loop (and every other request) until
    # the model finishes. Offload it to the default thread pool instead.
    result = await asyncio.get_running_loop().run_in_executor(None, run_chain)

    # NOTE(review): the payload is serialized here with json.dumps and the
    # resulting string is returned as-is; kept for compatibility — confirm
    # that clients expect a JSON string rather than a JSON object.
    return json.dumps({"result": result["answer"], "model": llm_name})