diff --git a/examples/logging/anthropic_async_messages.py b/examples/logging/anthropic_async_messages.py
new file mode 100644
index 00000000..88268728
--- /dev/null
+++ b/examples/logging/anthropic_async_messages.py
@@ -0,0 +1,23 @@
+import asyncio
+
+import anthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+client = anthropic.AsyncAnthropic()
+
+
+async def main() -> None:
+    message = await client.beta.tools.messages.create(
+        model="claude-instant-1.2",
+        max_tokens=1000,
+        messages=[{"role": "user", "content": "Say hello!"}],
+    )
+
+    print(message)
+
+
+asyncio.run(main())
diff --git a/examples/logging/anthropic_async_messages_stream.py b/examples/logging/anthropic_async_messages_stream.py
new file mode 100644
index 00000000..77fdc6c1
--- /dev/null
+++ b/examples/logging/anthropic_async_messages_stream.py
@@ -0,0 +1,35 @@
+import asyncio
+
+import anthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+client = anthropic.AsyncAnthropic()
+
+
+async def main() -> None:
+    async with client.messages.stream(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Say hello there!",
+            }
+        ],
+        model="claude-3-haiku-20240307",
+    ) as stream:
+        async for text in stream.text_stream:
+            print(text, end="", flush=True)
+        print()
+
+        # you can still get the accumulated final message outside of
+        # the context manager, as long as the entire stream was consumed
+        # inside of the context manager
+        accumulated = await stream.get_final_message()
+        print("accumulated message: ", accumulated.to_json())
+
+
+asyncio.run(main())
diff --git a/examples/logging/anthropic_async_messages_stream_handler.py b/examples/logging/anthropic_async_messages_stream_handler.py
new file mode 100644
index 00000000..382bbb6b
--- /dev/null
+++ b/examples/logging/anthropic_async_messages_stream_handler.py
@@ -0,0 +1,38 @@
+import asyncio
+
+import anthropic
+from anthropic import AsyncAnthropic, AsyncMessageStream
+from anthropic.types import MessageStreamEvent
+from typing_extensions import override
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+client = AsyncAnthropic()
+
+
+class MyStream(AsyncMessageStream):
+    @override
+    async def on_stream_event(self, event: MessageStreamEvent) -> None:
+        print("on_event fired with:", event)
+
+
+async def main() -> None:
+    async with client.messages.stream(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Say hello there!",
+            }
+        ],
+        model="claude-3-haiku-20240307",
+        event_handler=MyStream,
+    ) as stream:
+        accumulated = await stream.get_final_message()
+        print("accumulated message: ", accumulated.to_json())
+
+
+asyncio.run(main())
diff --git a/examples/logging/anthropic_async_tools_stream.py b/examples/logging/anthropic_async_tools_stream.py
new file mode 100644
index 00000000..f0eaff2d
--- /dev/null
+++ b/examples/logging/anthropic_async_tools_stream.py
@@ -0,0 +1,47 @@
+import asyncio
+
+import anthropic
+from anthropic import AsyncAnthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+client = AsyncAnthropic()
+
+
+async def run_conversation():
+    tools = [
+        {
+            "name": "get_weather",
+            "description": "Get the weather in a given location",
+            "input_schema": {
+                "type": "object",
+                "properties": {
+                    "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"},
+                    "unit": {
+                        "type": "string",
+                        "enum": ["celsius", "fahrenheit"],
+                        "description": 'The unit of temperature, either "celsius" or "fahrenheit"',
+                    },
+                },
+                "required": ["location"],
+            },
+        }
+    ]
+    async with client.beta.tools.messages.stream(
+        model="claude-3-haiku-20240307",
+        tools=tools,
+        messages=[
+            {
+                "role": "user",
+                "content": "What's the weather like in San Francisco?",
+            }
+        ],
+        max_tokens=1024,
+    ) as stream:
+        await stream.until_done()
+
+
+asyncio.run(run_conversation())
diff --git a/examples/logging/anthropic_async_tools_stream_handler.py b/examples/logging/anthropic_async_tools_stream_handler.py
new file mode 100644
index 00000000..cf8826eb
--- /dev/null
+++ b/examples/logging/anthropic_async_tools_stream_handler.py
@@ -0,0 +1,55 @@
+import asyncio
+
+import anthropic
+from anthropic import AsyncAnthropic
+from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream
+from typing_extensions import override
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+
+client = AsyncAnthropic()
+
+
+class MyHandler(AsyncToolsBetaMessageStream):
+    @override
+    async def on_input_json(self, delta: str, snapshot: object) -> None:
+        print(f"delta: {repr(delta)}")
+        print(f"snapshot: {snapshot}")
+        print()
+
+
+async def main() -> None:
+    async with client.beta.tools.messages.stream(
+        max_tokens=1024,
+        model="claude-3-haiku-20240307",
+        tools=[
+            {
+                "name": "get_weather",
+                "description": "Get the weather at a specific location.",
+                "input_schema": {
+                    "type": "object",
+                    "properties": {
+                        "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"},
+                        "unit": {
+                            "type": "string",
+                            "enum": ["celsius", "fahrenheit"],
+                            "description": "Unit for the output",
+                        },
+                    },
+                    "required": ["location"],
+                },
+            }
+        ],
+        messages=[{"role": "user", "content": "What is the weather in SF?"}],
+        event_handler=MyHandler,
+    ) as stream:
+        await stream.until_done()
+
+    print()
+
+
+asyncio.run(main())
diff --git a/examples/logging/anthropic_messages_not_given.py b/examples/logging/anthropic_messages_not_given.py
new file mode 100644
index 00000000..4d3573fe
--- /dev/null
+++ b/examples/logging/anthropic_messages_not_given.py
@@ -0,0 +1,21 @@
+from anthropic import NOT_GIVEN
+
+from log10.load import Anthropic
+
+
+client = Anthropic()
+
+completion = client.beta.tools.messages.create(
+    model="claude-3-haiku-20240307",
+    messages=[
+        {
+            "role": "user",
+            "content": "tell a short joke.",
+        },
+    ],
+    max_tokens=1000,
+    tools=NOT_GIVEN,
+    tool_choice=NOT_GIVEN,
+)
+
+print(completion.content[0].text)
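
Note: the NOT_GIVEN example above only logs cleanly because _init_log_row (patched in
log10/load.py below) first runs the kwargs through anthropic's internal strip_not_given
helper. A minimal sketch of the assumed helper behavior (illustrative, not part of the
patch):

    from anthropic import NOT_GIVEN
    from anthropic._utils._utils import strip_not_given

    kwargs = {"max_tokens": 1000, "tools": NOT_GIVEN, "tool_choice": NOT_GIVEN}
    # NotGiven sentinels are dropped, so the logged request matches the actual API call
    assert strip_not_given(kwargs) == {"max_tokens": 1000}
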
diff --git a/examples/logging/anthropic_tools_stream.py b/examples/logging/anthropic_tools_stream.py
new file mode 100644
index 00000000..08a1cd7f
--- /dev/null
+++ b/examples/logging/anthropic_tools_stream.py
@@ -0,0 +1,35 @@
+import anthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+
+client = anthropic.Anthropic()
+
+with client.beta.tools.messages.stream(
+    model="claude-3-haiku-20240307",
+    tools=[
+        {
+            "name": "get_weather",
+            "description": "Get the weather at a specific location",
+            "input_schema": {
+                "type": "object",
+                "properties": {
+                    "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"},
+                    "unit": {
+                        "type": "string",
+                        "enum": ["celsius", "fahrenheit"],
+                        "description": "Unit for the output",
+                    },
+                },
+                "required": ["location"],
+            },
+        }
+    ],
+    messages=[{"role": "user", "content": "What is the weather in SF?"}],
+    max_tokens=1024,
+) as stream:
+    for message in stream:
+        print(message)
diff --git a/examples/logging/magentic_async_chat_prompt.py b/examples/logging/magentic_async_chat_prompt.py
new file mode 100644
index 00000000..06b3c107
--- /dev/null
+++ b/examples/logging/magentic_async_chat_prompt.py
@@ -0,0 +1,23 @@
+import asyncio
+
+import anthropic
+from magentic import UserMessage, chatprompt
+from magentic.chat_model.anthropic_chat_model import AnthropicChatModel
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+
+async def main(topic: str) -> str:
+    @chatprompt(
+        UserMessage(f"Tell me a joke about {topic}"),
+        model=AnthropicChatModel("claude-3-opus-20240229"),
+    )
+    async def tell_joke(topic: str) -> str: ...
+
+    print(await tell_joke(topic))
+
+
+asyncio.run(main("cats"))
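
Note: all of the examples above funnel through the request hooks added in
log10/_httpx_utils.py below. One transformation worth tracing is
format_anthropic_tools_request, which rewrites each Anthropic tool schema into an
OpenAI-style function entry before the request is stored. Roughly (shapes are
illustrative, not part of the patch):

    tool = {"name": "get_weather", "description": "Get the weather", "input_schema": {"type": "object"}}
    # is logged as:
    entry = {
        "type": "function",
        "function": {"name": "get_weather", "description": "Get the weather", "parameters": {"type": "object"}},
    }
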
diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py
index 54efd286..7414fb52 100644
--- a/log10/_httpx_utils.py
+++ b/log10/_httpx_utils.py
@@ -108,9 +108,26 @@ async def _try_post_request_async(url: str, payload: dict = {}) -> httpx.Response:
         logger.error(f"Failed to insert in log10: {payload} with error {err}")


+def format_anthropic_tools_request(request_content) -> str:
+    new_tools = []
+    for tool in request_content["tools"]:
+        new_tool = {
+            "type": "function",
+            "function": {"name": tool["name"], "description": tool["description"], "parameters": tool["input_schema"]},
+        }
+        new_tools.append(new_tool)
+    request_content["tools"] = new_tools
+    return json.dumps(request_content)
+
+
 async def get_completion_id(request: Request):
-    if "v1/chat/completions" not in str(request.url):
-        logger.warning("Currently logging is only available for v1/chat/completions.")
+    host = request.headers.get("host")
+    if "anthropic" in host and "/v1/messages" not in str(request.url):
+        logger.warning("Currently logging is only available for anthropic v1/messages.")
+        return
+
+    if "openai" in host and "v1/chat/completions" not in str(request.url):
+        logger.warning("Currently logging is only available for openai v1/chat/completions.")
         return

     request.headers["x-log10-completion-id"] = str(uuid.uuid4())
@@ -125,20 +142,36 @@ async def log_request(request: Request):
     orig_module = ""
     orig_qualname = ""
-    if "chat" in str(request.url):
+    request_content_decode = request.content.decode("utf-8")
+    host = request.headers.get("host")
+    if "openai" in host:
+        if "chat" in str(request.url):
+            kind = "chat"
+            orig_module = "openai.api_resources.chat_completion"
+            orig_qualname = "ChatCompletion.create"
+        else:
+            kind = "completion"
+            orig_module = "openai.api_resources.completion"
+            orig_qualname = "Completion.create"
+    elif "anthropic" in host:
         kind = "chat"
-        orig_module = "openai.api_resources.chat_completion"
-        orig_qualname = "ChatCompletion.create"
+        request_content = json.loads(request_content_decode)
+        if "tools" in request_content:
+            orig_module = "anthropic.resources.beta.tools"
+            orig_qualname = "Messages.stream"
+            request_content_decode = format_anthropic_tools_request(request_content)
+        else:
+            orig_module = "anthropic.resources.messages"
+            orig_qualname = "Messages.stream"
     else:
-        kind = "completion"
-        orig_module = "openai.api_resources.completion"
-        orig_qualname = "Completion.create"
+        logger.warning("Currently logging is only available for async openai and anthropic.")
+        return

     log_row = {
         "status": "started",
         "kind": kind,
         "orig_module": orig_module,
         "orig_qualname": orig_qualname,
-        "request": request.content.decode("utf-8"),
+        "request": request_content_decode,
         "session_id": session_id_var.get(),
     }
     if get_log10_session_tags():
@@ -147,13 +180,19 @@


 class _LogResponse(Response):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.full_content = ""
+        self.function_name = ""
+        self.full_argument = ""
+        self.tool_calls = []
+
     async def aiter_bytes(self, *args, **kwargs):
         full_response = ""
         finished = False
         async for chunk in super().aiter_bytes(*args, **kwargs):
             full_response += chunk.decode(errors="ignore")
-
-            if "data: [DONE]" in full_response:
+            if self.is_response_end_reached(full_response):
                 finished = True
                 duration = int(time.time() - self.request.started) * 1000
@@ -169,67 +208,41 @@ async def aiter_bytes(self, *args, **kwargs):
                         }
                         for frame in current_stack_frame
                     ]
-                    full_content = ""
-                    function_name = ""
-                    full_argument = ""
-                    tool_calls = []
+
                     responses = full_response.split("\n\n")
-                    for r in responses:
-                        if "data: [DONE]" in r:
-                            break
-
-                        r_json = json.loads(r[6:])
-
-                        delta = r_json["choices"][0]["delta"]
-
-                        # Delta may have content
-                        if "content" in delta:
-                            content = delta["content"]
-                            if content:
-                                full_content += content
-
-                        # May be a function call, and have to reconstruct the arguments
-                        if "function_call" in delta:
-                            # May be function name
-                            if "name" in delta["function_call"]:
-                                function_name = delta["function_call"]["name"]
-                            # May be function arguments
-                            if "arguments" in delta["function_call"]:
-                                full_argument += delta["function_call"]["arguments"]
-
-                        if tc := delta.get("tool_calls", []):
-                            if tc[0].get("id", ""):
-                                tool_calls.append(tc[0])
-                            elif tc[0].get("function", {}).get("arguments", ""):
-                                idx = tc[0].get("index")
-                                tool_calls[idx]["function"]["arguments"] += tc[0]["function"]["arguments"]
+                    r_json = self.parse_response_data(responses)

                     response_json = r_json.copy()
-                    response_json["object"] = "chat.completion"

                     # r_json is the last response before "data: [DONE]"
-                    if full_content:
-                        response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content}
-                    elif tool_calls:
+                    if self.full_content:
+                        response_json["choices"][0]["message"] = {"role": "assistant", "content": self.full_content}
+                    elif self.tool_calls:
                         response_json["choices"][0]["message"] = {
                             "content": None,
                             "role": "assistant",
-                            "tool_calls": tool_calls,
+                            "tool_calls": self.tool_calls,
                         }
-                    elif function_name and full_argument:
+                    elif self.function_name and self.full_argument:
                         # function is deprecated in openai api
                         response_json["choices"][0]["function_call"] = {
-                            "name": function_name,
-                            "arguments": full_argument,
+                            "name": self.function_name,
+                            "arguments": self.full_argument,
                         }

+                    request_content_decode = self.request.content.decode("utf-8")
+                    if "anthropic" in self.request.headers.get("host"):
+                        request_content = json.loads(request_content_decode)
+                        if "tools" in request_content:
+                            request_content_decode = format_anthropic_tools_request(request_content)
+
                     log_row = {
                         "response": json.dumps(response_json),
                         "status": "finished",
                         "duration": duration,
                         "stacktrace": json.dumps(stacktrace),
                         "kind": "chat",
-                        "request": self.request.content.decode("utf-8"),
+                        "request": request_content_decode,
                         "session_id": session_id_var.get(),
                     }
                     if get_log10_session_tags():
@@ -237,13 +250,153 @@ async def aiter_bytes(self, *args, **kwargs):
                    await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)

             yield chunk

+    def is_response_end_reached(self, text: str):
+        host = self.request.headers.get("host")
+        if "anthropic" in host:
+            return self.is_anthropic_response_end_reached(text)
+        elif "openai" in host:
+            return self.is_openai_response_end_reached(text)
+        else:
+            logger.warning("Currently logging is only available for async openai and anthropic.")
+            return False
+
+    def is_anthropic_response_end_reached(self, text: str):
+        return "event: message_stop" in text
+
+    def is_openai_response_end_reached(self, text: str):
+        return "data: [DONE]" in text
+
+    def parse_anthropic_responses(self, responses: list[str]):
+        message_id = None
+        model = None
+        finish_reason = None
+        input_tokens = 0
+        output_tokens = 0
+        tool_call = {}
+        arguments = ""
+        for r in responses:
+            if not r:
+                break
+
+            data_index = r.find("data:")
+            r_json = json.loads(r[data_index + len("data:") :])
+
+            # The first Anthropic data event is "message_start", e.g.
+            # {"type": "message_start", "message": {"id": "msg_...", "role": "assistant",
+            #  "model": "claude-3-...", "usage": {"input_tokens": 25, "output_tokens": 1}, ...}}
+            type = r_json["type"]
+            if type == "message_start":
+                message_id = r_json["message"]["id"]
+                model = r_json["message"]["model"]
+                input_tokens = r_json["message"]["usage"]["input_tokens"]
+            elif type == "content_block_start":
+                content_block = r_json["content_block"]
+                type = content_block["type"]
+                if type == "tool_use":
+                    id = content_block["id"]
+                    tool_call = {
+                        "id": id,
+                        "type": "function",
+                        "function": {"name": content_block["name"], "arguments": ""},
+                    }
+                if "text" in content_block:
+                    self.full_content += content_block["text"]
+            elif type == "content_block_delta":
+                delta = r_json["delta"]
+                if "text" in delta:
+                    self.full_content += delta["text"]
+                if "partial_json" in delta:
+                    if self.full_content:
+                        self.full_content += delta["partial_json"]
+                    else:
+                        arguments += delta["partial_json"]
+            elif type == "message_delta":
+                finish_reason = r_json["delta"]["stop_reason"]
+                output_tokens = r_json["usage"]["output_tokens"]
+            elif type == "content_block_stop" or type == "message_stop":
+                if tool_call:
+                    tool_call["function"]["arguments"] = arguments
+                    self.tool_calls.append(tool_call)
+                    tool_call = {}
+                    arguments = ""
+
+        return {
+            "id": message_id,
+            "object": "chat",
+            "model": model,
+            "choices": [
+                {
+                    "index": 0,
+                    "finish_reason": finish_reason,
+                }
+            ],
+            "usage": {
+                "prompt_tokens": input_tokens,
+                "completion_tokens": output_tokens,
+                "total_tokens": input_tokens + output_tokens,
+            },
+        }
+
+    def parse_openai_responses(self, responses: list[str]):
+        r_json = None
+        for r in responses:
+            if self.is_openai_response_end_reached(r):
+                break
+
+            # loading the substring of response text after 'data: '.
+            # example: 'data: {"choices":[{"text":"Hello, how can I help you today?"}]}'
+            r_json = json.loads(r[6:])
+
+            delta = r_json["choices"][0]["delta"]
+
+            # Delta may have content
+            if "content" in delta:
+                content = delta["content"]
+                if content:
+                    self.full_content += content
+
+            # May be a function call, and have to reconstruct the arguments
+            if "function_call" in delta:
+                # May be function name
+                if "name" in delta["function_call"]:
+                    self.function_name = delta["function_call"]["name"]
+                # May be function arguments
+                if "arguments" in delta["function_call"]:
+                    self.full_argument += delta["function_call"]["arguments"]
+
+            if tc := delta.get("tool_calls", []):
+                if tc[0].get("id", ""):
+                    self.tool_calls.append(tc[0])
+                elif tc[0].get("function", {}).get("arguments", ""):
+                    idx = tc[0].get("index")
+                    self.tool_calls[idx]["function"]["arguments"] += tc[0]["function"]["arguments"]
+
+        r_json["object"] = "chat.completion"
+        return r_json
+
+    def parse_response_data(self, responses: list[str]):
+        host = self.request.headers.get("host")
+        if "openai" in host:
+            return self.parse_openai_responses(responses)
+        elif "anthropic" in host:
+            return self.parse_anthropic_responses(responses)
+        else:
+            logger.warning("Currently logging is only available for async openai and anthropic.")
+            return None
+

 class _LogTransport(httpx.AsyncBaseTransport):
     def __init__(self, transport: httpx.AsyncBaseTransport):
         self.transport = transport

     async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
-        response = await self.transport.handle_async_request(request)
+        try:
+            response = await self.transport.handle_async_request(request)
+        except Exception as e:
+            logger.warning(f"Failed to send request: {e}")
+            # re-raise so the caller still sees the original transport error
+            raise
+
+        if response.status_code >= 400:
+            logger.warning(f"HTTP error occurred: {response.status_code}")
+            # skip logging, but hand the error response back to the client
+            return response

         completion_id = request.headers.get("x-log10-completion-id", "")
         if not completion_id:
@@ -265,6 +418,15 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
         ]
         elapsed = time.time() - request.started

+        if "anthropic" in request.url.host:
+            from anthropic.types.beta.tools import (
+                ToolsBetaMessage,
+            )
+
+            from log10.anthropic import Anthropic
+
+            llm_response = Anthropic.prepare_response(ToolsBetaMessage(**llm_response))
+
         log_row = {
             "response": json.dumps(llm_response),
             "status": "finished",
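
Note: the async plumbing above is activated by the patched AsyncAnthropic.__init__ in
log10/load.py below. The request event hooks tag each call with an x-log10-completion-id
and post the "started" row, and _LogTransport/_LogResponse post the "finished" row once
the stream ends. A sketch of the equivalent manual wiring (mirrors new_init below; not
part of the patch):

    import anthropic
    import httpx

    from log10._httpx_utils import _LogTransport, get_completion_id, log_request

    http_client = httpx.AsyncClient(
        event_hooks={"request": [get_completion_id, log_request]},
        transport=_LogTransport(httpx.AsyncHTTPTransport()),
    )
    client = anthropic.AsyncAnthropic(http_client=http_client)  # requests now logged to log10
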
diff --git a/log10/anthropic.py b/log10/anthropic.py
index 3e8b2716..bb05f407 100644
--- a/log10/anthropic.py
+++ b/log10/anthropic.py
@@ -4,6 +4,9 @@

 import anthropic
 from anthropic import AI_PROMPT, HUMAN_PROMPT
+from anthropic.types.beta.tools import (
+    ToolsBetaMessage,
+)

 from log10.llm import LLM, ChatCompletion, Kind, Message, TextCompletion
 from log10.utils import merge_hparams
@@ -133,9 +136,12 @@ def create_tokens_usage(prompt: str, completion: str):

     @staticmethod
     def prepare_response(
-        response: anthropic.types.Completion | anthropic.types.Message, input_prompt: str = ""
+        response: anthropic.types.Completion | anthropic.types.Message | ToolsBetaMessage, input_prompt: str = ""
     ) -> dict:
-        if response.stop_reason in ["stop_sequence", "end_turn"]:
+        if not hasattr(response, "stop_reason"):
+            return None
+
+        if response.stop_reason in ["stop_sequence", "end_turn", "tool_use"]:
             reason = "stop"
         elif response.stop_reason == "max_tokens":
             reason = "length"
@@ -151,6 +157,7 @@ def prepare_response(
                }
            ],
        }
+
        if isinstance(response, anthropic.types.Message):
            tokens_usage = {
                "prompt_tokens": response.usage.input_tokens,
@@ -161,6 +168,13 @@ def prepare_response(
        elif isinstance(response, anthropic.types.Completion):
            tokens_usage = Anthropic.create_tokens_usage(input_prompt, response.completion)
            ret_response["choices"][0]["text"] = response.completion
+        elif isinstance(response, ToolsBetaMessage):
+            tokens_usage = {
+                "prompt_tokens": response.usage.input_tokens,
+                "completion_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.input_tokens + response.usage.output_tokens,
+            }
+            ret_response["choices"][0]["message"] = {"role": response.role, "content": response.content[0].text}
        ret_response["usage"] = tokens_usage
        return ret_response
diff --git a/log10/load.py b/log10/load.py
index 6c52f236..86dcefdd 100644
--- a/log10/load.py
+++ b/log10/load.py
@@ -394,6 +394,14 @@ class AnthropicStreamingResponseWrapper:
     Wraps a streaming response object to log the final result and duration to log10.
     """

+    def __enter__(self):
+        self.response = self.response.__enter__()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.response.__exit__(exc_type, exc_value, traceback)
+        return
+
     def __init__(self, completion_url, completionID, response, partial_log_row):
         self.completionID = completionID
         self.completion_url = completion_url
@@ -412,16 +420,26 @@ def __iter__(self):

     def __next__(self):
         chunk = next(self.response)
+        self._process_chunk(chunk)
+        return chunk
+
+    def _process_chunk(self, chunk):
         if chunk.type == "message_start":
             self.model = chunk.message.model
             self.message_id = chunk.message.id
             self.input_tokens = chunk.message.usage.input_tokens
+        if chunk.type == "content_block_start":
+            if hasattr(chunk.content_block, "text"):
+                self.final_result += chunk.content_block.text
         elif chunk.type == "message_delta":
             self.finish_reason = chunk.delta.stop_reason
             self.output_tokens = chunk.usage.output_tokens
         elif chunk.type == "content_block_delta":
-            self.final_result += chunk.delta.text
-        elif chunk.type == "message_stop":
+            if hasattr(chunk.delta, "text"):
+                self.final_result += chunk.delta.text
+            if hasattr(chunk.delta, "partial_json"):
+                self.final_result += chunk.delta.partial_json
+        elif chunk.type == "message_stop" or chunk.type == "content_block_stop":
             response = {
                 "id": self.message_id,
                 "object": "chat",
@@ -450,8 +468,6 @@ def __next__(self):
            if res.status_code != 200:
                logger.error(f"Failed to insert in log10: {self.partial_log_row} with error {res.text}. Skipping")

-        return chunk
-

 def flatten_messages(messages):
     flat_messages = []
@@ -509,6 +525,9 @@ def _init_log_row(func, *args, **kwargs):
     # kind and request are set based on the module and qualname
     # request is based on openai schema
     if "anthropic" in func.__module__:
+        from anthropic._utils._utils import strip_not_given
+
+        kwargs_copy = strip_not_given(kwargs_copy)
         log_row["kind"] = "chat" if "message" in func.__module__ else "completion"
         # set system message
         if "system" in kwargs_copy:
@@ -530,6 +549,17 @@
                    else:
                        new_content.append(c)
                m["content"] = new_content
+        if "tools" in kwargs_copy:
+            for t in kwargs_copy["tools"]:
+                new_function = {
+                    "name": t.get("name", None),
+                    "description": t.get("description", None),
+                    "parameters": {
+                        "properties": t.get("input_schema", {}).get("properties", None),
+                    },
+                }
+                t["function"] = new_function
+                t.pop("input_schema", None)
     elif "vertexai" in func.__module__:
         if func.__name__ == "_send_message":
             # get model name save in ChatSession instance
@@ -644,7 +674,7 @@ def wrapper(*args, **kwargs):
                     completionID = result_queue.get()

                     if completionID is None:
-                        logging.warn(f"LOG10: failed to get completionID from log10: {e}. Skipping log.")
+                        logger.warning(f"LOG10: failed to get completionID from log10: {e}. Skipping log.")
                         return

                     logger.debug(f"LOG10: failed - {e}")
@@ -660,7 +690,7 @@ def wrapper(*args, **kwargs):
                     try:
                         res = post_request(completion_url + "/" + completionID, log_row)
                     except Exception as le:
-                        logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.")
+                        logger.warning(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.")
                     raise e
             else:
                 # finished with no exceptions
@@ -674,7 +704,7 @@ def wrapper(*args, **kwargs):
             response = output
             # Adjust the Anthropic output to match OAI completion output
             if "anthropic" in func.__module__:
-                if type(output).__name__ == "Stream":
+                if type(output).__name__ == "Stream" or "MessageStreamManager" in type(output).__name__:
                     log_row["response"] = response
                     log_row["status"] = "finished"
                     return AnthropicStreamingResponseWrapper(
@@ -683,6 +713,7 @@ def wrapper(*args, **kwargs):
                         response=response,
                         partial_log_row=log_row,
                     )
+
                 from log10.anthropic import Anthropic

                 response = Anthropic.prepare_response(output, input_prompt=kwargs.get("prompt", ""))
@@ -941,6 +972,42 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
             attr = module.resources.messages.Messages
             method = getattr(attr, "create")
             setattr(attr, "create", intercepting_decorator(method))
+
+            attr = module.resources.messages.Messages
+            method = getattr(attr, "stream")
+            setattr(attr, "stream", intercepting_decorator(method))
+
+            attr = module.resources.beta.tools.Messages
+            method = getattr(attr, "create")
+            setattr(attr, "create", intercepting_decorator(method))
+
+            attr = module.resources.beta.tools.Messages
+            method = getattr(attr, "stream")
+            setattr(attr, "stream", intercepting_decorator(method))
+
+            origin_init = module.AsyncAnthropic.__init__
+
+            def new_init(self, *args, **kwargs):
+                logger.debug("LOG10: patching AsyncAnthropic.__init__")
+                import httpx
+
+                from log10._httpx_utils import (
+                    _LogTransport,
+                    get_completion_id,
+                    log_request,
+                )
+
+                event_hooks = {
+                    "request": [get_completion_id, log_request],
+                }
+                async_httpx_client = httpx.AsyncClient(
+                    event_hooks=event_hooks,
+                    transport=_LogTransport(httpx.AsyncHTTPTransport()),
+                )
+                kwargs["http_client"] = async_httpx_client
+                origin_init(self, *args, **kwargs)
+
+            module.AsyncAnthropic.__init__ = new_init
        elif module.__name__ == "lamini":
            attr = module.api.utils.completion.Completion
            method = getattr(attr, "generate")
diff --git a/poetry.lock b/poetry.lock
index 136b6e62..b662b3cb 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -123,13 +123,13 @@ files = [

 [[package]]
 name = "anthropic"
-version = "0.25.7"
+version = "0.26.0"
 description = "The official Python library for the anthropic API"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "anthropic-0.25.7-py3-none-any.whl", hash = "sha256:419a276eb20cfb7ddaac03c7e28e4e12df3ace71bcf33071a68c9a03c0dfcbdd"},
-    {file = "anthropic-0.25.7.tar.gz", hash = "sha256:e7de4c8ba8e7e8248ad7f05ed9176634780b95b67c678d23915d8964c8a26f4e"},
+    {file = "anthropic-0.26.0-py3-none-any.whl", hash = "sha256:38fc415561d71dcf263b89da0cc6ecec498379b56256fc4242e9128bc707b283"},
+    {file = "anthropic-0.26.0.tar.gz", hash = "sha256:6aaffeb05d515cf9788eef57150a5f827f3786883628ccac71dbe5671ab6f44e"},
 ]

 [package.dependencies]
diff --git a/tests/pytest.ini b/tests/pytest.ini
index 956f1de7..3448f112 100644
--- a/tests/pytest.ini
+++ b/tests/pytest.ini
@@ -16,3 +16,4 @@ markers =
     tools: mark a test as a tools test
     vision: mark a test as a vision test
     widget: mark a test as a widget test
+    context_manager: mark a test as a context manager test
diff --git a/tests/test_anthropic.py b/tests/test_anthropic.py
index 166b3b75..6727b95d 100644
--- a/tests/test_anthropic.py
+++ b/tests/test_anthropic.py
@@ -3,16 +3,20 @@
 import anthropic
 import httpx
 import pytest
+from anthropic import NOT_GIVEN
+from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream
+from typing_extensions import override

 from log10.load import log10


 log10(anthropic)
-
-client = anthropic.Anthropic()


 @pytest.mark.chat
-def test_messages(anthropic_model):
+def test_messages_create(anthropic_model):
+    client = anthropic.Anthropic()
+
     message = client.messages.create(
         model=anthropic_model,
         max_tokens=1000,
@@ -26,9 +30,30 @@ def test_messages(anthropic_model):
     assert text, "No output from the model."


+@pytest.mark.chat
+@pytest.mark.async_client
+@pytest.mark.asyncio
+async def test_messages_create_async(anthropic_model):
+    client = anthropic.AsyncAnthropic()
+
+    message = await client.messages.create(
+        model=anthropic_model,
+        max_tokens=1000,
+        temperature=0.0,
+        system="Respond only in Yoda-speak.",
+        messages=[{"role": "user", "content": "Say hello!"}],
+    )
+
+    text = message.content[0].text
+    assert isinstance(text, str)
+    assert text, "No output from the model."
+
+
 @pytest.mark.chat
 @pytest.mark.stream
-def test_messages_stream(anthropic_model):
+def test_messages_create_stream(anthropic_model):
+    client = anthropic.Anthropic()
+
     stream = client.messages.create(
         model=anthropic_model,
         messages=[
@@ -85,3 +110,190 @@ def test_messages_image(anthropic_model):
     text = message.content[0].text
     assert text, "No output from the model."
     assert "ant" in text
+
+
+@pytest.mark.chat
+def test_chat_not_given(anthropic_model):
+    client = anthropic.Anthropic()
+
+    message = client.beta.tools.messages.create(
+        model=anthropic_model,
+        messages=[
+            {
+                "role": "user",
+                "content": "tell a short joke.",
+            },
+        ],
+        max_tokens=1000,
+        tools=NOT_GIVEN,
+        tool_choice=NOT_GIVEN,
+    )
+
+    content = message.content[0].text
+    assert isinstance(content, str)
+    assert content, "No output from the model."
+
+
+@pytest.mark.chat
+def test_beta_tools_messages_create(anthropic_model):
+    client = anthropic.Anthropic()
+
+    message = client.beta.tools.messages.create(
+        model=anthropic_model,
+        max_tokens=1000,
+        messages=[{"role": "user", "content": "Say hello!"}],
+    )
+
+    text = message.content[0].text
+    assert text, "No output from the model."
+
+
+@pytest.mark.chat
+@pytest.mark.async_client
+@pytest.mark.asyncio
+async def test_beta_tools_messages_create_async(anthropic_model):
+    client = anthropic.AsyncAnthropic()
+
+    message = await client.beta.tools.messages.create(
+        model=anthropic_model,
+        max_tokens=1000,
+        messages=[{"role": "user", "content": "Say hello!"}],
+    )
+
+    text = message.content[0].text
+    assert text, "No output from the model."
+
+
+@pytest.mark.chat
+@pytest.mark.stream
+@pytest.mark.context_manager
+def test_messages_stream_context_manager(anthropic_model):
+    client = anthropic.Anthropic()
+
+    output = ""
+    with client.messages.stream(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Say hello there!",
+            }
+        ],
+        model=anthropic_model,
+    ) as stream:
+        for message in stream:
+            if message.type == "content_block_delta":
+                if message.delta:
+                    if hasattr(message.delta, "text"):
+                        output += message.delta.text
+
+    assert output, "No output from the model."
+
+
+@pytest.mark.chat
+@pytest.mark.stream
+@pytest.mark.context_manager
+@pytest.mark.async_client
+@pytest.mark.asyncio
+async def test_messages_stream_context_manager_async(anthropic_model):
+    client = anthropic.AsyncAnthropic()
+
+    output = ""
+    async with client.messages.stream(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Say hello there!",
+            }
+        ],
+        model=anthropic_model,
+    ) as stream:
+        async for text in stream.text_stream:
+            output += text
+
+    assert output, "No output from the model."
+
+
+@pytest.mark.tools
+@pytest.mark.stream
+@pytest.mark.context_manager
+def test_tools_messages_stream_context_manager(anthropic_model):
+    client = anthropic.Anthropic()
+    output = ""
+    with client.beta.tools.messages.stream(
+        model=anthropic_model,
+        tools=[
+            {
+                "name": "get_weather",
+                "description": "Get the weather at a specific location",
+                "input_schema": {
+                    "type": "object",
+                    "properties": {
+                        "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"},
+                        "unit": {
+                            "type": "string",
+                            "enum": ["celsius", "fahrenheit"],
+                            "description": "Unit for the output",
+                        },
+                    },
+                    "required": ["location"],
+                },
+            }
+        ],
+        messages=[{"role": "user", "content": "What is the weather in SF?"}],
+        max_tokens=1024,
+    ) as stream:
+        for message in stream:
+            if message.type == "content_block_delta":
+                if message.delta:
+                    if hasattr(message.delta, "text"):
+                        output += message.delta.text
+                    if hasattr(message.delta, "partial_json"):
+                        output += message.delta.partial_json
+
+    assert output, "No output from the model."
+
+
+@pytest.mark.tools
+@pytest.mark.stream
+@pytest.mark.context_manager
+@pytest.mark.async_client
+@pytest.mark.asyncio
+async def test_tools_messages_stream_context_manager_async(anthropic_model):
+    client = anthropic.AsyncAnthropic()
+    output = None
+
+    class MyHandler(AsyncToolsBetaMessageStream):
+        @override
+        async def on_input_json(self, delta: str, snapshot: object) -> None:
+            nonlocal output
+            output = snapshot
+
+    async with client.beta.tools.messages.stream(
+        model=anthropic_model,
+        tools=[
+            {
+                "name": "get_weather",
+                "description": "Get the weather at a specific location",
+                "input_schema": {
+                    "type": "object",
+                    "properties": {
+                        "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"},
+                        "unit": {
+                            "type": "string",
+                            "enum": ["celsius", "fahrenheit"],
+                            "description": "Unit for the output",
+                        },
+                    },
+                    "required": ["location"],
+                },
+            }
+        ],
+        messages=[{"role": "user", "content": "What is the weather in SF?"}],
+        max_tokens=1024,
+        event_handler=MyHandler,
+    ) as stream:
+        await stream.until_done()
+
+    assert output, "No output from the model."
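
Note for readers of the stream parsers above: parse_anthropic_responses collapses the
Anthropic SSE events into an OpenAI-style record, and _LogResponse.aiter_bytes then
attaches the accumulated message or tool_calls before posting it. The normalized shape
for a tool-use stream looks roughly like this (values are illustrative; the real ones
come from the events):

    {
        "id": "msg_...",                         # from message_start
        "object": "chat",
        "model": "claude-3-haiku-20240307",      # from message_start
        "choices": [{"index": 0, "finish_reason": "tool_use"}],  # from message_delta
        "usage": {"prompt_tokens": 25, "completion_tokens": 75, "total_tokens": 100},
    }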