From 1b7b4efa824c1a5ced337fc42b450be7a2e009b9 Mon Sep 17 00:00:00 2001
From: wenzhe <145375501+wenzhe-log10@users.noreply.github.com>
Date: Fri, 15 Mar 2024 14:26:26 -0700
Subject: [PATCH] add anthropic messages support and stream (#118)

* add anthropic messages support and stream
  add examples
* update anthropic version dependency
* add log10.load Anthropic client
* use claude-instant-1.2 in anthropic completions example
---
 examples/logging/anthropic_completion.py      |   2 +-
 examples/logging/anthropic_messages.py        |  18 ++
 examples/logging/anthropic_messages_stream.py |  26 +++
 .../{steaming.py => openai_chat_stream.py}    |   2 +-
 log10/anthropic.py                            |  39 ++--
 log10/load.py                                 | 166 +++++++++++++++---
 poetry.lock                                   |  21 ++-
 pyproject.toml                                |   2 +-
 8 files changed, 228 insertions(+), 48 deletions(-)
 create mode 100644 examples/logging/anthropic_messages.py
 create mode 100644 examples/logging/anthropic_messages_stream.py
 rename examples/logging/{steaming.py => openai_chat_stream.py} (84%)

diff --git a/examples/logging/anthropic_completion.py b/examples/logging/anthropic_completion.py
index 588b158a..d07bb8d8 100644
--- a/examples/logging/anthropic_completion.py
+++ b/examples/logging/anthropic_completion.py
@@ -9,7 +9,7 @@
 client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
 
 response = client.completions.create(
-    model="claude-1",
+    model="claude-instant-1.2",
     prompt=f"\n\nHuman:Write the names of all Star Wars movies and spinoffs along with the time periods in which they were set?{anthropic.AI_PROMPT}",
     temperature=0,
     max_tokens_to_sample=1024,
diff --git a/examples/logging/anthropic_messages.py b/examples/logging/anthropic_messages.py
new file mode 100644
index 00000000..677cce3b
--- /dev/null
+++ b/examples/logging/anthropic_messages.py
@@ -0,0 +1,18 @@
+import anthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+client = anthropic.Anthropic()
+
+message = client.messages.create(
+    model="claude-3-opus-20240229",
+    max_tokens=1000,
+    temperature=0.0,
+    system="Respond only in Yoda-speak.",
+    messages=[{"role": "user", "content": "How are you today?"}],
+)
+
+print(message.content[0].text)
diff --git a/examples/logging/anthropic_messages_stream.py b/examples/logging/anthropic_messages_stream.py
new file mode 100644
index 00000000..3bb88e96
--- /dev/null
+++ b/examples/logging/anthropic_messages_stream.py
@@ -0,0 +1,26 @@
+import anthropic
+from anthropic import Anthropic
+
+from log10.load import log10
+
+
+log10(anthropic)
+
+
+client = Anthropic()
+
+stream = client.messages.create(
+    model="claude-3-opus-20240229",
+    messages=[
+        {
+            "role": "user",
+            "content": "Tell a 50 words joke.",
+        }
+    ],
+    max_tokens=128,
+    temperature=0.9,
+    stream=True,
+)
+for event in stream:
+    if event.type == "content_block_delta":
+        print(event.delta.text, end="", flush=True)
diff --git a/examples/logging/steaming.py b/examples/logging/openai_chat_stream.py
similarity index 84%
rename from examples/logging/steaming.py
rename to examples/logging/openai_chat_stream.py
index 8f1f1d10..690dfd3f 100644
--- a/examples/logging/steaming.py
+++ b/examples/logging/openai_chat_stream.py
@@ -6,7 +6,7 @@
 
 response = client.chat.completions.create(
     model="gpt-3.5-turbo",
-    messages=[{"role": "user", "content": "Count to 200"}],
+    messages=[{"role": "user", "content": "Count to 20"}],
     temperature=0,
     stream=True,
 )
diff --git a/log10/anthropic.py b/log10/anthropic.py
index 65c72cf1..3e8b2716 100644
--- a/log10/anthropic.py
+++ b/log10/anthropic.py
@@ -43,7 +43,7 @@ def chat(self, messages: List[Message], hparams: dict = None) -> ChatCompletion:
         completion = self.client.completions.create(**chat_request)
 
         content = completion.completion
-        response = Anthropic.prepare_response(chat_request["prompt"], completion, "chat")
+        response = Anthropic.prepare_response(completion, chat_request["prompt"])
 
         self.log_end(
             completion_id,
@@ -83,7 +83,7 @@ def text(self, prompt: str, hparams: dict = None) -> TextCompletion:
         completion = self.client.completions.create(**text_request)
 
         text = completion.completion
-        response = Anthropic.prepare_response(text_request["prompt"], completion, "text")
+        response = Anthropic.prepare_response(completion, text_request["prompt"])
 
         self.log_end(
             completion_id,
@@ -132,30 +132,35 @@ def create_tokens_usage(prompt: str, completion: str):
         }
 
     @staticmethod
-    def prepare_response(prompt: str, completion: anthropic.types.Completion, type: str = "text") -> dict:
-        if completion.stop_reason == "stop_sequence":
+    def prepare_response(
+        response: anthropic.types.Completion | anthropic.types.Message, input_prompt: str = ""
+    ) -> dict:
+        if response.stop_reason in ["stop_sequence", "end_turn"]:
             reason = "stop"
-        elif completion.stop_reason == "max_tokens":
+        elif response.stop_reason == "max_tokens":
             reason = "length"
 
-        tokens_usage = Anthropic.create_tokens_usage(prompt, completion.completion)
-        response = {
-            "id": completion.model_extra["log_id"],
+        ret_response = {
+            "id": response.id,
             "object": "completion",
-            "model": completion.model,
+            "model": response.model,
             "choices": [
                 {
                     "index": 0,
                     "finish_reason": reason,
                 }
             ],
-            "usage": tokens_usage,
         }
-        if type == "text":
-            response["choices"][0]["text"] = completion.completion
-        elif type == "chat":
-            response["choices"][0]["message"] = {
-                "role": "assistant",
-                "content": completion.completion,
+        if isinstance(response, anthropic.types.Message):
+            tokens_usage = {
+                "prompt_tokens": response.usage.input_tokens,
+                "completion_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.input_tokens + response.usage.output_tokens,
             }
-        return response
+            ret_response["choices"][0]["message"] = {"role": response.role, "content": response.content[0].text}
+        elif isinstance(response, anthropic.types.Completion):
+            tokens_usage = Anthropic.create_tokens_usage(input_prompt, response.completion)
+            ret_response["choices"][0]["text"] = response.completion
+        ret_response["usage"] = tokens_usage
+
+        return ret_response
diff --git a/log10/load.py b/log10/load.py
index 139538ac..713b83b4 100644
--- a/log10/load.py
+++ b/log10/load.py
@@ -214,9 +214,14 @@ async def log_async(completion_url, func, **kwargs):
     if "messages" in kwargs:
         kwargs["messages"] = flatten_messages(kwargs["messages"])
 
+    if "anthropic" in func.__module__:
+        if "system" in kwargs:
+            kwargs["messages"].insert(0, {"role": "system", "content": kwargs["system"]})
+
     log_row = {
         # do we want to also store args?
         "status": "started",
+        "kind": "chat" if "chat" in func.__module__ or "messages" in func.__module__ else "completion",
         "orig_module": func.__module__,
         "orig_qualname": func.__qualname__,
         "request": json.dumps(kwargs),
@@ -384,6 +389,69 @@ def __next__(self):
             raise se
 
 
+class AnthropicStreamingResponseWrapper:
+    """
+    Wraps a streaming response object to log the final result and duration to log10.
+ """ + + def __init__(self, completion_url, completionID, response, partial_log_row): + self.completionID = completionID + self.completion_url = completion_url + self.partial_log_row = partial_log_row + self.response = response + self.final_result = "" + self.start_time = time.perf_counter() + self.message_id = None + self.model = None + self.finish_reason = None + self.input_tokens = 0 + self.output_tokens = 0 + + def __iter__(self): + return self + + def __next__(self): + chunk = next(self.response) + if chunk.type == "message_start": + self.model = chunk.message.model + self.message_id = chunk.message.id + self.input_tokens = chunk.message.usage.input_tokens + elif chunk.type == "message_delta": + self.finish_reason = chunk.delta.stop_reason + self.output_tokens = chunk.usage.output_tokens + elif chunk.type == "content_block_delta": + self.final_result += chunk.delta.text + elif chunk.type == "message_stop": + response = { + "id": self.message_id, + "object": "chat", + "model": self.model, + "choices": [ + { + "index": 0, + "finish_reason": self.finish_reason, + "message": { + "role": "assistant", + "content": self.final_result, + }, + } + ], + "usage": { + "prompt_tokens": self.input_tokens, + "completion_tokens": self.output_tokens, + "total_tokens": self.input_tokens + self.output_tokens, + }, + } + self.partial_log_row["response"] = json.dumps(response) + self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000) + + res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row) + if res.status_code != 200: + logger.error(f"Failed to insert in log10: {self.partial_log_row} with error {res.text}. Skipping") + + return chunk + + def flatten_messages(messages): flat_messages = [] for message in messages: @@ -491,30 +559,50 @@ def wrapper(*args, **kwargs): response = output # Adjust the Anthropic output to match OAI completion output if "anthropic" in type(output).__module__: + if type(output).__name__ == "Stream": + kind = "chat" + return AnthropicStreamingResponseWrapper( + completion_url=completion_url, + completionID=completionID, + response=response, + partial_log_row={ + "response": response, + "status": "finished", + "stacktrace": json.dumps(stacktrace), + "kind": kind, + "orig_module": func.__module__, + "orig_qualname": func.__qualname__, + "request": json.dumps(kwargs), + "session_id": sessionID, + "tags": global_tags, + }, + ) from log10.anthropic import Anthropic - response = Anthropic.prepare_response(kwargs["prompt"], output, "text") - kind = "completion" - elif type(output).__name__ == "Stream": - kind = "chat" # Should be "stream", but we don't have that kind yet. 
-                    return StreamingResponseWrapper(
-                        completion_url=completion_url,
-                        completionID=completionID,
-                        response=response,
-                        partial_log_row={
-                            "response": response,
-                            "status": "finished",
-                            "stacktrace": json.dumps(stacktrace),
-                            "kind": kind,
-                            "orig_module": func.__module__,
-                            "orig_qualname": func.__qualname__,
-                            "request": json.dumps(kwargs),
-                            "session_id": sessionID,
-                            "tags": global_tags,
-                        },
-                    )
+                    kind = "chat" if response.type == "message" else "completion"
+                    response = Anthropic.prepare_response(output, input_prompt=kwargs.get("prompt", ""))
+                    if "system" in kwargs:
+                        kwargs["messages"].insert(0, {"role": "system", "content": kwargs["system"]})
                 else:
+                    if type(output).__name__ == "Stream":
+                        kind = "chat"
+                        return StreamingResponseWrapper(
+                            completion_url=completion_url,
+                            completionID=completionID,
+                            response=response,
+                            partial_log_row={
+                                "response": response,
+                                "status": "finished",
+                                "stacktrace": json.dumps(stacktrace),
+                                "kind": kind,
+                                "orig_module": func.__module__,
+                                "orig_qualname": func.__qualname__,
+                                "request": json.dumps(kwargs),
+                                "session_id": sessionID,
+                                "tags": global_tags,
+                            },
+                        )
                     response = output
                     kind = "chat" if output.object == "chat.completion" else "completion"
@@ -692,6 +780,11 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
         attr = module.resources.completions.Completions
         method = getattr(attr, "create")
         setattr(attr, "create", intercepting_decorator(method))
+
+        # anthropic Messages completion
+        attr = module.resources.messages.Messages
+        method = getattr(attr, "create")
+        setattr(attr, "create", intercepting_decorator(method))
     elif module.__name__ == "openai":
         openai_version = parse(version("openai"))
         global OPENAI_V1
@@ -784,3 +877,36 @@ def __init__(self, *args, **kwargs):
 if not getattr(openai, "_log10_patched", False):
     log10(openai)
     openai._log10_patched = True
+
+
+try:
+    import anthropic
+except ImportError:
+    logger.warning("Anthropic not found. Skipping defining log10.load.Anthropic client.")
+else:
+    from anthropic import Anthropic
+
+    class Anthropic(Anthropic):
+        """
+        Example:
+            >>> from log10.load import Anthropic
+            >>> client = Anthropic(tags=["test", "load_anthropic"])
+            >>> message = client.messages.create(
+            ...     model="claude-3-haiku-20240307",
+            ...     max_tokens=100,
+            ...     temperature=0.9,
+            ...     system="Respond only in Yoda-speak.",
+            ...     messages=[{"role": "user", "content": "How are you today?"}],
+            ... )
+            >>> print(message.content[0].text)
+        """
+
+        def __init__(self, *args, **kwargs):
+            if "tags" in kwargs:
+                global global_tags
+                global_tags = kwargs.pop("tags")
+            super().__init__(*args, **kwargs)
+
+    if not getattr(anthropic, "_log10_patched", False):
+        log10(anthropic)
+        anthropic._log10_patched = True
diff --git a/poetry.lock b/poetry.lock
index f29403f7..6968d73b 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -123,22 +123,27 @@ files = [
 
 [[package]]
 name = "anthropic"
-version = "0.3.11"
-description = "Client library for the anthropic API"
+version = "0.20.0"
+description = "The official Python library for the anthropic API"
 optional = false
-python-versions = ">=3.7,<4.0"
+python-versions = ">=3.7"
 files = [
-    {file = "anthropic-0.3.11-py3-none-any.whl", hash = "sha256:5c81105cd9ee7388bff3fdb739aaddedc83bbae9b95d51c2d50c13b1ad106138"},
-    {file = "anthropic-0.3.11.tar.gz", hash = "sha256:2e0fa5351c9b368cbed0bbd7217deaa9409b82b56afaf244e2196e99eb4fe20e"},
+    {file = "anthropic-0.20.0-py3-none-any.whl", hash = "sha256:754fe24596efbe1d7a49d3d59818a2a25d2bf29539e036eafd8a43427cd58134"},
+    {file = "anthropic-0.20.0.tar.gz", hash = "sha256:f9f1a5213af4710d6e1f9b83208a4b158355d1a95eb4ae0906c7580aa0e0f1d1"},
 ]
 
 [package.dependencies]
-anyio = ">=3.5.0,<4"
+anyio = ">=3.5.0,<5"
 distro = ">=1.7.0,<2"
 httpx = ">=0.23.0,<1"
 pydantic = ">=1.9.0,<3"
+sniffio = "*"
 tokenizers = ">=0.13.0"
-typing-extensions = ">=4.5,<5"
+typing-extensions = ">=4.7,<5"
+
+[package.extras]
+bedrock = ["boto3 (>=1.28.57)", "botocore (>=1.31.57)"]
+vertex = ["google-auth (>=2,<3)"]
 
 [[package]]
 name = "anyio"
@@ -2802,4 +2807,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10,<4.0"
-content-hash = "44ba60978c119ff8f4236f83b39cf5e72a2a90d4497fc15c176f53929d09b66b"
+content-hash = "9cff543fa4150b34a074d95e75027c14af8e44dce43b3905e7c1c83fd1c9f6a6"
diff --git a/pyproject.toml b/pyproject.toml
index 61388895..4ed4a859 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,7 +45,7 @@ google-search-results = "^2.4.2"
 wikipedia = "^1.4.0"
 faker = "^19.2.0"
 backoff = "^2.2.1"
-anthropic = "^0.3.11"
+anthropic = "<1"
 mosaicml-cli = "^0.5.30"
 together = "^0.2.7"
 magentic = "^0.17.0"
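
For readers tracing the logging path: Anthropic's Messages API takes the system prompt as a separate `system` kwarg rather than as a message, so both `log_async` and the sync wrapper in this patch re-insert it as a leading OpenAI-style system message before the request is serialized. A minimal sketch of that normalization — the kwargs literal below is illustrative, not taken from the diff:

    # Illustrative kwargs, shaped like a patched client.messages.create(...) call.
    kwargs = {
        "system": "Respond only in Yoda-speak.",
        "messages": [{"role": "user", "content": "How are you today?"}],
    }

    # The same normalization the patch applies before json.dumps-ing the request:
    # Anthropic keeps the system prompt out of `messages`, so the logger re-inserts
    # it as a leading OpenAI-style system message.
    if "system" in kwargs:
        kwargs["messages"].insert(0, {"role": "system", "content": kwargs["system"]})

    print(kwargs["messages"])
    # [{'role': 'system', 'content': 'Respond only in Yoda-speak.'},
    #  {'role': 'user', 'content': 'How are you today?'}]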
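
Likewise, `AnthropicStreamingResponseWrapper` folds the Messages stream events into a single OpenAI-style record once `message_stop` arrives. A standalone sketch of that folding, using `SimpleNamespace` stand-ins for the Anthropic event objects (the event payloads here are invented for illustration; real events come from `client.messages.create(stream=True)`):

    from types import SimpleNamespace

    # Hypothetical stand-ins for the four event types the wrapper handles.
    events = [
        SimpleNamespace(
            type="message_start",
            message=SimpleNamespace(
                id="msg_123",
                model="claude-3-opus-20240229",
                usage=SimpleNamespace(input_tokens=10),
            ),
        ),
        SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="Why did the ")),
        SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="chicken cross?")),
        SimpleNamespace(
            type="message_delta",
            delta=SimpleNamespace(stop_reason="end_turn"),
            usage=SimpleNamespace(output_tokens=7),
        ),
        SimpleNamespace(type="message_stop"),
    ]

    # Fold the events the same way AnthropicStreamingResponseWrapper.__next__ does.
    final_text, finish_reason, model, message_id = "", None, None, None
    input_tokens = output_tokens = 0
    for event in events:
        if event.type == "message_start":
            model, message_id = event.message.model, event.message.id
            input_tokens = event.message.usage.input_tokens
        elif event.type == "message_delta":
            finish_reason = event.delta.stop_reason
            output_tokens = event.usage.output_tokens
        elif event.type == "content_block_delta":
            final_text += event.delta.text
        elif event.type == "message_stop":
            # This is the OpenAI-style record the wrapper posts to log10.
            log_response = {
                "id": message_id,
                "object": "chat",
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "finish_reason": finish_reason,
                        "message": {"role": "assistant", "content": final_text},
                    }
                ],
                "usage": {
                    "prompt_tokens": input_tokens,
                    "completion_tokens": output_tokens,
                    "total_tokens": input_tokens + output_tokens,
                },
            }
            print(log_response)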