From 6fbd400f3462fdc0901412a5a0d667093c2ba220 Mon Sep 17 00:00:00 2001
From: Wenzhe Xue
Date: Wed, 21 Feb 2024 14:29:02 -0800
Subject: [PATCH 01/14] add logging for AsyncOpenAI

- currently supports only chat completion calls
- uses httpx event hooks and transport to trigger logging
---
 log10/_httpx_utils.py | 183 ++++++++++++++++++++++++++++++++++++++++++
 log10/load.py         |  22 +++
 2 files changed, 205 insertions(+)
 create mode 100644 log10/_httpx_utils.py

diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py
new file mode 100644
index 00000000..39074f26
--- /dev/null
+++ b/log10/_httpx_utils.py
@@ -0,0 +1,183 @@
+import json
+import logging
+import time
+import traceback
+
+import httpx
+from httpx import Request, Response
+
+from log10.llm import Log10Config
+from log10.load import global_tags, sessionID
+
+
+logger: logging.Logger = logging.getLogger("LOG10")
+
+
+_log10_config = Log10Config()
+base_url = _log10_config.url
+httpx_client = httpx.Client()
+
+
+def _try_post_request(url: str, payload: dict = {}) -> httpx.Response:
+    headers = {
+        "x-log10-token": _log10_config.token,
+        "x-log10-organization-id": _log10_config.org_id,
+        "Content-Type": "application/json",
+    }
+    payload["organization_id"] = _log10_config.org_id
+    res = None
+    try:
+        res = httpx_client.post(url, headers=headers, json=payload)
+        res.raise_for_status()
+        return res
+    except httpx.HTTPError as http_err:
+        if "401" in str(http_err):
+            logger.error(
+                "Failed authorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again."
+                + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details"
+            )
+        else:
+            logger.error(f"Failed with error: {http_err}")
+    except Exception as err:
+        logger.error(f"Failed to insert in log10: {payload} with error {err}")
+
+
+async def get_completion_id(request: Request):
+    if "v1/chat/completions" not in str(request.url):
+        logger.warning("Currently logging is only available for v1/chat/completions.")
+        return
+
+    completion_url = "/api/completions"
+    res = _try_post_request(url=f"{base_url}{completion_url}")
+    try:
+        completion_id = res.json().get("completionID")
+    except Exception as e:
+        logger.error(f"Failed to get completion ID. Error: {e}. Skipping completion recording.")
+    else:
+        request.headers["x-log10-completion-id"] = completion_id
+
+
+async def log_request(request: Request):
+    start_time = time.time()
+    request.started = start_time
+    completion_id = request.headers.get("x-log10-completion-id", "")
+    if not completion_id:
+        return
+
+    orig_module = ""
+    orig_qualname = ""
+    if "chat" in str(request.url):
+        kind = "chat"
+        orig_module = "openai.api_resources.chat_completion"
+        orig_qualname = "ChatCompletion.create"
+    else:
+        kind = "completion"
+        orig_module = "openai.api_resources.completion"
+        orig_qualname = "Completion.create"
+    log_row = {
+        "status": "started",
+        "kind": kind,
+        "orig_module": orig_module,
+        "orig_qualname": orig_qualname,
+        "request": request.content.decode("utf-8"),
+        "session_id": sessionID,
+        "tags": global_tags,
+    }
+    _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+
+
+class _LogResponse(Response):
+    async def aiter_bytes(self, *args, **kwargs):
+        full_response = ""
+        finished = False
+        async for chunk in super().aiter_bytes(*args, **kwargs):
+            full_response += chunk.decode(errors="ignore")
+
+            if "data: [DONE]" in full_response:
+                finished = True
+            yield chunk
+
+        completion_id = self.request.headers.get("x-log10-completion-id", "")
+        if finished and completion_id:
+            current_stack_frame = traceback.extract_stack()
+            stacktrace = [
+                {
+                    "file": frame.filename,
+                    "line": frame.line,
+                    "lineno": frame.lineno,
+                    "name": frame.name,
+                }
+                for frame in current_stack_frame
+            ]
+            full_content = ""
+            responses = full_response.split("\n\n")
+            for r in responses:
+                if "data: [DONE]" in r:
+                    break
+
+                r_json = json.loads(r[6:])
+                content = r_json["choices"][0]["delta"].get("content", "")
+                full_content += content
+            response_json = r_json.copy()
+            response_json["object"] = "completion"
+            response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
+            log_row = {
+                "response": json.dumps(response_json),
+                "status": "finished",
+                "duration": int(time.time() - self.request.started) * 1000,
+                "stacktrace": json.dumps(stacktrace),
+                "kind": "chat",
+                "request": self.request.content.decode("utf-8"),
+                "session_id": sessionID,
+                "tags": global_tags,
+            }
+            _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+
+
+class _LogTransport(httpx.AsyncBaseTransport):
+    def __init__(self, transport: httpx.AsyncBaseTransport):
+        self.transport = transport
+
+    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
+        response = await self.transport.handle_async_request(request)
+
+        completion_id = request.headers.get("x-log10-completion-id", "")
+        if not completion_id:
+            return response
+
+        if response.headers.get("content-type") == "application/json":
+            await response.aread()
+            llm_response = response.json()
+
+            current_stack_frame = traceback.extract_stack()
+            stacktrace = [
+                {
+                    "file": frame.filename,
+                    "line": frame.line,
+                    "lineno": frame.lineno,
+                    "name": frame.name,
+                }
+                for frame in current_stack_frame
+            ]
+
+            elapsed = time.time() - request.started
+            log_row = {
+                "response": json.dumps(llm_response),
+                "status": "finished",
+                "duration": int(elapsed * 1000),
+                "stacktrace": json.dumps(stacktrace),
+                "kind": "chat",
+                "request": request.content.decode("utf-8"),
+                "session_id": sessionID,
+                "tags": global_tags,
+            }
+            _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+            return response
+        elif response.headers.get("content-type") == "text/event-stream":
return _LogResponse( + status_code=response.status_code, + headers=response.headers, + stream=response.stream, + extensions=response.extensions, + request=request, + ) diff --git a/log10/load.py b/log10/load.py index bde1d6d6..03b02b9f 100644 --- a/log10/load.py +++ b/log10/load.py @@ -643,6 +643,28 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): attr = module.resources.chat.completions.Completions method = getattr(attr, "create") setattr(attr, "create", intercepting_decorator(method)) + + # support for async completions + # patch module.AsyncOpenAI.__init__ to new_init + origin_init = module.AsyncOpenAI.__init__ + + def new_init(self, *args, **kwargs): + logger.debug("LOG10: patching AsyncOpenAI.__init__") + import httpx + + from log10._httpx_utils import _LogTransport, get_completion_id, log_request + + event_hooks = { + "request": [get_completion_id, log_request], + } + async_httpx_client = httpx.AsyncClient( + event_hooks=event_hooks, transport=_LogTransport(httpx.AsyncHTTPTransport()) + ) + kwargs["http_client"] = async_httpx_client + origin_init(self, *args, **kwargs) + + module.AsyncOpenAI.__init__ = new_init + else: attr = module.api_resources.completion.Completion method = getattr(attr, "create") From 3c92a85d12009dc2b0e9f010cefc553471bb84cd Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Wed, 21 Feb 2024 22:42:47 -0800 Subject: [PATCH 02/14] add examples --- .../logging/magentic_async_stream_logging.py | 25 +++++++++++++++++++ examples/logging/openai_async_logging.py | 22 ++++++++++++++++ .../logging/openai_async_stream_logging.py | 24 ++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 examples/logging/magentic_async_stream_logging.py create mode 100644 examples/logging/openai_async_logging.py create mode 100644 examples/logging/openai_async_stream_logging.py diff --git a/examples/logging/magentic_async_stream_logging.py b/examples/logging/magentic_async_stream_logging.py new file mode 100644 index 00000000..c1cf5269 --- /dev/null +++ b/examples/logging/magentic_async_stream_logging.py @@ -0,0 +1,25 @@ +import asyncio + +import openai +from magentic import AsyncStreamedStr, prompt + +from log10.load import log10, log10_session + + +log10(openai) + + +@prompt("Tell me a 200-word story about {topic}") +async def tell_story(topic: str) -> AsyncStreamedStr: + ... 
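The AsyncOpenAI patching in load.py above hinges on one detail: openai v1 clients accept a pre-built http_client, so any httpx.AsyncClient carrying request event hooks will have those hooks fire before every API call. A minimal sketch of that wiring, independent of log10 — the hook body and header name here are illustrative only:

    import httpx
    from openai import AsyncOpenAI

    async def on_request(request: httpx.Request) -> None:
        # Async hook: runs just before each request leaves the client.
        request.headers["x-example-request-id"] = "example-123"  # illustrative header

    client = AsyncOpenAI(
        http_client=httpx.AsyncClient(event_hooks={"request": [on_request]})
    )

On an AsyncClient the hooks must be async callables; the patch registers get_completion_id and log_request in exactly this slot.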
+ + +async def main(): + with log10_session(tags=["async_tag"]): + output = await tell_story("Europe.") + async for chunk in output: + print(chunk, end="", flush=True) + + +# Python 3.7+ +asyncio.run(main()) diff --git a/examples/logging/openai_async_logging.py b/examples/logging/openai_async_logging.py new file mode 100644 index 00000000..e89d0e2b --- /dev/null +++ b/examples/logging/openai_async_logging.py @@ -0,0 +1,22 @@ +import asyncio + +import openai +from openai import AsyncOpenAI + +from log10.load import log10 + + +log10(openai) + +client = AsyncOpenAI() + + +async def main(): + completion = await client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": "Say this is a test"}], + ) + print(completion.choices[0].message.content) + + +asyncio.run(main()) diff --git a/examples/logging/openai_async_stream_logging.py b/examples/logging/openai_async_stream_logging.py new file mode 100644 index 00000000..960dc784 --- /dev/null +++ b/examples/logging/openai_async_stream_logging.py @@ -0,0 +1,24 @@ +import asyncio + +import openai +from openai import AsyncOpenAI + +from log10.load import log10 + + +log10(openai) + +client = AsyncOpenAI() + + +async def main(): + stream = await client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": "Count to 50."}], + stream=True, + ) + async for chunk in stream: + print(chunk.choices[0].delta.content or "", end="", flush=True) + + +asyncio.run(main()) From d466a5522de603c58e1e38bdc0672019e626af58 Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Wed, 21 Feb 2024 22:51:46 -0800 Subject: [PATCH 03/14] minor --- examples/logging/magentic_async_stream_logging.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/logging/magentic_async_stream_logging.py b/examples/logging/magentic_async_stream_logging.py index c1cf5269..99bcb69b 100644 --- a/examples/logging/magentic_async_stream_logging.py +++ b/examples/logging/magentic_async_stream_logging.py @@ -21,5 +21,4 @@ async def main(): print(chunk, end="", flush=True) -# Python 3.7+ asyncio.run(main()) From f53807f41c97a18610e4af305604ddbc72e149f6 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Thu, 22 Feb 2024 07:39:19 -0800 Subject: [PATCH 04/14] Hot fix for OpenBB function calling --- log10/_httpx_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 39074f26..87d61939 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -117,7 +117,8 @@ async def aiter_bytes(self, *args, **kwargs): r_json = json.loads(r[6:]) content = r_json["choices"][0]["delta"].get("content", "") - full_content += content + if content: + full_content += content response_json = r_json.copy() response_json["object"] = "completion" response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content} From a34f59ec720c52eeec15abe54aa5ec0a891ee396 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Thu, 22 Feb 2024 10:05:39 -0800 Subject: [PATCH 05/14] Hot fix for OpenBB function calling --- log10/_httpx_utils.py | 56 ++++++++++++++++++--- log10/load.py | 110 +++++++++++++++++++++++++++++++++++------- 2 files changed, 140 insertions(+), 26 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 87d61939..3132f0ec 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -52,7 +52,9 @@ async def get_completion_id(request: Request): try: completion_id = res.json().get("completionID") except Exception as e: - logger.error(f"Failed to get 
completion ID. Error: {e}. Skipping completion recording.") + logger.error( + f"Failed to get completion ID. Error: {e}. Skipping completion recording." + ) else: request.headers["x-log10-completion-id"] = completion_id @@ -83,12 +85,16 @@ async def log_request(request: Request): "session_id": sessionID, "tags": global_tags, } - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + _try_post_request( + url=f"{base_url}/api/completions/{completion_id}", payload=log_row + ) class _LogResponse(Response): async def aiter_bytes(self, *args, **kwargs): full_response = "" + function_name = "" + full_argument = "" finished = False async for chunk in super().aiter_bytes(*args, **kwargs): full_response += chunk.decode(errors="ignore") @@ -109,6 +115,7 @@ async def aiter_bytes(self, *args, **kwargs): } for frame in current_stack_frame ] + print("full_response:", full_response) full_content = "" responses = full_response.split("\n\n") for r in responses: @@ -116,12 +123,41 @@ async def aiter_bytes(self, *args, **kwargs): break r_json = json.loads(r[6:]) - content = r_json["choices"][0]["delta"].get("content", "") - if content: - full_content += content + + delta = r_json["choices"][0]["delta"] + + # Delta may have content + if "content" in delta: + content = delta["content"] + if content: + full_content += content + + # May be a function call, and have to reconstruct the arguments + if "function_call" in delta: + # May be function name + if "name" in delta["function_call"]: + function_name = delta["function_call"]["name"] + # May be function arguments + if "arguments" in delta["function_call"]: + full_argument += delta["function_call"]["arguments"] + response_json = r_json.copy() response_json["object"] = "completion" - response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content} + + # If finish_reason is function_call - don't log the response + print(response_json) + if not ( + "choices" in response_json + and response_json["choices"] + and response_json["choices"][0]["finish_reason"] == "function_call" + ): + response_json["choices"][0]["message"]["content"] = full_content + else: + response_json["choices"][0]["function_call"] = { + "name": function_name, + "arguments": full_argument, + } + log_row = { "response": json.dumps(response_json), "status": "finished", @@ -132,7 +168,9 @@ async def aiter_bytes(self, *args, **kwargs): "session_id": sessionID, "tags": global_tags, } - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + _try_post_request( + url=f"{base_url}/api/completions/{completion_id}", payload=log_row + ) class _LogTransport(httpx.AsyncBaseTransport): @@ -172,7 +210,9 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: "session_id": sessionID, "tags": global_tags, } - _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) + _try_post_request( + url=f"{base_url}/api/completions/{completion_id}", payload=log_row + ) return response elif response.headers.get("content-type") == "text/event-stream": return _LogResponse( diff --git a/log10/load.py b/log10/load.py index 03b02b9f..fc97167a 100644 --- a/log10/load.py +++ b/log10/load.py @@ -15,6 +15,7 @@ from dotenv import load_dotenv from packaging.version import parse +import pprint load_dotenv() @@ -80,7 +81,9 @@ def post_request(url: str, json_payload: dict = {}) -> requests.Response: # raise_for_status() will raise an exception if the status is 4xx, 5xxx res.raise_for_status() - 
logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}") + logger.debug( + f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}" + ) return res except requests.Timeout: logger.error("HTTP request: POST Timeout") @@ -179,7 +182,9 @@ def timed_block(block_name): yield finally: elapsed_time = time.perf_counter() - start_time - logger.debug(f"TIMED BLOCK - {block_name} took {elapsed_time:.6f} seconds to execute.") + logger.debug( + f"TIMED BLOCK - {block_name} took {elapsed_time:.6f} seconds to execute." + ) else: yield @@ -207,6 +212,8 @@ async def log_async(completion_url, func, **kwargs): if "api_key" in kwargs: kwargs.pop("api_key") + kwargs["messages"] = flatten_messages(kwargs["messages"]) + log_row = { # do we want to also store args? "status": "started", @@ -216,6 +223,7 @@ async def log_async(completion_url, func, **kwargs): "session_id": sessionID, "tags": global_tags, } + if target_service == "log10": try: res = post_request(completion_url + "/" + completionID, log_row) @@ -228,7 +236,7 @@ async def log_async(completion_url, func, **kwargs): # NOTE: We only save on request finalization. except Exception as e: - logging.warn(f"LOG10: failed to log: {e}. SKipping") + logging.warn(f"LOG10: failed to log: {e}. Skipping") return None return completionID @@ -338,12 +346,18 @@ def __next__(self): ], } self.partial_log_row["response"] = json.dumps(response) - self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000) + self.partial_log_row["duration"] = int( + (time.perf_counter() - self.start_time) * 1000 + ) try: - res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row) + res = post_request( + self.completion_url + "/" + self.completionID, self.partial_log_row + ) if res.status_code != 200: - logger.error(f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}") + logger.error( + f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}" + ) except Exception as e: traceback.print_tb(e.__traceback__) logging.warn(f"LOG10: failed to log: {e}. Skipping") @@ -351,6 +365,28 @@ def __next__(self): raise se +def flatten_messages(messages): + flat_messages = [] + for message in messages: + if type(message).__name__ == "dict": + flat_messages.append(message) + else: + flat_messages.append(message.model_dump()) + + pprint.pprint(flat_messages) + + return flat_messages + + +def flatten_response(response): + if "choices" in response: + # May have to flatten, if not a dictionary + if type(response.choices[0].message).__name__ != "dict": + response.choices[0].message = response.choices[0].message.model_dump() + + return response + + def intercepting_decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -371,10 +407,14 @@ def wrapper(*args, **kwargs): }, ).start() else: - completionID = log_sync(completion_url=completion_url, func=func, **kwargs) + completionID = log_sync( + completion_url=completion_url, func=func, **kwargs + ) if completionID is None: - logging.warn("LOG10: failed to get completionID from log10. Skipping log.") + logging.warn( + "LOG10: failed to get completionID from log10. Skipping log." + ) func_with_backoff(func, *args, **kwargs) return @@ -401,12 +441,18 @@ def wrapper(*args, **kwargs): completionID = result_queue.get() if completionID is None: - logging.warn(f"LOG10: failed to get completionID from log10: {e}. 
Skipping log.") + logging.warn( + f"LOG10: failed to get completionID from log10: {e}. Skipping log." + ) return logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): + if type( + e + ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( + e + ): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -425,7 +471,9 @@ def wrapper(*args, **kwargs): try: res = post_request(completion_url + "/" + completionID, log_row) except Exception as le: - logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.") + logging.warn( + f"LOG10: failed to log: {le}. Skipping, but raising LLM error." + ) raise e else: # finished with no exceptions @@ -441,10 +489,14 @@ def wrapper(*args, **kwargs): if "anthropic" in type(output).__module__: from log10.anthropic import Anthropic - response = Anthropic.prepare_response(kwargs["prompt"], output, "text") + response = Anthropic.prepare_response( + kwargs["prompt"], output, "text" + ) kind = "completion" elif type(output).__name__ == "Stream": - kind = "chat" # Should be "stream", but we don't have that kind yet. + kind = ( + "chat" # Should be "stream", but we don't have that kind yet. + ) return StreamingResponseWrapper( completion_url=completion_url, completionID=completionID, @@ -464,16 +516,27 @@ def wrapper(*args, **kwargs): else: response = output - kind = "chat" if output.object == "chat.completion" else "completion" + kind = ( + "chat" if output.object == "chat.completion" else "completion" + ) # in case the usage of load(openai) and langchain.ChatOpenAI if "api_key" in kwargs: kwargs.pop("api_key") + # We may have to flatten messages from their ChatCompletionMessage with nested ChatCompletionMessageToolCall to json serializable format + # Rewrite in-place + if "messages" in kwargs: + kwargs["messages"] = flatten_messages(kwargs["messages"]) + + if "choices" in response: + response = flatten_response(response) + if hasattr(response, "model_dump_json"): response = response.model_dump_json() else: response = json.dumps(response) + log_row = { "response": response, "status": "finished", @@ -487,11 +550,15 @@ def wrapper(*args, **kwargs): "tags": global_tags, } + pprint.pprint(log_row) + if target_service == "log10": try: res = post_request(completion_url + "/" + completionID, log_row) if res.status_code != 200: - logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}") + logger.error( + f"LOG10: failed to insert in log10: {log_row} with error {res.text}" + ) except Exception as e: logging.warn(f"LOG10: failed to log: {e}. 
Skipping") @@ -513,7 +580,9 @@ def wrapper(*args, **kwargs): bigquery_client.insert_rows_json(bigquery_table, [log_row]) except Exception as e: - logging.error(f"LOG10: failed to insert in Bigquery: {log_row} with error {e}") + logging.error( + f"LOG10: failed to insert in Bigquery: {log_row} with error {e}" + ) return output @@ -652,13 +721,18 @@ def new_init(self, *args, **kwargs): logger.debug("LOG10: patching AsyncOpenAI.__init__") import httpx - from log10._httpx_utils import _LogTransport, get_completion_id, log_request + from log10._httpx_utils import ( + _LogTransport, + get_completion_id, + log_request, + ) event_hooks = { "request": [get_completion_id, log_request], } async_httpx_client = httpx.AsyncClient( - event_hooks=event_hooks, transport=_LogTransport(httpx.AsyncHTTPTransport()) + event_hooks=event_hooks, + transport=_LogTransport(httpx.AsyncHTTPTransport()), ) kwargs["http_client"] = async_httpx_client origin_init(self, *args, **kwargs) From 52925ec3f43773f5b8d825acdb0fb86054986bcd Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Thu, 22 Feb 2024 10:06:05 -0800 Subject: [PATCH 06/14] Tools and functions support --- log10/load.py | 66 +++++++++++++-------------------------------------- 1 file changed, 17 insertions(+), 49 deletions(-) diff --git a/log10/load.py b/log10/load.py index fc97167a..d1576d42 100644 --- a/log10/load.py +++ b/log10/load.py @@ -3,6 +3,7 @@ import json import logging import os +import pprint import queue import threading import time @@ -15,7 +16,6 @@ from dotenv import load_dotenv from packaging.version import parse -import pprint load_dotenv() @@ -81,9 +81,7 @@ def post_request(url: str, json_payload: dict = {}) -> requests.Response: # raise_for_status() will raise an exception if the status is 4xx, 5xxx res.raise_for_status() - logger.debug( - f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}" - ) + logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}") return res except requests.Timeout: logger.error("HTTP request: POST Timeout") @@ -182,9 +180,7 @@ def timed_block(block_name): yield finally: elapsed_time = time.perf_counter() - start_time - logger.debug( - f"TIMED BLOCK - {block_name} took {elapsed_time:.6f} seconds to execute." - ) + logger.debug(f"TIMED BLOCK - {block_name} took {elapsed_time:.6f} seconds to execute.") else: yield @@ -223,7 +219,7 @@ async def log_async(completion_url, func, **kwargs): "session_id": sessionID, "tags": global_tags, } - + if target_service == "log10": try: res = post_request(completion_url + "/" + completionID, log_row) @@ -346,18 +342,12 @@ def __next__(self): ], } self.partial_log_row["response"] = json.dumps(response) - self.partial_log_row["duration"] = int( - (time.perf_counter() - self.start_time) * 1000 - ) + self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000) try: - res = post_request( - self.completion_url + "/" + self.completionID, self.partial_log_row - ) + res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row) if res.status_code != 200: - logger.error( - f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}" - ) + logger.error(f"LOG10: failed to insert in log10: {self.partial_log_row} with error {res.text}") except Exception as e: traceback.print_tb(e.__traceback__) logging.warn(f"LOG10: failed to log: {e}. 
Skipping") @@ -407,14 +397,10 @@ def wrapper(*args, **kwargs): }, ).start() else: - completionID = log_sync( - completion_url=completion_url, func=func, **kwargs - ) + completionID = log_sync(completion_url=completion_url, func=func, **kwargs) if completionID is None: - logging.warn( - "LOG10: failed to get completionID from log10. Skipping log." - ) + logging.warn("LOG10: failed to get completionID from log10. Skipping log.") func_with_backoff(func, *args, **kwargs) return @@ -441,18 +427,12 @@ def wrapper(*args, **kwargs): completionID = result_queue.get() if completionID is None: - logging.warn( - f"LOG10: failed to get completionID from log10: {e}. Skipping log." - ) + logging.warn(f"LOG10: failed to get completionID from log10: {e}. Skipping log.") return logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type( - e - ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( - e - ): + if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -471,9 +451,7 @@ def wrapper(*args, **kwargs): try: res = post_request(completion_url + "/" + completionID, log_row) except Exception as le: - logging.warn( - f"LOG10: failed to log: {le}. Skipping, but raising LLM error." - ) + logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.") raise e else: # finished with no exceptions @@ -489,14 +467,10 @@ def wrapper(*args, **kwargs): if "anthropic" in type(output).__module__: from log10.anthropic import Anthropic - response = Anthropic.prepare_response( - kwargs["prompt"], output, "text" - ) + response = Anthropic.prepare_response(kwargs["prompt"], output, "text") kind = "completion" elif type(output).__name__ == "Stream": - kind = ( - "chat" # Should be "stream", but we don't have that kind yet. - ) + kind = "chat" # Should be "stream", but we don't have that kind yet. return StreamingResponseWrapper( completion_url=completion_url, completionID=completionID, @@ -516,9 +490,7 @@ def wrapper(*args, **kwargs): else: response = output - kind = ( - "chat" if output.object == "chat.completion" else "completion" - ) + kind = "chat" if output.object == "chat.completion" else "completion" # in case the usage of load(openai) and langchain.ChatOpenAI if "api_key" in kwargs: @@ -556,9 +528,7 @@ def wrapper(*args, **kwargs): try: res = post_request(completion_url + "/" + completionID, log_row) if res.status_code != 200: - logger.error( - f"LOG10: failed to insert in log10: {log_row} with error {res.text}" - ) + logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}") except Exception as e: logging.warn(f"LOG10: failed to log: {e}. 
Skipping") @@ -580,9 +550,7 @@ def wrapper(*args, **kwargs): bigquery_client.insert_rows_json(bigquery_table, [log_row]) except Exception as e: - logging.error( - f"LOG10: failed to insert in Bigquery: {log_row} with error {e}" - ) + logging.error(f"LOG10: failed to insert in Bigquery: {log_row} with error {e}") return output From b588ec2069adf225b9bfc1a43f8569407481abe7 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Thu, 22 Feb 2024 10:07:50 -0800 Subject: [PATCH 07/14] Remove print statements --- log10/load.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/log10/load.py b/log10/load.py index d1576d42..d602f301 100644 --- a/log10/load.py +++ b/log10/load.py @@ -3,7 +3,6 @@ import json import logging import os -import pprint import queue import threading import time @@ -362,9 +361,6 @@ def flatten_messages(messages): flat_messages.append(message) else: flat_messages.append(message.model_dump()) - - pprint.pprint(flat_messages) - return flat_messages @@ -522,8 +518,6 @@ def wrapper(*args, **kwargs): "tags": global_tags, } - pprint.pprint(log_row) - if target_service == "log10": try: res = post_request(completion_url + "/" + completionID, log_row) From 5678269e3b3b452e2e856839284abe5f38fb5bef Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 11:35:15 -0800 Subject: [PATCH 08/14] fix response with message --- .../magentic_async_function_logging.py | 34 +++++++ examples/logging/openai_tools.py | 91 +++++++++++++++++++ log10/_httpx_utils.py | 24 ++--- log10/load.py | 4 +- 4 files changed, 134 insertions(+), 19 deletions(-) create mode 100644 examples/logging/magentic_async_function_logging.py create mode 100644 examples/logging/openai_tools.py diff --git a/examples/logging/magentic_async_function_logging.py b/examples/logging/magentic_async_function_logging.py new file mode 100644 index 00000000..81641c7a --- /dev/null +++ b/examples/logging/magentic_async_function_logging.py @@ -0,0 +1,34 @@ +import openai +from magentic import AsyncStreamedStr, FunctionCall, prompt + +from log10.load import log10 + + +log10(openai) + + +def add(x: int, y: int) -> int: + """Add together two numbers.""" + return x + y + + +@prompt("What is 1+1? Use tools", functions=[add]) +async def agent() -> AsyncStreamedStr: + ... 
+ + +# Define an async main function +async def main(): + response = await agent() + if isinstance(response, FunctionCall): + print(response) + else: + async for chunk in response: + print(chunk, end="", flush=True) + + +# Running the main function using asyncio +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/logging/openai_tools.py b/examples/logging/openai_tools.py new file mode 100644 index 00000000..97b2222c --- /dev/null +++ b/examples/logging/openai_tools.py @@ -0,0 +1,91 @@ +import json + +from log10.load import OpenAI + + +client = OpenAI() + + +# Example dummy function hard coded to return the same weather +# In production, this could be your backend API or an external API +def get_current_weather(location, unit="fahrenheit"): + """Get the current weather in a given location""" + if "tokyo" in location.lower(): + return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit}) + elif "san francisco" in location.lower(): + return json.dumps({"location": "San Francisco", "temperature": "72", "unit": unit}) + elif "paris" in location.lower(): + return json.dumps({"location": "Paris", "temperature": "22", "unit": unit}) + else: + return json.dumps({"location": location, "temperature": "unknown"}) + + +def run_conversation(): + # Step 1: send the conversation and available functions to the model + messages = [ + { + "role": "user", + "content": "What's the weather like in San Francisco, Tokyo, and Paris?", + } + ] + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. 
San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + } + ] + response = client.chat.completions.create( + model="gpt-3.5-turbo-0125", + messages=messages, + tools=tools, + tool_choice="auto", # auto is default, but we'll be explicit + ) + response_message = response.choices[0].message + tool_calls = response_message.tool_calls + # Step 2: check if the model wanted to call a function + if tool_calls: + # Step 3: call the function + # Note: the JSON response may not always be valid; be sure to handle errors + available_functions = { + "get_current_weather": get_current_weather, + } # only one function in this example, but you can have multiple + messages.append(response_message) # extend conversation with assistant's reply + # Step 4: send the info for each function call and function response to the model + for tool_call in tool_calls: + function_name = tool_call.function.name + function_to_call = available_functions[function_name] + function_args = json.loads(tool_call.function.arguments) + function_response = function_to_call( + location=function_args.get("location"), + unit=function_args.get("unit"), + ) + messages.append( + { + "tool_call_id": tool_call.id, + "role": "tool", + "name": function_name, + "content": function_response, + } + ) # extend conversation with function response + second_response = client.chat.completions.create( + model="gpt-3.5-turbo-0125", + messages=messages, + ) # get a new response from the model where it can see the function response + return second_response + + +print(run_conversation()) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 3132f0ec..b3fd214d 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -52,9 +52,7 @@ async def get_completion_id(request: Request): try: completion_id = res.json().get("completionID") except Exception as e: - logger.error( - f"Failed to get completion ID. Error: {e}. Skipping completion recording." - ) + logger.error(f"Failed to get completion ID. Error: {e}. 
Skipping completion recording.") else: request.headers["x-log10-completion-id"] = completion_id @@ -85,16 +83,12 @@ async def log_request(request: Request): "session_id": sessionID, "tags": global_tags, } - _try_post_request( - url=f"{base_url}/api/completions/{completion_id}", payload=log_row - ) + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) class _LogResponse(Response): async def aiter_bytes(self, *args, **kwargs): full_response = "" - function_name = "" - full_argument = "" finished = False async for chunk in super().aiter_bytes(*args, **kwargs): full_response += chunk.decode(errors="ignore") @@ -115,8 +109,9 @@ async def aiter_bytes(self, *args, **kwargs): } for frame in current_stack_frame ] - print("full_response:", full_response) full_content = "" + function_name = "" + full_argument = "" responses = full_response.split("\n\n") for r in responses: if "data: [DONE]" in r: @@ -145,13 +140,12 @@ async def aiter_bytes(self, *args, **kwargs): response_json["object"] = "completion" # If finish_reason is function_call - don't log the response - print(response_json) if not ( "choices" in response_json and response_json["choices"] and response_json["choices"][0]["finish_reason"] == "function_call" ): - response_json["choices"][0]["message"]["content"] = full_content + response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content} else: response_json["choices"][0]["function_call"] = { "name": function_name, @@ -168,9 +162,7 @@ async def aiter_bytes(self, *args, **kwargs): "session_id": sessionID, "tags": global_tags, } - _try_post_request( - url=f"{base_url}/api/completions/{completion_id}", payload=log_row - ) + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) class _LogTransport(httpx.AsyncBaseTransport): @@ -210,9 +202,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: "session_id": sessionID, "tags": global_tags, } - _try_post_request( - url=f"{base_url}/api/completions/{completion_id}", payload=log_row - ) + _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) return response elif response.headers.get("content-type") == "text/event-stream": return _LogResponse( diff --git a/log10/load.py b/log10/load.py index d602f301..6c085d52 100644 --- a/log10/load.py +++ b/log10/load.py @@ -357,7 +357,7 @@ def __next__(self): def flatten_messages(messages): flat_messages = [] for message in messages: - if type(message).__name__ == "dict": + if isinstance(message, dict): flat_messages.append(message) else: flat_messages.append(message.model_dump()) @@ -367,7 +367,7 @@ def flatten_messages(messages): def flatten_response(response): if "choices" in response: # May have to flatten, if not a dictionary - if type(response.choices[0].message).__name__ != "dict": + if not isinstance(response.choices[0].message, dict): response.choices[0].message = response.choices[0].message.model_dump() return response From 71b0e121eb867eb4f8d80c47dfbbff300e443296 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Thu, 22 Feb 2024 13:07:56 -0800 Subject: [PATCH 09/14] Fix non-message prompts --- log10/load.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/log10/load.py b/log10/load.py index d602f301..6339d6cc 100644 --- a/log10/load.py +++ b/log10/load.py @@ -207,7 +207,8 @@ async def log_async(completion_url, func, **kwargs): if "api_key" in kwargs: kwargs.pop("api_key") - kwargs["messages"] = flatten_messages(kwargs["messages"]) + 
if "messages" in kwargs: + kwargs["messages"] = flatten_messages(kwargs["messages"]) log_row = { # do we want to also store args? From a25a4dc1fb328a2ff04a2c7ac7d4c7a19a8b87bd Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 13:48:54 -0800 Subject: [PATCH 10/14] flatten only for openai chat completions --- log10/load.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/log10/load.py b/log10/load.py index d781a2a9..6fd1bcfb 100644 --- a/log10/load.py +++ b/log10/load.py @@ -489,18 +489,18 @@ def wrapper(*args, **kwargs): response = output kind = "chat" if output.object == "chat.completion" else "completion" + # We may have to flatten messages from their ChatCompletionMessage with nested ChatCompletionMessageToolCall to json serializable format + # Rewrite in-place + if "messages" in kwargs: + kwargs["messages"] = flatten_messages(kwargs["messages"]) + + if "choices" in response: + response = flatten_response(response) + # in case the usage of load(openai) and langchain.ChatOpenAI if "api_key" in kwargs: kwargs.pop("api_key") - # We may have to flatten messages from their ChatCompletionMessage with nested ChatCompletionMessageToolCall to json serializable format - # Rewrite in-place - if "messages" in kwargs: - kwargs["messages"] = flatten_messages(kwargs["messages"]) - - if "choices" in response: - response = flatten_response(response) - if hasattr(response, "model_dump_json"): response = response.model_dump_json() else: From d249400a8a75b7472923864b5326648a677f33c1 Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 14:06:40 -0800 Subject: [PATCH 11/14] add an example of regular magentic function call --- examples/logging/magentic_function_logging.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 examples/logging/magentic_function_logging.py diff --git a/examples/logging/magentic_function_logging.py b/examples/logging/magentic_function_logging.py new file mode 100644 index 00000000..af71a8b7 --- /dev/null +++ b/examples/logging/magentic_function_logging.py @@ -0,0 +1,32 @@ +# taken from Magentic README +# https://github.com/jackmpcollins/magentic/blob/2493419f2db3a3be58fb308d7df51a51bf1989c1/README.md#usage + +from typing import Literal + +import openai +from magentic import FunctionCall, prompt + +from log10.load import log10, global_tags + + +log10(openai) +global_tags = ["magentic", "function", "example"] + + +def activate_oven(temperature: int, mode: Literal["broil", "bake", "roast"]) -> str: + """Turn the oven on with the provided settings.""" + return f"Preheating to {temperature} F with mode {mode}" + + +@prompt( + "Prepare the oven so I can make {food}", + functions=[activate_oven], +) +def configure_oven(food: str) -> FunctionCall[str]: + ... 
+ + +output = configure_oven("cookies!") +# FunctionCall(, temperature=350, mode='bake') +print(output()) +# 'Preheating to 350 F with mode bake' From 5fe9cbd8c3dc93fea680baea3e3337baf9580fdc Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 14:39:03 -0800 Subject: [PATCH 12/14] enable stream function --- examples/logging/magentic_function_logging.py | 4 +- log10/load.py | 62 ++++++++++++++----- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/examples/logging/magentic_function_logging.py b/examples/logging/magentic_function_logging.py index af71a8b7..e61f0611 100644 --- a/examples/logging/magentic_function_logging.py +++ b/examples/logging/magentic_function_logging.py @@ -6,11 +6,11 @@ import openai from magentic import FunctionCall, prompt -from log10.load import log10, global_tags +from log10.load import global_tags, log10 log10(openai) -global_tags = ["magentic", "function", "example"] +global_tags = ["magentic", "function", "example"] # noqa: F811 def activate_oven(temperature: int, mode: Literal["broil", "bake", "roast"]) -> str: diff --git a/log10/load.py b/log10/load.py index 6fd1bcfb..7fdfc501 100644 --- a/log10/load.py +++ b/log10/load.py @@ -300,6 +300,8 @@ def __init__(self, completion_url, completionID, response, partial_log_row): self.partial_log_row = partial_log_row self.response = response self.final_result = "" # Store the final result + self.function_name = "" + self.function_arguments = "" self.start_time = time.perf_counter() self.gpt_id = None self.model = None @@ -311,14 +313,20 @@ def __iter__(self): def __next__(self): try: chunk = next(self.response) - content = chunk.choices[0].delta.content - if content: + # import ipdb; ipdb.set_trace() + if chunk.choices[0].delta.content: # Here you can intercept and modify content if needed + content = chunk.choices[0].delta.content self.final_result += content # Save the content # Yield the original or modified content self.model = chunk.model self.gpt_id = chunk.id + elif chunk.choices[0].delta.function_call: + arguments = chunk.choices[0].delta.function_call.arguments + self.function_arguments += arguments + if chunk.choices[0].delta.function_call.name: + self.function_name = chunk.choices[0].delta.function_call.name else: self.finish_reason = chunk.choices[0].finish_reason @@ -326,21 +334,38 @@ def __next__(self): except StopIteration as se: # Log the final result # Create fake response for openai format. 
- response = { - "id": self.gpt_id, - "object": "completion", - "model": self.model, - "choices": [ - { - "index": 0, - "finish_reason": self.finish_reason, - "message": { - "role": "assistant", - "content": self.final_result, - }, - } - ], - } + if self.final_result: + response = { + "id": self.gpt_id, + "object": "completion", + "model": self.model, + "choices": [ + { + "index": 0, + "finish_reason": self.finish_reason, + "message": { + "role": "assistant", + "content": self.final_result, + }, + } + ], + } + elif self.function_arguments: + response = { + "id": self.gpt_id, + "object": "completion", + "model": self.model, + "choices": [ + { + "index": 0, + "finish_reason": self.finish_reason, + "function_call": { + "name": self.function_name, + "arguments": self.function_arguments, + }, + } + ], + } self.partial_log_row["response"] = json.dumps(response) self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000) @@ -635,6 +660,9 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): global DEBUG, USE_ASYNC, sync_log_text DEBUG = DEBUG_ or os.environ.get("LOG10_DEBUG", False) logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING) + if DEBUG: + openai_logger = logging.getLogger("openai") + openai_logger.setLevel(logging.DEBUG) USE_ASYNC = USE_ASYNC_ sync_log_text = set_sync_log_text(USE_ASYNC=USE_ASYNC) From ed1d0d68abdbf88ded47164cbc4c289c94377591 Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 14:52:13 -0800 Subject: [PATCH 13/14] minor --- examples/logging/magentic_function_logging.py | 3 +-- log10/load.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/logging/magentic_function_logging.py b/examples/logging/magentic_function_logging.py index e61f0611..d1c4d2c4 100644 --- a/examples/logging/magentic_function_logging.py +++ b/examples/logging/magentic_function_logging.py @@ -6,11 +6,10 @@ import openai from magentic import FunctionCall, prompt -from log10.load import global_tags, log10 +from log10.load import log10 log10(openai) -global_tags = ["magentic", "function", "example"] # noqa: F811 def activate_oven(temperature: int, mode: Literal["broil", "bake", "roast"]) -> str: diff --git a/log10/load.py b/log10/load.py index 7fdfc501..df08eb3a 100644 --- a/log10/load.py +++ b/log10/load.py @@ -325,7 +325,7 @@ def __next__(self): elif chunk.choices[0].delta.function_call: arguments = chunk.choices[0].delta.function_call.arguments self.function_arguments += arguments - if chunk.choices[0].delta.function_call.name: + if not self.function_name and chunk.choices[0].delta.function_call.name: self.function_name = chunk.choices[0].delta.function_call.name else: self.finish_reason = chunk.choices[0].finish_reason From 01df62481ea7825daf8de0369f3fea5972484ffc Mon Sep 17 00:00:00 2001 From: Wenzhe Xue Date: Thu, 22 Feb 2024 16:13:23 -0800 Subject: [PATCH 14/14] skip two xdoctest for openai v0 --- log10/load.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/log10/load.py b/log10/load.py index df08eb3a..1a7e39e3 100644 --- a/log10/load.py +++ b/log10/load.py @@ -592,7 +592,8 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): Openai V0 example: Example: - >>> from log10.load import log10 # xdoctest: +SKIP + >>> # xdoctest: +SKIP + >>> from log10.load import log10 >>> import openai >>> log10(openai) >>> completion = openai.Completion.create( @@ -603,7 +604,8 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): >>> print(completion) Example: - >>> from log10.load import log10 # xdoctest: +SKIP + >>> 
# xdoctest: +SKIP
+        >>> from log10.load import log10
         >>> import openai
         >>> log10(openai)
         >>> completion = openai.ChatCompletion.create(
         >>>     model="gpt-3.5-turbo",
         >>>     messages=[{ "role": "user", "content": "Hello world"}])
         >>> print(completion)
@@ -660,9 +662,6 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
     global DEBUG, USE_ASYNC, sync_log_text
     DEBUG = DEBUG_ or os.environ.get("LOG10_DEBUG", False)
     logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING)
-    if DEBUG:
-        openai_logger = logging.getLogger("openai")
-        openai_logger.setLevel(logging.DEBUG)
     USE_ASYNC = USE_ASYNC_
     sync_log_text = set_sync_log_text(USE_ASYNC=USE_ASYNC)
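Taken together, the series hangs all async logging off a wrapper transport that branches on content type: JSON bodies are buffered and logged immediately, while event streams are re-wrapped in _LogResponse so logging happens only after the caller finishes iterating. A minimal standalone sketch of that transport pattern, using only public httpx APIs (the class name and the print are stand-ins for the log10 POST):

    import httpx

    class LoggingTransport(httpx.AsyncBaseTransport):
        def __init__(self, inner: httpx.AsyncBaseTransport):
            self.inner = inner

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            response = await self.inner.handle_async_request(request)
            if response.headers.get("content-type") == "application/json":
                # Buffer the body so .json() is safe before returning it upstream.
                await response.aread()
                print("logged:", response.json())  # stand-in for the log10 POST
            return response

    client = httpx.AsyncClient(transport=LoggingTransport(httpx.AsyncHTTPTransport()))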