From 994ff2ef1aecbc6091c0a0701ee5a45364450fc8 Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Wed, 21 Feb 2024 17:15:11 -0800 Subject: [PATCH] wip stash --- log10/_httpx_utils.py | 132 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 123 insertions(+), 9 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 61be202c..efeec436 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -1,11 +1,12 @@ +import json import logging import os +import traceback import httpx from dotenv import load_dotenv -from httpx import Request - -from load import sessionID, global_tags +from httpx import Request, Response +from load import global_tags, sessionID load_dotenv() @@ -20,7 +21,7 @@ httpx_client = httpx.Client(transport=transport) -def _try_post_request(url: str, headers: dict = {}, payload: dict = {}) -> httpx.Response: +def _try_post_request(url: str, payload: dict = {}) -> httpx.Response: payload["organization_id"] = org_id headers = { "x-log10-token": token, @@ -45,28 +46,141 @@ def _try_post_request(url: str, headers: dict = {}, payload: dict = {}) -> httpx async def get_completion_id(request: Request): completion_url = "/api/completions" - res = _try_post_request(base_url + completion_url) + res = _try_post_request(url=f"{base_url}{completion_url}") try: completion_id = res.json().get("completionID") except Exception as e: logger.error(f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording.") else: - request.headers["x-log10_completion_id"] = completion_id + request.headers["x-log10-completion-id"] = completion_id async def log_request(request: Request): - completion_id = request.headers.get("x-log10_completion_id", "") + completion_id = request.headers.get("x-log10-completion-id", "") if not completion_id: return + orig_module = "" + orig_qualname = "" + if "chat" in request.url: + orig_module = "openai.api_resources.chat_completion" + orig_qualname = "ChatCompletion.create" + else: + orig_module = "openai.api_resources.completion" + orig_qualname = "Completion.create" log_row = { "status": "started", - "orig_module": , # could fake - "orig_qualname": "ChatCompletion.create" if "chat" in request.url else "Completion.create", + "orig_module": orig_module, + "orig_qualname": orig_qualname, "request": request.content, "session_id": sessionID, "tags": global_tags, } + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + + +class _LogResponse(Response): + async def aiter_bytes(self, *args, **kwargs): + full_response = "" + finished = False + async for chunk in super().aiter_bytes(*args, **kwargs): + # print(chunk.decode(errors='ignore'), end='', flush=True) + full_response += chunk.decode(errors="ignore") + + if "data: [DONE]" in full_response: + finished = True + yield chunk + completion_id = self.request.headers.get("x-log10-completion-id", "") + if finished and completion_id: + # import ipdb; ipdb.set_trace() + current_stack_frame = traceback.extract_stack() + stacktrace = [ + { + "file": frame.filename, + "line": frame.line, + "lineno": frame.lineno, + "name": frame.name, + } + for frame in current_stack_frame + ] + full_content = "" + responses = full_response.split("\n\n") + for r in responses: + if "data: [DONE]" in r: + break + # print(f"{r=}") + r_json = json.loads(r[6:]) + content = r_json["choices"][0]["delta"].get("content", "") + full_content += content + response_json = r_json.copy() + response_json["object"] = "completion" + response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content} + log_row = { + "response": json.dumps(response_json), + "status": "finished", + "stacktrace": json.dumps(stacktrace), + "kind": "chat", + "reqest": self.request.content, + "session_id": sessionID, + "tags": global_tags, + } + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + + print( + f"\n\nLOG10 💬 [completion_id: {completion_id}]\n Full Response Content:\n {full_content}" + ) + + +class _LogTransport(httpx.AsyncBaseTransport): + def __init__(self, transport: httpx.AsyncBaseTransport): + self.transport = transport + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + response = await self.transport.handle_async_request(request) + + if response.headers.get("content-type") == "application/json": + await response.aread() + + completion_id = request.headers.get("x-log10-completion-id", "") + if not completion_id: + return response + + llm_response = response.json() + + current_stack_frame = traceback.extract_stack() + stacktrace = [ + { + "file": frame.filename, + "line": frame.line, + "lineno": frame.lineno, + "name": frame.name, + } + for frame in current_stack_frame + ] + log_row = { + "response": json.dumps(llm_response), + "status": "finished", + "stacktrace": json.dumps(stacktrace), + "kind": "chat", + "reqest": self.request.content, + "session_id": sessionID, + "tags": global_tags, + } + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + print( + f"LOG10 💬 [completion_id: {request.headers['log10_completion_id']}] Response: {response.json()['choices'][0]['message']['content']}" + ) + return response + elif response.headers.get("content-type") == "text/event-stream": + return _LogResponse( + status_code=response.status_code, + headers=response.headers, + stream=response.stream, + extensions=response.extensions, + request=request, + ) + +async def