Skip to content

Commit

Permalink
wip stash
Browse files Browse the repository at this point in the history
  • Loading branch information
wenzhe-log10 committed Feb 22, 2024
1 parent b3d2447 commit 994ff2e
Showing 1 changed file with 123 additions and 9 deletions.
132 changes: 123 additions & 9 deletions log10/_httpx_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import logging
import os
import traceback

import httpx
from dotenv import load_dotenv
from httpx import Request

from load import sessionID, global_tags
from httpx import Request, Response
from load import global_tags, sessionID


load_dotenv()
Expand All @@ -20,7 +21,7 @@
httpx_client = httpx.Client(transport=transport)


def _try_post_request(url: str, headers: dict = {}, payload: dict = {}) -> httpx.Response:
def _try_post_request(url: str, payload: dict = {}) -> httpx.Response:
payload["organization_id"] = org_id
headers = {
"x-log10-token": token,
Expand All @@ -45,28 +46,141 @@ def _try_post_request(url: str, headers: dict = {}, payload: dict = {}) -> httpx

async def get_completion_id(request: Request):
completion_url = "/api/completions"
res = _try_post_request(base_url + completion_url)
res = _try_post_request(url=f"{base_url}{completion_url}")
try:
completion_id = res.json().get("completionID")
except Exception as e:
logger.error(f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording.")
else:
request.headers["x-log10_completion_id"] = completion_id
request.headers["x-log10-completion-id"] = completion_id


async def log_request(request: Request):
completion_id = request.headers.get("x-log10_completion_id", "")
completion_id = request.headers.get("x-log10-completion-id", "")
if not completion_id:
return

orig_module = ""
orig_qualname = ""
if "chat" in request.url:
orig_module = "openai.api_resources.chat_completion"
orig_qualname = "ChatCompletion.create"
else:
orig_module = "openai.api_resources.completion"
orig_qualname = "Completion.create"
log_row = {
"status": "started",
"orig_module": , # could fake
"orig_qualname": "ChatCompletion.create" if "chat" in request.url else "Completion.create",
"orig_module": orig_module,
"orig_qualname": orig_qualname,
"request": request.content,
"session_id": sessionID,
"tags": global_tags,
}
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)


class _LogResponse(Response):
async def aiter_bytes(self, *args, **kwargs):
full_response = ""
finished = False
async for chunk in super().aiter_bytes(*args, **kwargs):
# print(chunk.decode(errors='ignore'), end='', flush=True)
full_response += chunk.decode(errors="ignore")

if "data: [DONE]" in full_response:
finished = True
yield chunk
completion_id = self.request.headers.get("x-log10-completion-id", "")
if finished and completion_id:
# import ipdb; ipdb.set_trace()
current_stack_frame = traceback.extract_stack()
stacktrace = [
{
"file": frame.filename,
"line": frame.line,
"lineno": frame.lineno,
"name": frame.name,
}
for frame in current_stack_frame
]
full_content = ""
responses = full_response.split("\n\n")
for r in responses:
if "data: [DONE]" in r:
break
# print(f"{r=}")
r_json = json.loads(r[6:])
content = r_json["choices"][0]["delta"].get("content", "")
full_content += content
response_json = r_json.copy()
response_json["object"] = "completion"
response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
log_row = {
"response": json.dumps(response_json),
"status": "finished",
"stacktrace": json.dumps(stacktrace),
"kind": "chat",
"reqest": self.request.content,
"session_id": sessionID,
"tags": global_tags,
}
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)

print(
f"\n\nLOG10 💬 [completion_id: {completion_id}]\n Full Response Content:\n {full_content}"
)


class _LogTransport(httpx.AsyncBaseTransport):
def __init__(self, transport: httpx.AsyncBaseTransport):
self.transport = transport

async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
response = await self.transport.handle_async_request(request)

if response.headers.get("content-type") == "application/json":
await response.aread()

completion_id = request.headers.get("x-log10-completion-id", "")
if not completion_id:
return response

llm_response = response.json()

current_stack_frame = traceback.extract_stack()
stacktrace = [
{
"file": frame.filename,
"line": frame.line,
"lineno": frame.lineno,
"name": frame.name,
}
for frame in current_stack_frame
]
log_row = {
"response": json.dumps(llm_response),
"status": "finished",
"stacktrace": json.dumps(stacktrace),
"kind": "chat",
"reqest": self.request.content,
"session_id": sessionID,
"tags": global_tags,
}
_try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
print(
f"LOG10 💬 [completion_id: {request.headers['log10_completion_id']}] Response: {response.json()['choices'][0]['message']['content']}"
)
return response
elif response.headers.get("content-type") == "text/event-stream":
return _LogResponse(
status_code=response.status_code,
headers=response.headers,
stream=response.stream,
extensions=response.extensions,
request=request,
)

async def



0 comments on commit 994ff2e

Please sign in to comment.