diff --git a/examples/logging/magentic_async_function_logging.py b/examples/logging/magentic_async_function_logging.py
new file mode 100644
index 00000000..81641c7a
--- /dev/null
+++ b/examples/logging/magentic_async_function_logging.py
@@ -0,0 +1,34 @@
+import openai
+from magentic import AsyncStreamedStr, FunctionCall, prompt
+
+from log10.load import log10
+
+
+log10(openai)
+
+
+def add(x: int, y: int) -> int:
+    """Add together two numbers."""
+    return x + y
+
+
+@prompt("What is 1+1? Use tools", functions=[add])
+async def agent() -> AsyncStreamedStr:
+    ...
+
+
+# Define an async main function
+async def main():
+    response = await agent()
+    if isinstance(response, FunctionCall):
+        print(response)
+    else:
+        async for chunk in response:
+            print(chunk, end="", flush=True)
+
+
+# Running the main function using asyncio
+if __name__ == "__main__":
+    import asyncio
+
+    asyncio.run(main())
diff --git a/examples/logging/magentic_async_stream_logging.py b/examples/logging/magentic_async_stream_logging.py
new file mode 100644
index 00000000..99bcb69b
--- /dev/null
+++ b/examples/logging/magentic_async_stream_logging.py
@@ -0,0 +1,24 @@
+import asyncio
+
+import openai
+from magentic import AsyncStreamedStr, prompt
+
+from log10.load import log10, log10_session
+
+
+log10(openai)
+
+
+@prompt("Tell me a 200-word story about {topic}")
+async def tell_story(topic: str) -> AsyncStreamedStr:
+    ...
+
+
+async def main():
+    with log10_session(tags=["async_tag"]):
+        output = await tell_story("Europe.")
+        async for chunk in output:
+            print(chunk, end="", flush=True)
+
+
+asyncio.run(main())
diff --git a/examples/logging/magentic_function_logging.py b/examples/logging/magentic_function_logging.py
new file mode 100644
index 00000000..d1c4d2c4
--- /dev/null
+++ b/examples/logging/magentic_function_logging.py
@@ -0,0 +1,31 @@
+# taken from Magentic README
+# https://github.com/jackmpcollins/magentic/blob/2493419f2db3a3be58fb308d7df51a51bf1989c1/README.md#usage
+
+from typing import Literal
+
+import openai
+from magentic import FunctionCall, prompt
+
+from log10.load import log10
+
+
+log10(openai)
+
+
+def activate_oven(temperature: int, mode: Literal["broil", "bake", "roast"]) -> str:
+    """Turn the oven on with the provided settings."""
+    return f"Preheating to {temperature} F with mode {mode}"
+
+
+@prompt(
+    "Prepare the oven so I can make {food}",
+    functions=[activate_oven],
+)
+def configure_oven(food: str) -> FunctionCall[str]:
+    ...
+
+
+output = configure_oven("cookies!")
+# FunctionCall(<function activate_oven at 0x...>, temperature=350, mode='bake')
+print(output())
+# 'Preheating to 350 F with mode bake'
diff --git a/examples/logging/openai_async_logging.py b/examples/logging/openai_async_logging.py
new file mode 100644
index 00000000..e89d0e2b
--- /dev/null
+++ b/examples/logging/openai_async_logging.py
@@ -0,0 +1,22 @@
+import asyncio
+
+import openai
+from openai import AsyncOpenAI
+
+from log10.load import log10
+
+
+log10(openai)
+
+client = AsyncOpenAI()
+
+
+async def main():
+    completion = await client.chat.completions.create(
+        model="gpt-4",
+        messages=[{"role": "user", "content": "Say this is a test"}],
+    )
+    print(completion.choices[0].message.content)
+
+
+asyncio.run(main())
diff --git a/examples/logging/openai_async_stream_logging.py b/examples/logging/openai_async_stream_logging.py
new file mode 100644
index 00000000..960dc784
--- /dev/null
+++ b/examples/logging/openai_async_stream_logging.py
@@ -0,0 +1,24 @@
+import asyncio
+
+import openai
+from openai import AsyncOpenAI
+
+from log10.load import log10
+
+
+log10(openai)
+
+client = AsyncOpenAI()
+
+
+async def main():
+    stream = await client.chat.completions.create(
+        model="gpt-4",
+        messages=[{"role": "user", "content": "Count to 50."}],
+        stream=True,
+    )
+    async for chunk in stream:
+        print(chunk.choices[0].delta.content or "", end="", flush=True)
+
+
+asyncio.run(main())
diff --git a/examples/logging/openai_tools.py b/examples/logging/openai_tools.py
new file mode 100644
index 00000000..97b2222c
--- /dev/null
+++ b/examples/logging/openai_tools.py
@@ -0,0 +1,91 @@
+import json
+
+from log10.load import OpenAI
+
+
+client = OpenAI()
+
+
+# Example dummy function hard coded to return the same weather
+# In production, this could be your backend API or an external API
+def get_current_weather(location, unit="fahrenheit"):
+    """Get the current weather in a given location"""
+    if "tokyo" in location.lower():
+        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit})
+    elif "san francisco" in location.lower():
+        return json.dumps({"location": "San Francisco", "temperature": "72", "unit": unit})
+    elif "paris" in location.lower():
+        return json.dumps({"location": "Paris", "temperature": "22", "unit": unit})
+    else:
+        return json.dumps({"location": location, "temperature": "unknown"})
+
+
+def run_conversation():
+    # Step 1: send the conversation and available functions to the model
+    messages = [
+        {
+            "role": "user",
+            "content": "What's the weather like in San Francisco, Tokyo, and Paris?",
+        }
+    ]
+    tools = [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_current_weather",
+                "description": "Get the current weather in a given location",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "location": {
+                            "type": "string",
+                            "description": "The city and state, e.g. San Francisco, CA",
+                        },
San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + } + ] + response = client.chat.completions.create( + model="gpt-3.5-turbo-0125", + messages=messages, + tools=tools, + tool_choice="auto", # auto is default, but we'll be explicit + ) + response_message = response.choices[0].message + tool_calls = response_message.tool_calls + # Step 2: check if the model wanted to call a function + if tool_calls: + # Step 3: call the function + # Note: the JSON response may not always be valid; be sure to handle errors + available_functions = { + "get_current_weather": get_current_weather, + } # only one function in this example, but you can have multiple + messages.append(response_message) # extend conversation with assistant's reply + # Step 4: send the info for each function call and function response to the model + for tool_call in tool_calls: + function_name = tool_call.function.name + function_to_call = available_functions[function_name] + function_args = json.loads(tool_call.function.arguments) + function_response = function_to_call( + location=function_args.get("location"), + unit=function_args.get("unit"), + ) + messages.append( + { + "tool_call_id": tool_call.id, + "role": "tool", + "name": function_name, + "content": function_response, + } + ) # extend conversation with function response + second_response = client.chat.completions.create( + model="gpt-3.5-turbo-0125", + messages=messages, + ) # get a new response from the model where it can see the function response + return second_response + + +print(run_conversation()) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py new file mode 100644 index 00000000..b3fd214d --- /dev/null +++ b/log10/_httpx_utils.py @@ -0,0 +1,214 @@ +import json +import logging +import time +import traceback + +import httpx +from httpx import Request, Response + +from log10.llm import Log10Config +from log10.load import global_tags, sessionID + + +logger: logging.Logger = logging.getLogger("LOG10") + + +_log10_config = Log10Config() +base_url = _log10_config.url +httpx_client = httpx.Client() + + +def _try_post_request(url: str, payload: dict = {}) -> httpx.Response: + headers = { + "x-log10-token": _log10_config.token, + "x-log10-organization-id": _log10_config.org_id, + "Content-Type": "application/json", + } + payload["organization_id"] = _log10_config.org_id + res = None + try: + res = httpx_client.post(url, headers=headers, json=payload) + res.raise_for_status() + return res + except httpx.HTTPError as http_err: + if "401" in str(http_err): + logger.error( + "Failed anthorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again." + + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" + ) + else: + logger.error(f"Failed with error: {http_err}") + except Exception as err: + logger.error(f"Failed to insert in log10: {payload} with error {err}") + + +async def get_completion_id(request: Request): + if "v1/chat/completions" not in str(request.url): + logger.warning("Currently logging is only available for v1/chat/completions.") + return + + completion_url = "/api/completions" + res = _try_post_request(url=f"{base_url}{completion_url}") + try: + completion_id = res.json().get("completionID") + except Exception as e: + logger.error(f"Failed to get completion ID. Error: {e}. 
+    else:
+        request.headers["x-log10-completion-id"] = completion_id
+
+
+async def log_request(request: Request):
+    start_time = time.time()
+    request.started = start_time
+    completion_id = request.headers.get("x-log10-completion-id", "")
+    if not completion_id:
+        return
+
+    orig_module = ""
+    orig_qualname = ""
+    if "chat" in str(request.url):
+        kind = "chat"
+        orig_module = "openai.api_resources.chat_completion"
+        orig_qualname = "ChatCompletion.create"
+    else:
+        kind = "completion"
+        orig_module = "openai.api_resources.completion"
+        orig_qualname = "Completion.create"
+    log_row = {
+        "status": "started",
+        "kind": kind,
+        "orig_module": orig_module,
+        "orig_qualname": orig_qualname,
+        "request": request.content.decode("utf-8"),
+        "session_id": sessionID,
+        "tags": global_tags,
+    }
+    _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+
+
+class _LogResponse(Response):
+    async def aiter_bytes(self, *args, **kwargs):
+        full_response = ""
+        finished = False
+        async for chunk in super().aiter_bytes(*args, **kwargs):
+            full_response += chunk.decode(errors="ignore")
+
+            if "data: [DONE]" in full_response:
+                finished = True
+            yield chunk
+
+        completion_id = self.request.headers.get("x-log10-completion-id", "")
+        if finished and completion_id:
+            current_stack_frame = traceback.extract_stack()
+            stacktrace = [
+                {
+                    "file": frame.filename,
+                    "line": frame.line,
+                    "lineno": frame.lineno,
+                    "name": frame.name,
+                }
+                for frame in current_stack_frame
+            ]
+            full_content = ""
+            function_name = ""
+            full_argument = ""
+            responses = full_response.split("\n\n")
+            for r in responses:
+                if "data: [DONE]" in r:
+                    break
+
+                r_json = json.loads(r[6:])
+
+                delta = r_json["choices"][0]["delta"]
+
+                # Delta may have content
+                if "content" in delta:
+                    content = delta["content"]
+                    if content:
+                        full_content += content
+
+                # May be a function call, and have to reconstruct the arguments
+                if "function_call" in delta:
+                    # May be function name
+                    if "name" in delta["function_call"]:
+                        function_name = delta["function_call"]["name"]
+                    # May be function arguments
+                    if "arguments" in delta["function_call"]:
+                        full_argument += delta["function_call"]["arguments"]
+
+            response_json = r_json.copy()
+            response_json["object"] = "completion"
+
+            # If finish_reason is function_call - don't log the response
+            if not (
+                "choices" in response_json
+                and response_json["choices"]
+                and response_json["choices"][0]["finish_reason"] == "function_call"
+            ):
+                response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
+            else:
+                response_json["choices"][0]["function_call"] = {
+                    "name": function_name,
+                    "arguments": full_argument,
+                }
+
+            log_row = {
+                "response": json.dumps(response_json),
+                "status": "finished",
+                "duration": int((time.time() - self.request.started) * 1000),
+                "stacktrace": json.dumps(stacktrace),
+                "kind": "chat",
+                "request": self.request.content.decode("utf-8"),
+                "session_id": sessionID,
+                "tags": global_tags,
+            }
+            _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+
+
+class _LogTransport(httpx.AsyncBaseTransport):
+    def __init__(self, transport: httpx.AsyncBaseTransport):
+        self.transport = transport
+
+    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
+        response = await self.transport.handle_async_request(request)
+
+        completion_id = request.headers.get("x-log10-completion-id", "")
+        if not completion_id:
+            return response
+
+        if response.headers.get("content-type") == "application/json":
+            await response.aread()
+            llm_response = response.json()
+
+            current_stack_frame = traceback.extract_stack()
+            stacktrace = [
+                {
+                    "file": frame.filename,
+                    "line": frame.line,
+                    "lineno": frame.lineno,
+                    "name": frame.name,
+                }
+                for frame in current_stack_frame
+            ]
+
+            elapsed = time.time() - request.started
+            log_row = {
+                "response": json.dumps(llm_response),
+                "status": "finished",
+                "duration": int(elapsed * 1000),
+                "stacktrace": json.dumps(stacktrace),
+                "kind": "chat",
+                "request": request.content.decode("utf-8"),
+                "session_id": sessionID,
+                "tags": global_tags,
+            }
+            _try_post_request(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
+            return response
+        elif response.headers.get("content-type") == "text/event-stream":
+            return _LogResponse(
+                status_code=response.status_code,
+                headers=response.headers,
+                stream=response.stream,
+                extensions=response.extensions,
+                request=request,
+            )
diff --git a/log10/load.py b/log10/load.py
index bde1d6d6..1a7e39e3 100644
--- a/log10/load.py
+++ b/log10/load.py
@@ -207,6 +207,9 @@ async def log_async(completion_url, func, **kwargs):
         if "api_key" in kwargs:
             kwargs.pop("api_key")
 
+        if "messages" in kwargs:
+            kwargs["messages"] = flatten_messages(kwargs["messages"])
+
         log_row = {
             # do we want to also store args?
             "status": "started",
@@ -216,6 +219,7 @@
             "session_id": sessionID,
             "tags": global_tags,
         }
+
         if target_service == "log10":
             try:
                 res = post_request(completion_url + "/" + completionID, log_row)
@@ -228,7 +232,7 @@
 
     # NOTE: We only save on request finalization.
     except Exception as e:
-        logging.warn(f"LOG10: failed to log: {e}. SKipping")
+        logging.warn(f"LOG10: failed to log: {e}. Skipping")
         return None
 
     return completionID
@@ -296,6 +300,8 @@ def __init__(self, completion_url, completionID, response, partial_log_row):
         self.partial_log_row = partial_log_row
         self.response = response
         self.final_result = ""  # Store the final result
+        self.function_name = ""
+        self.function_arguments = ""
         self.start_time = time.perf_counter()
         self.gpt_id = None
         self.model = None
@@ -307,14 +313,20 @@ def __iter__(self):
     def __next__(self):
         try:
             chunk = next(self.response)
-            content = chunk.choices[0].delta.content
-            if content:
+            # import ipdb; ipdb.set_trace()
+            if chunk.choices[0].delta.content:
                 # Here you can intercept and modify content if needed
+                content = chunk.choices[0].delta.content
                 self.final_result += content  # Save the content
                 # Yield the original or modified content
 
                 self.model = chunk.model
                 self.gpt_id = chunk.id
+            elif chunk.choices[0].delta.function_call:
+                arguments = chunk.choices[0].delta.function_call.arguments
+                self.function_arguments += arguments
+                if not self.function_name and chunk.choices[0].delta.function_call.name:
+                    self.function_name = chunk.choices[0].delta.function_call.name
             else:
                 self.finish_reason = chunk.choices[0].finish_reason
 
@@ -322,21 +334,38 @@ def __next__(self):
         except StopIteration as se:
             # Log the final result
             # Create fake response for openai format.
-            response = {
-                "id": self.gpt_id,
-                "object": "completion",
-                "model": self.model,
-                "choices": [
-                    {
-                        "index": 0,
-                        "finish_reason": self.finish_reason,
-                        "message": {
-                            "role": "assistant",
-                            "content": self.final_result,
-                        },
-                    }
-                ],
-            }
+            if self.final_result:
+                response = {
+                    "id": self.gpt_id,
+                    "object": "completion",
+                    "model": self.model,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "finish_reason": self.finish_reason,
+                            "message": {
+                                "role": "assistant",
+                                "content": self.final_result,
+                            },
+                        }
+                    ],
+                }
+            elif self.function_arguments:
+                response = {
+                    "id": self.gpt_id,
+                    "object": "completion",
+                    "model": self.model,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "finish_reason": self.finish_reason,
+                            "function_call": {
+                                "name": self.function_name,
+                                "arguments": self.function_arguments,
+                            },
+                        }
+                    ],
+                }
 
             self.partial_log_row["response"] = json.dumps(response)
             self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000)
@@ -351,6 +380,25 @@ def __next__(self):
             raise se
 
 
+def flatten_messages(messages):
+    flat_messages = []
+    for message in messages:
+        if isinstance(message, dict):
+            flat_messages.append(message)
+        else:
+            flat_messages.append(message.model_dump())
+    return flat_messages
+
+
+def flatten_response(response):
+    if "choices" in response:
+        # May have to flatten, if not a dictionary
+        if not isinstance(response.choices[0].message, dict):
+            response.choices[0].message = response.choices[0].message.model_dump()
+
+    return response
+
+
 def intercepting_decorator(func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
@@ -466,6 +514,14 @@
                 response = output
                 kind = "chat" if output.object == "chat.completion" else "completion"
 
+                # We may have to flatten messages from their ChatCompletionMessage with nested ChatCompletionMessageToolCall to json serializable format
+                # Rewrite in-place
+                if "messages" in kwargs:
+                    kwargs["messages"] = flatten_messages(kwargs["messages"])
+
+                if "choices" in response:
+                    response = flatten_response(response)
+
                 # in case the usage of load(openai) and langchain.ChatOpenAI
                 if "api_key" in kwargs:
                     kwargs.pop("api_key")
@@ -474,6 +530,7 @@
                     response = response.model_dump_json()
                 else:
                     response = json.dumps(response)
+
                 log_row = {
                     "response": response,
                     "status": "finished",
@@ -535,7 +592,8 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
     Openai V0 example:
 
     Example:
-        >>> from log10.load import log10 # xdoctest: +SKIP
+        >>> # xdoctest: +SKIP
+        >>> from log10.load import log10
         >>> import openai
         >>> log10(openai)
        >>> completion = openai.Completion.create(
@@ -546,7 +604,8 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
         >>> print(completion)
 
     Example:
-        >>> from log10.load import log10 # xdoctest: +SKIP
+        >>> # xdoctest: +SKIP
+        >>> from log10.load import log10
         >>> import openai
         >>> log10(openai)
         >>> completion = openai.ChatCompletion.create(
@@ -643,6 +702,33 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
         attr = module.resources.chat.completions.Completions
         method = getattr(attr, "create")
         setattr(attr, "create", intercepting_decorator(method))
+
+        # support for async completions
+        # patch module.AsyncOpenAI.__init__ to new_init
+        origin_init = module.AsyncOpenAI.__init__
+
+        def new_init(self, *args, **kwargs):
+            logger.debug("LOG10: patching AsyncOpenAI.__init__")
+            import httpx
+
+            from log10._httpx_utils import (
+                _LogTransport,
+                get_completion_id,
+                log_request,
+            )
+
+            event_hooks = {
+                "request": [get_completion_id, log_request],
+            }
+            async_httpx_client = httpx.AsyncClient(
+                event_hooks=event_hooks,
+                transport=_LogTransport(httpx.AsyncHTTPTransport()),
+            )
+            kwargs["http_client"] = async_httpx_client
+            origin_init(self, *args, **kwargs)
+
+        module.AsyncOpenAI.__init__ = new_init
+
     else:
         attr = module.api_resources.completion.Completion
         method = getattr(attr, "create")