From c128a61f37544b06cb2f7d4e4c6a9aa3047c8d3c Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 02:40:54 -0400 Subject: [PATCH 01/15] Add sync tools stream support --- log10/load.py | 22 +++++++++++++++++++++- poetry.lock | 6 +++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/log10/load.py b/log10/load.py index eabce024..b249000f 100644 --- a/log10/load.py +++ b/log10/load.py @@ -374,6 +374,16 @@ class AnthropicStreamingResponseWrapper: Wraps a streaming response object to log the final result and duration to log10. """ + text_stream = None + + def __enter__(self): + self.response = self.response.__enter__() + # self.text_stream = self.response + return self + + def __exit__(self, exc_type, exc_value, traceback): + return + def __init__(self, completion_url, completionID, response, partial_log_row): self.completionID = completionID self.completion_url = completion_url @@ -639,7 +649,7 @@ def wrapper(*args, **kwargs): response = output # Adjust the Anthropic output to match OAI completion output if "anthropic" in func.__module__: - if type(output).__name__ == "Stream": + if type(output).__name__ == "Stream" or "MessageStreamManager" in type(output).__name__: log_row["response"] = response log_row["status"] = "finished" return AnthropicStreamingResponseWrapper( @@ -648,6 +658,7 @@ def wrapper(*args, **kwargs): response=response, partial_log_row=log_row, ) + from log10.anthropic import Anthropic response = Anthropic.prepare_response(output, input_prompt=kwargs.get("prompt", "")) @@ -892,6 +903,15 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): attr = module.resources.messages.Messages method = getattr(attr, "create") setattr(attr, "create", intercepting_decorator(method)) + + # anthropic Messages stream function + # attr = module.resources.messages.Messages + # method = getattr(attr, "stream") + # setattr(attr, "stream", intercepting_decorator(method)) + + attr = module.resources.beta.tools.Messages + method = getattr(attr, "stream") + setattr(attr, "stream", intercepting_decorator(method)) elif module.__name__ == "lamini": attr = module.api.utils.completion.Completion method = getattr(attr, "generate") diff --git a/poetry.lock b/poetry.lock index 136b6e62..b662b3cb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -123,13 +123,13 @@ files = [ [[package]] name = "anthropic" -version = "0.25.7" +version = "0.26.0" description = "The official Python library for the anthropic API" optional = false python-versions = ">=3.7" files = [ - {file = "anthropic-0.25.7-py3-none-any.whl", hash = "sha256:419a276eb20cfb7ddaac03c7e28e4e12df3ace71bcf33071a68c9a03c0dfcbdd"}, - {file = "anthropic-0.25.7.tar.gz", hash = "sha256:e7de4c8ba8e7e8248ad7f05ed9176634780b95b67c678d23915d8964c8a26f4e"}, + {file = "anthropic-0.26.0-py3-none-any.whl", hash = "sha256:38fc415561d71dcf263b89da0cc6ecec498379b56256fc4242e9128bc707b283"}, + {file = "anthropic-0.26.0.tar.gz", hash = "sha256:6aaffeb05d515cf9788eef57150a5f827f3786883628ccac71dbe5671ab6f44e"}, ] [package.dependencies] From 204e56c8c064d66591ad2258121fd816a878782c Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 03:07:56 -0400 Subject: [PATCH 02/15] Add anthropic tools stream example --- examples/logging/anthropic_tools_stream.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 examples/logging/anthropic_tools_stream.py diff --git a/examples/logging/anthropic_tools_stream.py b/examples/logging/anthropic_tools_stream.py new file mode 100644 index 00000000..e60e6c32 --- /dev/null +++ 
b/examples/logging/anthropic_tools_stream.py @@ -0,0 +1,22 @@ +import anthropic + +from log10.load import log10 + + +log10(anthropic) + + +client = anthropic.Anthropic() + +with client.beta.tools.messages.stream( + model="claude-instant-1.2", + messages=[ + { + "role": "user", + "content": "Howdy", + } + ], + max_tokens=1024, +) as stream: + for message in stream: + print(message) From 1d85176010ba6a0d73159e88a3149a8c04469501 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 03:49:29 -0400 Subject: [PATCH 03/15] Clean up commented code --- log10/load.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/log10/load.py b/log10/load.py index b249000f..a4705766 100644 --- a/log10/load.py +++ b/log10/load.py @@ -374,11 +374,8 @@ class AnthropicStreamingResponseWrapper: Wraps a streaming response object to log the final result and duration to log10. """ - text_stream = None - def __enter__(self): self.response = self.response.__enter__() - # self.text_stream = self.response return self def __exit__(self, exc_type, exc_value, traceback): @@ -904,11 +901,6 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): method = getattr(attr, "create") setattr(attr, "create", intercepting_decorator(method)) - # anthropic Messages stream function - # attr = module.resources.messages.Messages - # method = getattr(attr, "stream") - # setattr(attr, "stream", intercepting_decorator(method)) - attr = module.resources.beta.tools.Messages method = getattr(attr, "stream") setattr(attr, "stream", intercepting_decorator(method)) From 4b873af09fd997274b850c0e7d5d576277af9334 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 14:20:31 -0400 Subject: [PATCH 04/15] Update anthropic tools stream format --- log10/load.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/log10/load.py b/log10/load.py index a4705766..1121db1b 100644 --- a/log10/load.py +++ b/log10/load.py @@ -403,12 +403,17 @@ def __next__(self): self.model = chunk.message.model self.message_id = chunk.message.id self.input_tokens = chunk.message.usage.input_tokens + if chunk.type == "content_block_start" and hasattr(chunk.content_block, "text"): + self.final_result += chunk.content_block.text elif chunk.type == "message_delta": self.finish_reason = chunk.delta.stop_reason self.output_tokens = chunk.usage.output_tokens elif chunk.type == "content_block_delta": - self.final_result += chunk.delta.text - elif chunk.type == "message_stop": + if hasattr(chunk.delta, "text"): + self.final_result += chunk.delta.text + if hasattr(chunk.delta, "partial_json"): + self.final_result += chunk.delta.partial_json + elif chunk.type == "message_stop" or chunk.type == "content_block_stop": response = { "id": self.message_id, "object": "chat", @@ -513,6 +518,17 @@ def _init_log_row(func, *args, **kwargs): else: new_content.append(c) m["content"] = new_content + if "tools" in kwargs_copy: + for t in kwargs_copy["tools"]: + new_function = { + "name": t["name"], + "description": t["description"], + "parameters": { + "properties": t["input_schema"]["properties"], + }, + } + t["function"] = new_function + t.pop("input_schema", None) elif "vertexai" in func.__module__: if func.__name__ == "_send_message": # get model name save in ChatSession instance From c74f08fdd28dad6b256a3939c2b9176702892d29 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 14:23:14 -0400 Subject: [PATCH 05/15] Update anthropic tools example --- examples/logging/anthropic_tools_stream.py | 21 +++++++++++++++++---- 1 file changed, 17 
insertions(+), 4 deletions(-) diff --git a/examples/logging/anthropic_tools_stream.py b/examples/logging/anthropic_tools_stream.py index e60e6c32..08a1cd7f 100644 --- a/examples/logging/anthropic_tools_stream.py +++ b/examples/logging/anthropic_tools_stream.py @@ -9,13 +9,26 @@ client = anthropic.Anthropic() with client.beta.tools.messages.stream( - model="claude-instant-1.2", - messages=[ + model="claude-3-haiku-20240307", + tools=[ { - "role": "user", - "content": "Howdy", + "name": "get_weather", + "description": "Get the weather at a specific location", + "input_schema": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"}, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "Unit for the output", + }, + }, + "required": ["location"], + }, } ], + messages=[{"role": "user", "content": "What is the weather in SF?"}], max_tokens=1024, ) as stream: for message in stream: From e6ad37a5575c294157d506b1b6beead2130b0437 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 16:32:56 -0400 Subject: [PATCH 06/15] Add anthropic async stream function calling support --- log10/load.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/log10/load.py b/log10/load.py index 1121db1b..c556dbea 100644 --- a/log10/load.py +++ b/log10/load.py @@ -399,12 +399,39 @@ def __iter__(self): def __next__(self): chunk = next(self.response) + self._process_chunk(chunk) + return chunk + + async def __aenter__(self): + self.response = await self.response.__aenter__() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + return + + def __aiter__(self): + return self + + async def __anext__(self): + try: + chunk = await self.response.__anext__() + self._process_chunk(chunk) + return chunk + except StopAsyncIteration: + raise StopAsyncIteration from None + + async def until_done(self): + async for _ in self: + pass + + def _process_chunk(self, chunk): if chunk.type == "message_start": self.model = chunk.message.model self.message_id = chunk.message.id self.input_tokens = chunk.message.usage.input_tokens - if chunk.type == "content_block_start" and hasattr(chunk.content_block, "text"): - self.final_result += chunk.content_block.text + if chunk.type == "content_block_start": + if hasattr(chunk.content_block, "text"): + self.final_result += chunk.content_block.text elif chunk.type == "message_delta": self.finish_reason = chunk.delta.stop_reason self.output_tokens = chunk.usage.output_tokens @@ -441,8 +468,6 @@ def __next__(self): if res.status_code != 200: logger.error(f"Failed to insert in log10: {self.partial_log_row} with error {res.text}. 
Skipping") - return chunk - def flatten_messages(messages): flat_messages = [] @@ -920,6 +945,10 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): attr = module.resources.beta.tools.Messages method = getattr(attr, "stream") setattr(attr, "stream", intercepting_decorator(method)) + + attr = module.resources.beta.tools.AsyncMessages + method = getattr(attr, "stream") + setattr(attr, "stream", intercepting_decorator(method)) elif module.__name__ == "lamini": attr = module.api.utils.completion.Completion method = getattr(attr, "generate") From e5d33cd5da4d8fa8cf9c0fc64f75cc37eeaa5205 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 16:37:31 -0400 Subject: [PATCH 07/15] Delegate to exit and aexit of the response --- log10/load.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/log10/load.py b/log10/load.py index c556dbea..e2f1ab5d 100644 --- a/log10/load.py +++ b/log10/load.py @@ -379,6 +379,7 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): + self.response.__exit__(exc_type, exc_value, traceback) return def __init__(self, completion_url, completionID, response, partial_log_row): @@ -407,6 +408,7 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): + await self.response.__aexit__(exc_type, exc_value, traceback) return def __aiter__(self): From 772cd496329dc1a5c0d1ec1965298e88ae825865 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 21 May 2024 18:08:59 -0400 Subject: [PATCH 08/15] Address feedback --- log10/load.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/log10/load.py b/log10/load.py index e2f1ab5d..df17f2fc 100644 --- a/log10/load.py +++ b/log10/load.py @@ -548,10 +548,10 @@ def _init_log_row(func, *args, **kwargs): if "tools" in kwargs_copy: for t in kwargs_copy["tools"]: new_function = { - "name": t["name"], - "description": t["description"], + "name": t.get("name", None), + "description": t.get("description", None), "parameters": { - "properties": t["input_schema"]["properties"], + "properties": t.get("input_schema", {}).get("properties", None), }, } t["function"] = new_function From 59817f044a90470825238f92dd1ec1a64d9faba0 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Wed, 22 May 2024 20:41:20 -0400 Subject: [PATCH 09/15] Use http hooks for anthropic async calls --- log10/_httpx_utils.py | 207 +++++++++++++++++++++++++++++++----------- log10/anthropic.py | 10 +- log10/load.py | 35 ++++++- 3 files changed, 198 insertions(+), 54 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 8ef69e11..f741f0a6 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -109,8 +109,13 @@ async def _try_post_request_async(url: str, payload: dict = {}) -> httpx.Respons async def get_completion_id(request: Request): - if "v1/chat/completions" not in str(request.url): - logger.warning("Currently logging is only available for v1/chat/completions.") + host = request.headers.get("host") + if "anthropic" in host and "/v1/messages" not in str(request.url): + logger.warning("Currently logging is only available for anthropic v1/messages.") + return + + if "openai" in host and "v1/chat/completions" not in str(request.url): + logger.warning("Currently logging is only available for openai v1/chat/completions.") return request.headers["x-log10-completion-id"] = str(uuid.uuid4()) @@ -125,14 +130,27 @@ async def log_request(request: Request): orig_module = "" orig_qualname = "" - if "chat" in str(request.url): - kind = "chat" - orig_module = 
"openai.api_resources.chat_completion" - orig_qualname = "ChatCompletion.create" + host = request.headers.get("host") + if "openai" in host: + if "chat" in str(request.url): + kind = "chat" + orig_module = "openai.api_resources.chat_completion" + orig_qualname = "ChatCompletion.create" + else: + kind = "completion" + orig_module = "openai.api_resources.completion" + orig_qualname = "Completion.create" + elif "anthropic" in host: + if "async:" in request.headers.get("x-stainless-async"): + kind = "chat" + orig_module = "anthropic.resources.beta.tools" + orig_qualname = "Messages.stream" + else: + logger.warning("Currently logging is only available for anthropic async") + return else: - kind = "completion" - orig_module = "openai.api_resources.completion" - orig_qualname = "Completion.create" + logger.warning("Currently logging is only available for async openai and anthropic.") + return log_row = { "status": "started", "kind": kind, @@ -147,13 +165,19 @@ async def log_request(request: Request): class _LogResponse(Response): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.full_content = "" + self.function_name = "" + self.full_argument = "" + self.tool_calls = [] + async def aiter_bytes(self, *args, **kwargs): full_response = "" finished = False async for chunk in super().aiter_bytes(*args, **kwargs): full_response += chunk.decode(errors="ignore") - - if "data: [DONE]" in full_response: + if self.is_response_end_reached(full_response): finished = True duration = int(time.time() - self.request.started) * 1000 @@ -169,58 +193,27 @@ async def aiter_bytes(self, *args, **kwargs): } for frame in current_stack_frame ] - full_content = "" - function_name = "" - full_argument = "" - tool_calls = [] + responses = full_response.split("\n\n") - for r in responses: - if "data: [DONE]" in r: - break - - r_json = json.loads(r[6:]) - - delta = r_json["choices"][0]["delta"] - - # Delta may have content - if "content" in delta: - content = delta["content"] - if content: - full_content += content - - # May be a function call, and have to reconstruct the arguments - if "function_call" in delta: - # May be function name - if "name" in delta["function_call"]: - function_name = delta["function_call"]["name"] - # May be function arguments - if "arguments" in delta["function_call"]: - full_argument += delta["function_call"]["arguments"] - - if tc := delta.get("tool_calls", []): - if tc[0].get("id", ""): - tool_calls.append(tc[0]) - elif tc[0].get("function", {}).get("arguments", ""): - idx = tc[0].get("index") - tool_calls[idx]["function"]["arguments"] += tc[0]["function"]["arguments"] + r_json = self.parse_response_data(responses) response_json = r_json.copy() response_json["object"] = "chat.completion" # r_json is the last response before "data: [DONE]" - if full_content: - response_json["choices"][0]["message"] = {"role": "assistant", "content": full_content} - elif tool_calls: + if self.full_content: + response_json["choices"][0]["message"] = {"role": "assistant", "content": self.full_content} + elif self.tool_calls: response_json["choices"][0]["message"] = { "content": None, "role": "assistant", - "tool_calls": tool_calls, + "tool_calls": self.tool_calls, } - elif function_name and full_argument: + elif self.function_name and self.full_argument: # function is deprecated in openai api response_json["choices"][0]["function_call"] = { - "name": function_name, - "arguments": full_argument, + "name": self.function_name, + "arguments": self.full_argument, } log_row = { @@ -237,6 
+230,113 @@ async def aiter_bytes(self, *args, **kwargs): await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row) yield chunk + def is_response_end_reached(self, text: str): + host = self.request.headers.get("host") + if "anthropic" in host: + return self.is_anthropic_response_end_reached(text) + elif "openai" in host: + return self.is_openai_response_end_reached(text) + else: + logger.warning("Currently logging is only available for async openai and anthropic.") + return False + + def is_anthropic_response_end_reached(self, text: str): + return "event: message_stop" in text + + def is_openai_response_end_reached(self, text: str): + return "data: [DONE]" in text + + def parse_anthropic_responses(self, responses: list[str]): + message_id = None + model = None + finish_reason = None + input_tokens = 0 + output_tokens = 0 + + for r in responses: + if self.is_anthropic_response_end_reached(r): + break + + data_index = r.find("data:") + r_json = json.loads(r[data_index + len("data:") :]) + + ### anthropic first data contains + ### {"event":"message_start","data":{"message":{"role":"user","content":"Hello, how are you today?"}}} + type = r_json["type"] + if type == "message_start": + message_id = r_json["message"]["id"] + model = r_json["message"]["model"] + input_tokens = r_json["message"]["usage"]["input_tokens"] + elif type == "content_block_start": + self.full_content += r_json["content_block"]["text"] + elif type == "content_block_delta": + self.full_content += r_json["delta"]["text"] + elif type == "message_delta": + finish_reason = r_json["delta"]["stop_reason"] + output_tokens = r_json["usage"]["output_tokens"] + + return { + "id": message_id, + "object": "chat", + "model": model, + "choices": [ + { + "index": 0, + "finish_reason": finish_reason, + } + ], + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + } + + def parse_openai_responses(self, responses: list[str]): + r_json = None + for r in responses: + if self.is_openai_response_end_reached(r): + break + + # loading the substring of response text after 'data: '. 
+ # example: 'data: {"choices":[{"text":"Hello, how can I help you today?"}]}' + r_json = json.loads(r[6:]) + delta = r_json["choices"][0]["delta"] + + # Delta may have content + if "content" in delta: + content = delta["content"] + if content: + self.full_content += content + + # May be a function call, and have to reconstruct the arguments + if "function_call" in delta: + # May be function name + if "name" in delta["function_call"]: + self.function_name = delta["function_call"]["name"] + # May be function arguments + if "arguments" in delta["function_call"]: + self.full_argument += delta["function_call"]["arguments"] + + if tc := delta.get("tool_calls", []): + if tc[0].get("id", ""): + self.tool_calls.append(tc[0]) + elif tc[0].get("function", {}).get("arguments", ""): + idx = tc[0].get("index") + self.tool_calls[idx]["function"]["arguments"] += tc[0]["function"]["arguments"] + + return r_json + + def parse_response_data(self, responses: list[str]): + host = self.request.headers.get("host") + if "openai" in host: + return self.parse_openai_responses(responses) + elif "anthropic" in host: + return self.parse_anthropic_responses(responses) + else: + logger.warning("Currently logging is only available for async openai and anthropic.") + return None + class _LogTransport(httpx.AsyncBaseTransport): def __init__(self, transport: httpx.AsyncBaseTransport): @@ -265,6 +365,11 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: ] elapsed = time.time() - request.started + if "anthropic" in request.url.host: + from log10.anthropic import Anthropic + + response = Anthropic.prepare_response(llm_response) + log_row = { "response": json.dumps(llm_response), "status": "finished", diff --git a/log10/anthropic.py b/log10/anthropic.py index 3e8b2716..630a9fca 100644 --- a/log10/anthropic.py +++ b/log10/anthropic.py @@ -135,7 +135,15 @@ def create_tokens_usage(prompt: str, completion: str): def prepare_response( response: anthropic.types.Completion | anthropic.types.Message, input_prompt: str = "" ) -> dict: - if response.stop_reason in ["stop_sequence", "end_turn"]: + print(type(response)) + + if isinstance(response, dict): + response = anthropic.types.Message(**response) + + if not hasattr(response, "stop_reason"): + return None + + if response.stop_reason in ["stop_sequence", "end_turn", "tool_use"]: reason = "stop" elif response.stop_reason == "max_tokens": reason = "length" diff --git a/log10/load.py b/log10/load.py index df17f2fc..c62962ae 100644 --- a/log10/load.py +++ b/log10/load.py @@ -527,6 +527,9 @@ def _init_log_row(func, *args, **kwargs): # kind and request are set based on the module and qualname # request is based on openai schema if "anthropic" in func.__module__: + from anthropic._utils._utils import strip_not_given + + kwargs_copy = strip_not_given(kwargs_copy) log_row["kind"] = "chat" if "message" in func.__module__ else "completion" # set system message if "system" in kwargs_copy: @@ -944,13 +947,41 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): method = getattr(attr, "create") setattr(attr, "create", intercepting_decorator(method)) - attr = module.resources.beta.tools.Messages + attr = module.resources.messages.Messages method = getattr(attr, "stream") setattr(attr, "stream", intercepting_decorator(method)) - attr = module.resources.beta.tools.AsyncMessages + attr = module.resources.beta.tools.Messages + method = getattr(attr, "create") + setattr(attr, "create", intercepting_decorator(method)) + + attr = module.resources.beta.tools.Messages method = 
getattr(attr, "stream") setattr(attr, "stream", intercepting_decorator(method)) + + origin_init = module.AsyncAnthropic.__init__ + + def new_init(self, *args, **kwargs): + logger.debug("LOG10: patching AsyncAnthropic.__init__") + import httpx + + from log10._httpx_utils import ( + _LogTransport, + get_completion_id, + log_request, + ) + + event_hooks = { + "request": [get_completion_id, log_request], + } + async_httpx_client = httpx.AsyncClient( + event_hooks=event_hooks, + transport=_LogTransport(httpx.AsyncHTTPTransport()), + ) + kwargs["http_client"] = async_httpx_client + origin_init(self, *args, **kwargs) + + module.AsyncAnthropic.__init__ = new_init elif module.__name__ == "lamini": attr = module.api.utils.completion.Completion method = getattr(attr, "generate") From 9563ed46ed63f49ba48b718b86ec98468eb07358 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Thu, 23 May 2024 00:56:20 -0400 Subject: [PATCH 10/15] Handle async anthropic function call via httpx hook --- log10/_httpx_utils.py | 65 ++++++++++++++++++++++++++++++++++++++----- log10/load.py | 23 --------------- 2 files changed, 58 insertions(+), 30 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index f741f0a6..f17d7463 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -108,6 +108,18 @@ async def _try_post_request_async(url: str, payload: dict = {}) -> httpx.Respons logger.error(f"Failed to insert in log10: {payload} with error {err}") +def format_anthropic_tools_request(request_content) -> str: + new_tools = [] + for tool in request_content["tools"]: + new_tool = { + "type": "function", + "function": {"name": tool["name"], "description": tool["description"], "parameters": tool["input_schema"]}, + } + new_tools.append(new_tool) + request_content["tools"] = new_tools + return json.dumps(request_content) + + async def get_completion_id(request: Request): host = request.headers.get("host") if "anthropic" in host and "/v1/messages" not in str(request.url): @@ -130,6 +142,7 @@ async def log_request(request: Request): orig_module = "" orig_qualname = "" + request_content_decode = request.content.decode("utf-8") host = request.headers.get("host") if "openai" in host: if "chat" in str(request.url): @@ -145,6 +158,14 @@ async def log_request(request: Request): kind = "chat" orig_module = "anthropic.resources.beta.tools" orig_qualname = "Messages.stream" + request_content = json.loads(request_content_decode) + if "tools" in request_content: + orig_module = "anthropic.resources.beta.tools" + orig_qualname = "Messages.stream" + request_content_decode = format_anthropic_tools_request(request_content) + else: + orig_module = "anthropic.resources.messages" + orig_qualname = "Messages.stream" else: logger.warning("Currently logging is only available for anthropic async") return @@ -156,7 +177,7 @@ async def log_request(request: Request): "kind": kind, "orig_module": orig_module, "orig_qualname": orig_qualname, - "request": request.content.decode("utf-8"), + "request": request_content_decode, "session_id": sessionID, } if get_log10_session_tags(): @@ -198,7 +219,6 @@ async def aiter_bytes(self, *args, **kwargs): r_json = self.parse_response_data(responses) response_json = r_json.copy() - response_json["object"] = "chat.completion" # r_json is the last response before "data: [DONE]" if self.full_content: @@ -216,13 +236,19 @@ async def aiter_bytes(self, *args, **kwargs): "arguments": self.full_argument, } + request_content_decode = self.request.content.decode("utf-8") + if "anthropic" in 
self.request.headers.get("host"): + request_content = json.loads(request_content_decode) + if "tools" in request_content: + request_content_decode = format_anthropic_tools_request(request_content) + log_row = { "response": json.dumps(response_json), "status": "finished", "duration": duration, "stacktrace": json.dumps(stacktrace), "kind": "chat", - "request": self.request.content.decode("utf-8"), + "request": request_content_decode, "session_id": sessionID, } if get_log10_session_tags(): @@ -252,9 +278,10 @@ def parse_anthropic_responses(self, responses: list[str]): finish_reason = None input_tokens = 0 output_tokens = 0 - + tool_call = {} + arguments = "" for r in responses: - if self.is_anthropic_response_end_reached(r): + if not r: break data_index = r.find("data:") @@ -268,12 +295,35 @@ model = r_json["message"]["model"] input_tokens = r_json["message"]["usage"]["input_tokens"] elif type == "content_block_start": - self.full_content += r_json["content_block"]["text"] + content_block = r_json["content_block"] + type = content_block["type"] + if type == "tool_use": + id = content_block["id"] + tool_call = { + "id": id, + "type": "function", + "function": {"name": content_block["name"], "arguments": ""}, + } + if "text" in content_block: + self.full_content += content_block["text"] elif type == "content_block_delta": - self.full_content += r_json["delta"]["text"] + delta = r_json["delta"] + if "text" in delta: + self.full_content += delta["text"] + if "partial_json" in delta: + if self.full_content: + self.full_content += delta["partial_json"] + else: + arguments += delta["partial_json"] elif type == "message_delta": finish_reason = r_json["delta"]["stop_reason"] output_tokens = r_json["usage"]["output_tokens"] + elif type == "content_block_stop" or type == "message_stop": + if tool_call: + tool_call["function"]["arguments"] = arguments + self.tool_calls.append(tool_call) + tool_call = {} + arguments = "" return { "id": message_id, "object": "chat", @@ -325,6 +375,7 @@ def parse_openai_responses(self, responses: list[str]): idx = tc[0].get("index") self.tool_calls[idx]["function"]["arguments"] += tc[0]["function"]["arguments"] + r_json["object"] = "chat.completion" return r_json def parse_response_data(self, responses: list[str]): diff --git a/log10/load.py b/log10/load.py index c62962ae..1bd3a63a 100644 --- a/log10/load.py +++ b/log10/load.py @@ -403,29 +403,6 @@ def __next__(self): self._process_chunk(chunk) return chunk - async def __aenter__(self): - self.response = await self.response.__aenter__() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.response.__aexit__(exc_type, exc_value, traceback) - return - - def __aiter__(self): - return self - - async def __anext__(self): - try: - chunk = await self.response.__anext__() - self._process_chunk(chunk) - return chunk - except StopAsyncIteration: - raise StopAsyncIteration from None - - async def until_done(self): - async for _ in self: - pass - def _process_chunk(self, chunk): if chunk.type == "message_start": self.model = chunk.message.model From 24884600dcbe97609b7fe39465e1a852964c6645 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Thu, 23 May 2024 00:57:16 -0400 Subject: [PATCH 11/15] Add anthropic async examples --- .../anthropic_async_messages_stream.py | 35 ++++++++++++ ...anthropic_async_messages_stream_handler.py | 38 +++++++++++++ .../logging/anthropic_async_tools_stream.py | 47 ++++++++++++++++ .../anthropic_async_tools_stream_handler.py | 55 
+++++++++++++++++++ .../logging/anthropic_messages_not_given.py | 17 ++++++ .../logging/magentic_async_chat_prompt.py | 23 ++++++++ log10/anthropic.py | 5 -- 7 files changed, 215 insertions(+), 5 deletions(-) create mode 100644 examples/logging/anthropic_async_messages_stream.py create mode 100644 examples/logging/anthropic_async_messages_stream_handler.py create mode 100644 examples/logging/anthropic_async_tools_stream.py create mode 100644 examples/logging/anthropic_async_tools_stream_handler.py create mode 100644 examples/logging/anthropic_messages_not_given.py create mode 100644 examples/logging/magentic_async_chat_prompt.py diff --git a/examples/logging/anthropic_async_messages_stream.py b/examples/logging/anthropic_async_messages_stream.py new file mode 100644 index 00000000..77fdc6c1 --- /dev/null +++ b/examples/logging/anthropic_async_messages_stream.py @@ -0,0 +1,35 @@ +import asyncio + +import anthropic + +from log10.load import log10 + + +log10(anthropic) + +client = anthropic.AsyncAnthropic() + + +async def main() -> None: + async with client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model="claude-3-haiku-20240307", + ) as stream: + async for text in stream.text_stream: + print(text, end="", flush=True) + print() + + # you can still get the accumulated final message outside of + # the context manager, as long as the entire stream was consumed + # inside of the context manager + accumulated = await stream.get_final_message() + print("accumulated message: ", accumulated.to_json()) + + +asyncio.run(main()) diff --git a/examples/logging/anthropic_async_messages_stream_handler.py b/examples/logging/anthropic_async_messages_stream_handler.py new file mode 100644 index 00000000..382bbb6b --- /dev/null +++ b/examples/logging/anthropic_async_messages_stream_handler.py @@ -0,0 +1,38 @@ +import asyncio + +import anthropic +from anthropic import AsyncAnthropic, AsyncMessageStream +from anthropic.types import MessageStreamEvent +from typing_extensions import override + +from log10.load import log10 + + +log10(anthropic) + +client = AsyncAnthropic() + + +class MyStream(AsyncMessageStream): + @override + async def on_stream_event(self, event: MessageStreamEvent) -> None: + print("on_event fired with:", event) + + +async def main() -> None: + async with client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model="claude-3-haiku-20240307", + event_handler=MyStream, + ) as stream: + accumulated = await stream.get_final_message() + print("accumulated message: ", accumulated.to_json()) + + +asyncio.run(main()) diff --git a/examples/logging/anthropic_async_tools_stream.py b/examples/logging/anthropic_async_tools_stream.py new file mode 100644 index 00000000..f0eaff2d --- /dev/null +++ b/examples/logging/anthropic_async_tools_stream.py @@ -0,0 +1,47 @@ +import asyncio + +import anthropic +from anthropic import AsyncAnthropic + +from log10.load import log10 + + +log10(anthropic) + +client = AsyncAnthropic() + + +async def run_conversation(): + tools = [ + { + "name": "get_weather", + "description": "Get the weather in a given location", + "input_schema": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. 
San Francisco, CA"}, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": 'The unit of temperature, either "celsius" or "fahrenheit"', + }, + }, + "required": ["location"], + }, + } + ] + async with client.beta.tools.messages.stream( + model="claude-3-haiku-20240307", + tools=tools, + messages=[ + { + "role": "user", + "content": "What's the weather like in San Francisco?", + } + ], + max_tokens=1024, + ) as stream: + await stream.until_done() + + +asyncio.run(run_conversation()) diff --git a/examples/logging/anthropic_async_tools_stream_handler.py b/examples/logging/anthropic_async_tools_stream_handler.py new file mode 100644 index 00000000..cf8826eb --- /dev/null +++ b/examples/logging/anthropic_async_tools_stream_handler.py @@ -0,0 +1,55 @@ +import asyncio + +import anthropic +from anthropic import AsyncAnthropic +from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream +from typing_extensions import override + +from log10.load import log10 + + +log10(anthropic) + + +client = AsyncAnthropic() + + +class MyHandler(AsyncToolsBetaMessageStream): + @override + async def on_input_json(self, delta: str, snapshot: object) -> None: + print(f"delta: {repr(delta)}") + print(f"snapshot: {snapshot}") + print() + + +async def main() -> None: + async with client.beta.tools.messages.stream( + max_tokens=1024, + model="claude-3-haiku-20240307", + tools=[ + { + "name": "get_weather", + "description": "Get the weather at a specific location.", + "input_schema": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"}, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "Unit for the output", + }, + }, + "required": ["location"], + }, + } + ], + messages=[{"role": "user", "content": "What is the weather in SF?"}], + event_handler=MyHandler, + ) as stream: + await stream.until_done() + + print() + + +asyncio.run(main()) diff --git a/examples/logging/anthropic_messages_not_given.py b/examples/logging/anthropic_messages_not_given.py new file mode 100644 index 00000000..1be9d8ef --- /dev/null +++ b/examples/logging/anthropic_messages_not_given.py @@ -0,0 +1,17 @@ +from log10.load import Anthropic + + +client = Anthropic() + +completion = client.messages.create( + model="claude-instant-1.2", + messages=[ + { + "role": "user", + "content": "tell a short joke.", + }, + ], + max_tokens=1000, +) + +print(completion.content[0].text) diff --git a/examples/logging/magentic_async_chat_prompt.py b/examples/logging/magentic_async_chat_prompt.py new file mode 100644 index 00000000..06b3c107 --- /dev/null +++ b/examples/logging/magentic_async_chat_prompt.py @@ -0,0 +1,23 @@ +import asyncio + +import anthropic +from magentic import UserMessage, chatprompt +from magentic.chat_model.anthropic_chat_model import AnthropicChatModel + +from log10.load import log10 + + +log10(anthropic) + + +async def main(topic: str) -> str: + @chatprompt( + UserMessage(f"Tell me a joke about {topic}"), + model=AnthropicChatModel("claude-3-opus-20240229"), + ) + async def tell_joke(topic: str) -> str: ... 
+ + print(await tell_joke(topic)) + + +asyncio.run(main("cats")) diff --git a/log10/anthropic.py b/log10/anthropic.py index 630a9fca..24ca63b6 100644 --- a/log10/anthropic.py +++ b/log10/anthropic.py @@ -135,11 +135,6 @@ def create_tokens_usage(prompt: str, completion: str): def prepare_response( response: anthropic.types.Completion | anthropic.types.Message, input_prompt: str = "" ) -> dict: - print(type(response)) - - if isinstance(response, dict): - response = anthropic.types.Message(**response) - if not hasattr(response, "stop_reason"): return None From ae2e7cf0a4114a7cdc6407304442ccc16d9de2ee Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Thu, 23 May 2024 02:12:37 -0400 Subject: [PATCH 12/15] Prepare anthropic response from ToolsBetaMessage --- examples/logging/anthropic_async_messages.py | 23 ++++++++++++++ log10/_httpx_utils.py | 16 ++++++++++-- log10/anthropic.py | 13 ++++++++- 3 files changed, 49 insertions(+), 3 deletions(-) create mode 100644 examples/logging/anthropic_async_messages.py diff --git a/examples/logging/anthropic_async_messages.py b/examples/logging/anthropic_async_messages.py new file mode 100644 index 00000000..88268728 --- /dev/null +++ b/examples/logging/anthropic_async_messages.py @@ -0,0 +1,23 @@ +import asyncio + +import anthropic + +from log10.load import log10 + + +log10(anthropic) + +client = anthropic.AsyncAnthropic() + + +async def main() -> None: + message = await client.beta.tools.messages.create( + model="claude-instant-1.2", + max_tokens=1000, + messages=[{"role": "user", "content": "Say hello!"}], + ) + + print(message) + + +asyncio.run(main()) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index f17d7463..c41a24c6 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -394,7 +394,15 @@ def __init__(self, transport: httpx.AsyncBaseTransport): self.transport = transport async def handle_async_request(self, request: httpx.Request) -> httpx.Response: - response = await self.transport.handle_async_request(request) + try: + response = await self.transport.handle_async_request(request) + except Exception as e: + logger.warning(f"Failed to send request: {e}") + raise + + if response.status_code >= 400: + logger.warning(f"HTTP error occurred: {response.status_code}") + return response completion_id = request.headers.get("x-log10-completion-id", "") if not completion_id: @@ -417,9 +425,13 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: elapsed = time.time() - request.started if "anthropic" in request.url.host: + from anthropic.types.beta.tools import ( + ToolsBetaMessage, + ) + from log10.anthropic import Anthropic - response = Anthropic.prepare_response(llm_response) + llm_response = Anthropic.prepare_response(ToolsBetaMessage(**llm_response)) log_row = { "response": json.dumps(llm_response), "status": "finished", diff --git a/log10/anthropic.py b/log10/anthropic.py index 24ca63b6..bb05f407 100644 --- a/log10/anthropic.py +++ b/log10/anthropic.py @@ -4,6 +4,9 @@ import anthropic from anthropic import AI_PROMPT, HUMAN_PROMPT +from anthropic.types.beta.tools import ( + ToolsBetaMessage, +) from log10.llm import LLM, ChatCompletion, Kind, Message, TextCompletion from log10.utils import merge_hparams @@ -133,7 +136,7 @@ def create_tokens_usage(prompt: str, completion: str): @staticmethod def prepare_response( response: anthropic.types.Completion | anthropic.types.Message | ToolsBetaMessage, input_prompt: str = "" ) -> dict: if not 
hasattr(response, "stop_reason"): return None @@ -154,6 +157,7 @@ def prepare_response( } ], } + if isinstance(response, anthropic.types.Message): tokens_usage = { "prompt_tokens": response.usage.input_tokens, @@ -164,6 +168,13 @@ def prepare_response( elif isinstance(response, anthropic.types.Completion): tokens_usage = Anthropic.create_tokens_usage(input_prompt, response.completion) ret_response["choices"][0]["text"] = response.completion + elif isinstance(response, ToolsBetaMessage): + tokens_usage = { + "prompt_tokens": response.usage.input_tokens, + "completion_tokens": response.usage.output_tokens, + "total_tokens": response.usage.input_tokens + response.usage.output_tokens, + } + ret_response["choices"][0]["message"] = {"role": response.role, "content": response.content[0].text} ret_response["usage"] = tokens_usage return ret_response From 741a5db107e2e7fc2cc217bb4a723f7ea9e863ae Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Thu, 23 May 2024 18:19:33 -0400 Subject: [PATCH 13/15] Add tests for new anthropic changes --- .../logging/anthropic_messages_not_given.py | 8 +- tests/test_anthropic.py | 261 +++++++++++++++++- 2 files changed, 264 insertions(+), 5 deletions(-) diff --git a/examples/logging/anthropic_messages_not_given.py b/examples/logging/anthropic_messages_not_given.py index 1be9d8ef..4d3573fe 100644 --- a/examples/logging/anthropic_messages_not_given.py +++ b/examples/logging/anthropic_messages_not_given.py @@ -1,10 +1,12 @@ +from anthropic import NOT_GIVEN + from log10.load import Anthropic client = Anthropic() -completion = client.messages.create( - model="claude-instant-1.2", +completion = client.beta.tools.messages.create( + model="claude-3-haiku-20240307", messages=[ { "role": "user", @@ -12,6 +14,8 @@ }, ], max_tokens=1000, + tools=NOT_GIVEN, + tool_choice=NOT_GIVEN, ) print(completion.content[0].text) diff --git a/tests/test_anthropic.py b/tests/test_anthropic.py index 166b3b75..43f49b4f 100644 --- a/tests/test_anthropic.py +++ b/tests/test_anthropic.py @@ -3,16 +3,20 @@ import anthropic import httpx import pytest +from anthropic import NOT_GIVEN +from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream +from typing_extensions import override from log10.load import log10 log10(anthropic) -client = anthropic.Anthropic() @pytest.mark.chat -def test_messages(anthropic_model): +def test_messages_create(anthropic_model): + client = anthropic.Anthropic() + message = client.messages.create( model=anthropic_model, max_tokens=1000, @@ -26,9 +30,30 @@ def test_messages(anthropic_model): assert text, "No output from the model." +@pytest.mark.chat +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_messages_create_async(anthropic_model): + client = anthropic.AsyncAnthropic() + + message = await client.messages.create( + model=anthropic_model, + max_tokens=1000, + temperature=0.0, + system="Respond only in Yoda-speak.", + messages=[{"role": "user", "content": "Say hello!"}], + ) + + text = message.content[0].text + assert isinstance(text, str) + assert text, "No output from the model." + + @pytest.mark.chat @pytest.mark.stream -def test_messages_stream(anthropic_model): +def test_messages_create_stream(anthropic_model): + client = anthropic.Anthropic() + stream = client.messages.create( model=anthropic_model, messages=[ @@ -53,6 +78,37 @@ def test_messages_stream(anthropic_model): assert output, "No output from the model." 
+@pytest.mark.chat +@pytest.mark.stream +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_messages_create_stream_async(anthropic_model): + client = anthropic.AsyncAnthropic() + + stream = await client.messages.create( + model=anthropic_model, + messages=[ + { + "role": "user", + "content": "Count to 10", + } + ], + max_tokens=128, + temperature=0.9, + stream=True, + ) + + output = "" + for event in stream: + if event.type == "content_block_delta": + text = event.delta.text + output += text + if text.isdigit(): + assert int(text) <= 10 + + assert output, "No output from the model." + + @pytest.mark.vision def test_messages_image(anthropic_model): client = anthropic.Anthropic() @@ -85,3 +141,202 @@ def test_messages_image(anthropic_model): text = message.content[0].text assert text, "No output from the model." assert "ant" in text + + +@pytest.mark.chat +def test_chat_not_given(anthropic_model): + client = anthropic.Anthropic() + + message = client.beta.tools.messages.stream( + model=anthropic_model, + messages=[ + { + "role": "user", + "content": "tell a short joke.", + }, + ], + tools=NOT_GIVEN, + tool_choice=NOT_GIVEN, + ) + + content = message.content[0].text + assert isinstance(content, str) + assert content, "No output from the model." + + +@pytest.mark.chat +def test_beta_tools_messages_create(anthropic_model): + client = anthropic.Anthropic() + + message = client.beta.tools.messages.create( + model=anthropic_model, + max_tokens=1000, + messages=[{"role": "user", "content": "Say hello!"}], + ) + + text = message.content[0].text + assert text, "No output from the model." + + +@pytest.mark.chat +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_beta_tools_messages_create_async(anthropic_model): + client = anthropic.AsyncAnthropic() + + message = await client.beta.tools.messages.create( + model=anthropic_model, + max_tokens=1000, + messages=[{"role": "user", "content": "Say hello!"}], + ) + + text = message.content[0].text + assert text, "No output from the model." + + +@pytest.mark.chat +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_beta_tools_messages_stream_async(anthropic_model): + client = anthropic.AsyncAnthropic() + + message = await client.beta.tools.messages.stream( + model=anthropic_model, + max_tokens=1000, + messages=[{"role": "user", "content": "Say hello!"}], + ) + + text = message.content[0].text + assert text, "No output from the model." + + +@pytest.mark.chat +@pytest.mark.stream +@pytest.mark.context_manager +def test_messages_stream_context_manager(anthropic_model): + client = anthropic.Anthropic() + + output = "" + with client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model=anthropic_model, + ) as stream: + for text in stream.text_stream: + output += text + + assert output, "No output from the model." + + +@pytest.mark.chat +@pytest.mark.stream +@pytest.mark.context_manager +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_messages_stream_context_manager_async(anthropic_model): + client = anthropic.AsyncAnthropic() + + output = "" + async with client.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Say hello there!", + } + ], + model=anthropic_model, + ) as stream: + async for text in stream.text_stream: + output += text + + assert output, "No output from the model." 
+ + +@pytest.mark.tools +@pytest.mark.stream +@pytest.mark.context_manager +def test_tools_messages_stream_context_manager(anthropic_model): + client = anthropic.Anthropic() + output = "" + with client.beta.tools.messages.stream( + model=anthropic_model, + tools=[ + { + "name": "get_weather", + "description": "Get the weather at a specific location", + "input_schema": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"}, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "Unit for the output", + }, + }, + "required": ["location"], + }, + } + ], + messages=[{"role": "user", "content": "What is the weather in SF?"}], + max_tokens=1024, + ) as stream: + for message in stream: + if message.type == "content_block_delta": + if message.delta: + if hasattr(message.delta, "text"): + output += message.delta.text + if hasattr(message.delta, "partial_json"): + output += message.delta.partial_json + + assert output, "No output from the model." + + +@pytest.mark.tools +@pytest.mark.stream +@pytest.mark.context_manager +@pytest.mark.async_client +@pytest.mark.asyncio +async def test_tools_messages_stream_context_manager_async(anthropic_model): + client = anthropic.AsyncAnthropic() + output = None + + class MyHandler(AsyncToolsBetaMessageStream): + @override + async def on_input_json(self, snapshot: object) -> None: + nonlocal output + output = snapshot + + async with client.beta.tools.messages.stream( + model=anthropic_model, + tools=[ + { + "name": "get_weather", + "description": "Get the weather at a specific location", + "input_schema": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. San Francisco, CA"}, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "Unit for the output", + }, + }, + "required": ["location"], + }, + } + ], + messages=[{"role": "user", "content": "What is the weather in SF?"}], + max_tokens=1024, + event_handler=MyHandler, + ) as stream: + await stream.until_done() + + assert output, "No output from the model." From ad7638ffd67a30fb9c4ea9bf506be1652df1afdd Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Thu, 23 May 2024 19:01:29 -0400 Subject: [PATCH 14/15] Fix tests --- tests/pytest.ini | 1 + tests/test_anthropic.py | 59 ++++++----------------------------------- 2 files changed, 9 insertions(+), 51 deletions(-) diff --git a/tests/pytest.ini b/tests/pytest.ini index 956f1de7..3448f112 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -16,3 +16,4 @@ markers = tools: mark a test as a tools test vision: mark a test as a vision test widget: mark a test as a widget test + context_manager: mark a test as a context manager test diff --git a/tests/test_anthropic.py b/tests/test_anthropic.py index 43f49b4f..6727b95d 100644 --- a/tests/test_anthropic.py +++ b/tests/test_anthropic.py @@ -78,37 +78,6 @@ def test_messages_create_stream(anthropic_model): assert output, "No output from the model." 
-@pytest.mark.chat -@pytest.mark.stream -@pytest.mark.async_client -@pytest.mark.asyncio -async def test_messages_create_stream_async(anthropic_model): - client = anthropic.AsyncAnthropic() - - stream = await client.messages.create( - model=anthropic_model, - messages=[ - { - "role": "user", - "content": "Count to 10", - } - ], - max_tokens=128, - temperature=0.9, - stream=True, - ) - - output = "" - for event in stream: - if event.type == "content_block_delta": - text = event.delta.text - output += text - if text.isdigit(): - assert int(text) <= 10 - - assert output, "No output from the model." - - @pytest.mark.vision def test_messages_image(anthropic_model): client = anthropic.Anthropic() @@ -147,7 +116,7 @@ def test_messages_image(anthropic_model): def test_chat_not_given(anthropic_model): client = anthropic.Anthropic() - message = client.beta.tools.messages.stream( + message = client.beta.tools.messages.create( model=anthropic_model, messages=[ { @@ -155,6 +124,7 @@ def test_chat_not_given(anthropic_model): "content": "tell a short joke.", }, ], + max_tokens=1000, tools=NOT_GIVEN, tool_choice=NOT_GIVEN, ) @@ -194,22 +164,6 @@ async def test_beta_tools_messages_create_async(anthropic_model): assert text, "No output from the model." -@pytest.mark.chat -@pytest.mark.async_client -@pytest.mark.asyncio -async def test_beta_tools_messages_stream_async(anthropic_model): - client = anthropic.AsyncAnthropic() - - message = await client.beta.tools.messages.stream( - model=anthropic_model, - max_tokens=1000, - messages=[{"role": "user", "content": "Say hello!"}], - ) - - text = message.content[0].text - assert text, "No output from the model." - - @pytest.mark.chat @pytest.mark.stream @pytest.mark.context_manager @@ -227,8 +181,11 @@ def test_messages_stream_context_manager(anthropic_model): ], model=anthropic_model, ) as stream: - for text in stream.text_stream: - output += text + for message in stream: + if message.type == "content_block_delta": + if message.delta: + if hasattr(message.delta, "text"): + output += message.delta.text assert output, "No output from the model." 
@@ -309,7 +266,7 @@ async def test_tools_messages_stream_context_manager_async(anthropic_model): class MyHandler(AsyncToolsBetaMessageStream): @override - async def on_input_json(self, snapshot: object) -> None: + async def on_input_json(self, delta: str, snapshot: object) -> None: nonlocal output output = snapshot From 167103f30a829de4e6fe04b8bdde16d9caf67222 Mon Sep 17 00:00:00 2001 From: Kim Tran Date: Tue, 28 May 2024 15:20:28 -0400 Subject: [PATCH 15/15] Address Wenzhe feedback --- log10/_httpx_utils.py | 18 ++++++------------ log10/load.py | 4 ++-- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/log10/_httpx_utils.py b/log10/_httpx_utils.py index 6332d120..7414fb52 100644 --- a/log10/_httpx_utils.py +++ b/log10/_httpx_utils.py @@ -154,21 +154,15 @@ async def log_request(request: Request): orig_module = "openai.api_resources.completion" orig_qualname = "Completion.create" elif "anthropic" in host: - if "async:" in request.headers.get("x-stainless-async"): - kind = "chat" + kind = "chat" + request_content = json.loads(request_content_decode) + if "tools" in request_content: orig_module = "anthropic.resources.beta.tools" orig_qualname = "Messages.stream" - request_content = json.loads(request_content_decode) - if "tools" in request_content: - orig_module = "anthropic.resources.beta.tools" - orig_qualname = "Messages.stream" - request_content_decode = format_anthropic_tools_request(request_content) - else: - orig_module = "anthropic.resources.messages" - orig_qualname = "Messages.stream" + request_content_decode = format_anthropic_tools_request(request_content) else: - logger.warning("Currently logging is only available for anthropic async") - return + orig_module = "anthropic.resources.messages" + orig_qualname = "Messages.stream" else: logger.warning("Currently logging is only available for async openai and anthropic.") return diff --git a/log10/load.py b/log10/load.py index e6b0019c..766e6c08 100644 --- a/log10/load.py +++ b/log10/load.py @@ -668,7 +668,7 @@ def wrapper(*args, **kwargs): completionID = result_queue.get() if completionID is None: - logging.warn(f"LOG10: failed to get completionID from log10: {e}. Skipping log.") + logger.warning(f"LOG10: failed to get completionID from log10: {e}. Skipping log.") return logger.debug(f"LOG10: failed - {e}") @@ -684,7 +684,7 @@ def wrapper(*args, **kwargs): try: res = post_request(completion_url + "/" + completionID, log_row) except Exception as le: - logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.") + logger.warning(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.") raise e else: # finished with no exceptions