diff --git a/examples/logging/magentic_async_function_logging.py b/examples/logging/magentic_async_function_logging.py
new file mode 100644
index 00000000..81641c7a
--- /dev/null
+++ b/examples/logging/magentic_async_function_logging.py
@@ -0,0 +1,34 @@
+import openai
+from magentic import AsyncStreamedStr, FunctionCall, prompt
+
+from log10.load import log10
+
+
+log10(openai)
+
+
+def add(x: int, y: int) -> int:
+    """Add together two numbers."""
+    return x + y
+
+
+@prompt("What is 1+1? Use tools", functions=[add])
+async def agent() -> AsyncStreamedStr:
+    ...
+
+
+# Define an async main function
+async def main():
+    response = await agent()
+    if isinstance(response, FunctionCall):
+        print(response)
+    else:
+        async for chunk in response:
+            print(chunk, end="", flush=True)
+
+
+# Running the main function using asyncio
+if __name__ == "__main__":
+    import asyncio
+
+    asyncio.run(main())
diff --git a/examples/logging/magentic_function_logging.py b/examples/logging/magentic_function_logging.py
new file mode 100644
index 00000000..d1c4d2c4
--- /dev/null
+++ b/examples/logging/magentic_function_logging.py
@@ -0,0 +1,31 @@
+# taken from Magentic README
+# https://github.com/jackmpcollins/magentic/blob/2493419f2db3a3be58fb308d7df51a51bf1989c1/README.md#usage
+
+from typing import Literal
+
+import openai
+from magentic import FunctionCall, prompt
+
+from log10.load import log10
+
+
+log10(openai)
+
+
+def activate_oven(temperature: int, mode: Literal["broil", "bake", "roast"]) -> str:
+    """Turn the oven on with the provided settings."""
+    return f"Preheating to {temperature} F with mode {mode}"
+
+
+@prompt(
+    "Prepare the oven so I can make {food}",
+    functions=[activate_oven],
+)
+def configure_oven(food: str) -> FunctionCall[str]:
+    ...
+
+
+output = configure_oven("cookies!")
+# FunctionCall(<function activate_oven at 0x...>, temperature=350, mode='bake')
+print(output())
+# 'Preheating to 350 F with mode bake'
diff --git a/examples/logging/openai_tools.py b/examples/logging/openai_tools.py
new file mode 100644
index 00000000..97b2222c
--- /dev/null
+++ b/examples/logging/openai_tools.py
@@ -0,0 +1,91 @@
+import json
+
+from log10.load import OpenAI
+
+
+client = OpenAI()
+
+
+# Example dummy function hard coded to return the same weather
+# In production, this could be your backend API or an external API
+def get_current_weather(location, unit="fahrenheit"):
+    """Get the current weather in a given location"""
+    if "tokyo" in location.lower():
+        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit})
+    elif "san francisco" in location.lower():
+        return json.dumps({"location": "San Francisco", "temperature": "72", "unit": unit})
+    elif "paris" in location.lower():
+        return json.dumps({"location": "Paris", "temperature": "22", "unit": unit})
+    else:
+        return json.dumps({"location": location, "temperature": "unknown"})
+
+
+def run_conversation():
+    # Step 1: send the conversation and available functions to the model
+    messages = [
+        {
+            "role": "user",
+            "content": "What's the weather like in San Francisco, Tokyo, and Paris?",
+        }
+    ]
+    tools = [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_current_weather",
+                "description": "Get the current weather in a given location",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "location": {
+                            "type": "string",
+                            "description": "The city and state, e.g. San Francisco, CA",
+                        },
+                        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
+                    },
+                    "required": ["location"],
+                },
+            },
+        }
+    ]
+    response = client.chat.completions.create(
+        model="gpt-3.5-turbo-0125",
+        messages=messages,
+        tools=tools,
+        tool_choice="auto",  # auto is default, but we'll be explicit
+    )
+    response_message = response.choices[0].message
+    tool_calls = response_message.tool_calls
+    # Step 2: check if the model wanted to call a function
+    if tool_calls:
+        # Step 3: call the function
+        # Note: the JSON response may not always be valid; be sure to handle errors
+        available_functions = {
+            "get_current_weather": get_current_weather,
+        }  # only one function in this example, but you can have multiple
+        messages.append(response_message)  # extend conversation with assistant's reply
+        # Step 4: send the info for each function call and function response to the model
+        for tool_call in tool_calls:
+            function_name = tool_call.function.name
+            function_to_call = available_functions[function_name]
+            function_args = json.loads(tool_call.function.arguments)
+            function_response = function_to_call(
+                location=function_args.get("location"),
+                unit=function_args.get("unit"),
+            )
+            messages.append(
+                {
+                    "tool_call_id": tool_call.id,
+                    "role": "tool",
+                    "name": function_name,
+                    "content": function_response,
+                }
+            )  # extend conversation with function response
+        second_response = client.chat.completions.create(
+            model="gpt-3.5-turbo-0125",
+            messages=messages,
+        )  # get a new response from the model where it can see the function response
+        return second_response
+
+
+print(run_conversation())
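
The example files above cover magentic function calls and the OpenAI tools API, but not a streamed response through the older functions/function_call interface, which is the path the library changes below add logging for. The following sketch is not part of this diff; the model name, prompt, and function schema are illustrative placeholders.

# Not part of this diff: a minimal sketch of a streamed legacy function-call
# request through log10's patched OpenAI client. The model, prompt, and
# function schema below are illustrative placeholders.
from log10.load import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "What's the weather like in San Francisco?"}],
    functions=[
        {
            "name": "get_current_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"],
            },
        }
    ],
    stream=True,
)

function_name, function_arguments = "", ""
for chunk in response:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
    elif delta.function_call:
        # The function name arrives once; the JSON arguments arrive in fragments.
        function_name = delta.function_call.name or function_name
        function_arguments += delta.function_call.arguments or ""

print(function_name, function_arguments)
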
diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py
index 87d61939..b3fd214d 100644
--- a/log10/_httpx_utils.py
+++ b/log10/_httpx_utils.py
@@ -110,18 +110,48 @@ async def aiter_bytes(self, *args, **kwargs):
                 for frame in current_stack_frame
             ]
 
             full_content = ""
+            function_name = ""
+            full_argument = ""
             responses = full_response.split("\n\n")
             for r in responses:
                 if "data: [DONE]" in r:
                     break
                 r_json = json.loads(r[6:])
-                content = r_json["choices"][0]["delta"].get("content", "")
-                if content:
-                    full_content += content
+
+                delta = r_json["choices"][0]["delta"]
+
+                # Delta may have content
+                if "content" in delta:
+                    content = delta["content"]
+                    if content:
+                        full_content += content
+
+                # May be a function call, and have to reconstruct the arguments
+                if "function_call" in delta:
+                    # May be function name
+                    if "name" in delta["function_call"]:
+                        function_name = delta["function_call"]["name"]
+                    # May be function arguments
+                    if "arguments" in delta["function_call"]:
+                        full_argument += delta["function_call"]["arguments"]
+
             response_json = r_json.copy()
             response_json["object"] = "completion"
-            response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
+
+            # If finish_reason is function_call - don't log the response
+            if not (
+                "choices" in response_json
+                and response_json["choices"]
+                and response_json["choices"][0]["finish_reason"] == "function_call"
+            ):
+                response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
+            else:
+                response_json["choices"][0]["function_call"] = {
+                    "name": function_name,
+                    "arguments": full_argument,
+                }
+
             log_row = {
                 "response": json.dumps(response_json),
                 "status": "finished",
diff --git a/log10/load.py b/log10/load.py
index 03b02b9f..df08eb3a 100644
--- a/log10/load.py
+++ b/log10/load.py
@@ -207,6 +207,9 @@ async def log_async(completion_url, func, **kwargs):
     if "api_key" in kwargs:
         kwargs.pop("api_key")
 
+    if "messages" in kwargs:
+        kwargs["messages"] = flatten_messages(kwargs["messages"])
+
     log_row = {
         # do we want to also store args?
         "status": "started",
@@ -216,6 +219,7 @@ async def log_async(completion_url, func, **kwargs):
         "session_id": sessionID,
         "tags": global_tags,
     }
+
     if target_service == "log10":
         try:
             res = post_request(completion_url + "/" + completionID, log_row)
@@ -228,7 +232,7 @@
 
             # NOTE: We only save on request finalization.
     except Exception as e:
-        logging.warn(f"LOG10: failed to log: {e}. SKipping")
+        logging.warn(f"LOG10: failed to log: {e}. Skipping")
         return None
 
     return completionID
@@ -296,6 +300,8 @@ def __init__(self, completion_url, completionID, response, partial_log_row):
         self.partial_log_row = partial_log_row
         self.response = response
         self.final_result = ""  # Store the final result
+        self.function_name = ""
+        self.function_arguments = ""
         self.start_time = time.perf_counter()
         self.gpt_id = None
         self.model = None
@@ -307,14 +313,20 @@ def __iter__(self):
         return self
 
     def __next__(self):
         try:
             chunk = next(self.response)
-            content = chunk.choices[0].delta.content
-            if content:
+            # import ipdb; ipdb.set_trace()
+            if chunk.choices[0].delta.content:
                 # Here you can intercept and modify content if needed
+                content = chunk.choices[0].delta.content
                 self.final_result += content  # Save the content
                 # Yield the original or modified content
                 self.model = chunk.model
                 self.gpt_id = chunk.id
+            elif chunk.choices[0].delta.function_call:
+                arguments = chunk.choices[0].delta.function_call.arguments
+                self.function_arguments += arguments
+                if not self.function_name and chunk.choices[0].delta.function_call.name:
+                    self.function_name = chunk.choices[0].delta.function_call.name
             else:
                 self.finish_reason = chunk.choices[0].finish_reason
@@ -322,21 +334,38 @@ def __next__(self):
         except StopIteration as se:
             # Log the final result
             # Create fake response for openai format.
-            response = {
-                "id": self.gpt_id,
-                "object": "completion",
-                "model": self.model,
-                "choices": [
-                    {
-                        "index": 0,
-                        "finish_reason": self.finish_reason,
-                        "message": {
-                            "role": "assistant",
-                            "content": self.final_result,
-                        },
-                    }
-                ],
-            }
+            if self.final_result:
+                response = {
+                    "id": self.gpt_id,
+                    "object": "completion",
+                    "model": self.model,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "finish_reason": self.finish_reason,
+                            "message": {
+                                "role": "assistant",
+                                "content": self.final_result,
+                            },
+                        }
+                    ],
+                }
+            elif self.function_arguments:
+                response = {
+                    "id": self.gpt_id,
+                    "object": "completion",
+                    "model": self.model,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "finish_reason": self.finish_reason,
+                            "function_call": {
+                                "name": self.function_name,
+                                "arguments": self.function_arguments,
+                            },
+                        }
+                    ],
+                }
 
             self.partial_log_row["response"] = json.dumps(response)
             self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000)
@@ -351,6 +380,25 @@ def __next__(self):
             raise se
 
 
+def flatten_messages(messages):
+    flat_messages = []
+    for message in messages:
+        if isinstance(message, dict):
+            flat_messages.append(message)
+        else:
+            flat_messages.append(message.model_dump())
+    return flat_messages
+
+
+def flatten_response(response):
+    if "choices" in response:
+        # May have to flatten, if not a dictionary
+        if not isinstance(response.choices[0].message, dict):
+            response.choices[0].message = response.choices[0].message.model_dump()
+
+    return response
+
+
 def intercepting_decorator(func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
@@ -466,6 +514,14 @@ def wrapper(*args, **kwargs):
                        response = output
                    kind = "chat" if output.object == "chat.completion" else "completion"
 
+                # We may have to flatten messages from their ChatCompletionMessage with nested ChatCompletionMessageToolCall to json serializable format
+                # Rewrite in-place
+                if "messages" in kwargs:
+                    kwargs["messages"] = flatten_messages(kwargs["messages"])
+
+                if "choices" in response:
+                    response = flatten_response(response)
+
                 # in case the usage of load(openai) and langchain.ChatOpenAI
                 if "api_key" in kwargs:
                     kwargs.pop("api_key")
@@ -474,6 +530,7 @@
                     response = response.model_dump_json()
                 else:
                     response = json.dumps(response)
+
                 log_row = {
                     "response": response,
                     "status": "finished",
@@ -603,6 +660,9 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
     global DEBUG, USE_ASYNC, sync_log_text
     DEBUG = DEBUG_ or os.environ.get("LOG10_DEBUG", False)
     logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING)
+    if DEBUG:
+        openai_logger = logging.getLogger("openai")
+        openai_logger.setLevel(logging.DEBUG)
     USE_ASYNC = USE_ASYNC_
     sync_log_text = set_sync_log_text(USE_ASYNC=USE_ASYNC)
 
@@ -652,13 +712,18 @@ def new_init(self, *args, **kwargs):
             logger.debug("LOG10: patching AsyncOpenAI.__init__")
 
             import httpx
 
-            from log10._httpx_utils import _LogTransport, get_completion_id, log_request
+            from log10._httpx_utils import (
+                _LogTransport,
+                get_completion_id,
+                log_request,
+            )
             event_hooks = {
                 "request": [get_completion_id, log_request],
             }
             async_httpx_client = httpx.AsyncClient(
-                event_hooks=event_hooks, transport=_LogTransport(httpx.AsyncHTTPTransport())
+                event_hooks=event_hooks,
+                transport=_LogTransport(httpx.AsyncHTTPTransport()),
             )
             kwargs["http_client"] = async_httpx_client
             origin_init(self, *args, **kwargs)
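
A note on why flatten_messages/flatten_response are needed (commentary, not part of the diff): with the v1 openai client, the assistant reply appended back into messages (as in examples/logging/openai_tools.py) is a pydantic ChatCompletionMessage, possibly carrying nested ChatCompletionMessageToolCall objects, and json.dumps cannot serialize it directly; converting it with model_dump() first is what makes the log row serializable. A rough, self-contained illustration with a hypothetical reply object:

# Illustration only: a hypothetical assistant reply, shaped like what the
# v1 openai client returns.
import json

from openai.types.chat import ChatCompletionMessage

reply = ChatCompletionMessage(role="assistant", content="Preheating to 350 F with mode bake")

messages = [{"role": "user", "content": "Prepare the oven"}, reply]

# json.dumps(messages) raises "TypeError: Object of type ChatCompletionMessage
# is not JSON serializable", so a log row could not be built from it as-is.

# Flattening first, as flatten_messages() above does, yields plain dicts.
flat = [m if isinstance(m, dict) else m.model_dump() for m in messages]
print(json.dumps(flat))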