Skip to content

Commit

Permalink
add anthropic messages support and stream (#118)
Browse files Browse the repository at this point in the history
* add anthropic messages support and stream
add examples

* update anthropic version dependency

* add log10.load Anthropic client

* use claude-instant-1.2 in anthropic completions example
  • Loading branch information
wenzhe-log10 committed Mar 15, 2024
1 parent 045e0ed commit 1b7b4ef
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 48 deletions.
2 changes: 1 addition & 1 deletion examples/logging/anthropic_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

response = client.completions.create(
model="claude-1",
model="claude-instant-1.2",
prompt=f"\n\nHuman:Write the names of all Star Wars movies and spinoffs along with the time periods in which they were set?{anthropic.AI_PROMPT}",
temperature=0,
max_tokens_to_sample=1024,
Expand Down
18 changes: 18 additions & 0 deletions examples/logging/anthropic_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import anthropic

from log10.load import log10


# Hook log10 into the anthropic module before any client is created.
log10(anthropic)

client = anthropic.Anthropic()

# Non-streaming Messages request: one user turn plus a system instruction.
request_params = {
    "model": "claude-3-opus-20240229",
    "max_tokens": 1000,
    "temperature": 0.0,
    "system": "Respond only in Yoda-speak.",
    "messages": [{"role": "user", "content": "How are you today?"}],
}
message = client.messages.create(**request_params)

# Print the text of the first content block of the reply.
first_block = message.content[0]
print(first_block.text)
26 changes: 26 additions & 0 deletions examples/logging/anthropic_messages_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import anthropic
from anthropic import Anthropic

from log10.load import log10


# Hook log10 into the anthropic module before any client is created.
log10(anthropic)


client = Anthropic()

# Streaming Messages request: `stream=True` makes `create` return an iterable
# of events instead of a single message object.
user_turn = {
    "role": "user",
    "content": "Tell a 50 words joke.",
}
stream = client.messages.create(
    model="claude-3-opus-20240229",
    messages=[user_turn],
    max_tokens=128,
    temperature=0.9,
    stream=True,
)
for event in stream:
    # Only content_block_delta events are printed; every other event type is skipped.
    if event.type != "content_block_delta":
        continue
    print(event.delta.text, end="", flush=True)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Count to 200"}],
messages=[{"role": "user", "content": "Count to 20"}],
temperature=0,
stream=True,
)
Expand Down
39 changes: 22 additions & 17 deletions log10/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def chat(self, messages: List[Message], hparams: dict = None) -> ChatCompletion:
completion = self.client.completions.create(**chat_request)
content = completion.completion

response = Anthropic.prepare_response(chat_request["prompt"], completion, "chat")
response = Anthropic.prepare_response(completion, chat_request["prompt"])

self.log_end(
completion_id,
Expand Down Expand Up @@ -83,7 +83,7 @@ def text(self, prompt: str, hparams: dict = None) -> TextCompletion:
completion = self.client.completions.create(**text_request)
text = completion.completion

response = Anthropic.prepare_response(text_request["prompt"], completion, "text")
response = Anthropic.prepare_response(completion, text_request["prompt"])

self.log_end(
completion_id,
Expand Down Expand Up @@ -132,30 +132,35 @@ def create_tokens_usage(prompt: str, completion: str):
}

@staticmethod
def prepare_response(
    response: anthropic.types.Completion | anthropic.types.Message, input_prompt: str = ""
) -> dict:
    """Convert an Anthropic response into the OpenAI-style dict that gets logged.

    Args:
        response: Either a legacy text ``Completion`` or a ``Message`` from the
            Messages API.
        input_prompt: The prompt that produced ``response``. Only used for
            ``Completion`` responses, where token usage is computed locally
            from the prompt and the completion text.

    Returns:
        A dict with ``id``, ``object``, ``model``, a single-entry ``choices``
        list and ``usage``. ``choices[0]`` carries ``message`` for a
        ``Message`` and ``text`` for a ``Completion``.

    Raises:
        TypeError: If ``response`` is neither a ``Completion`` nor a ``Message``.
    """
    # Translate Anthropic stop reasons to the logged finish reasons. Any other
    # value (including None) is passed through unchanged instead of leaving
    # `reason` unbound.
    finish_reasons = {"stop_sequence": "stop", "end_turn": "stop", "max_tokens": "length"}
    reason = finish_reasons.get(response.stop_reason, response.stop_reason)

    ret_response = {
        "id": response.id,
        "object": "completion",
        "model": response.model,
        "choices": [
            {
                "index": 0,
                "finish_reason": reason,
            }
        ],
    }

    if isinstance(response, anthropic.types.Message):
        # Messages responses report their own token counts.
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        ret_response["usage"] = {
            "prompt_tokens": input_tokens,
            "completion_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
        }
        # Guard against a reply with no content blocks.
        content = response.content[0].text if response.content else ""
        ret_response["choices"][0]["message"] = {"role": response.role, "content": content}
    elif isinstance(response, anthropic.types.Completion):
        ret_response["usage"] = Anthropic.create_tokens_usage(input_prompt, response.completion)
        ret_response["choices"][0]["text"] = response.completion
    else:
        raise TypeError(f"Unsupported Anthropic response type: {type(response).__name__}")

    return ret_response
166 changes: 146 additions & 20 deletions log10/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,14 @@ async def log_async(completion_url, func, **kwargs):
if "messages" in kwargs:
kwargs["messages"] = flatten_messages(kwargs["messages"])

if "anthropic" in func.__module__:
if "system" in kwargs:
kwargs["messages"].insert(0, {"role": "system", "content": kwargs["system"]})

log_row = {
# do we want to also store args?
"status": "started",
"kind": "chat" if "chat" in func.__module__ or "messages" in func.__module__ else "completion",
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
Expand Down Expand Up @@ -384,6 +389,69 @@ def __next__(self):
raise se


class AnthropicStreamingResponseWrapper:
    """
    Wraps an Anthropic streaming response object to log the final result and duration to log10.

    The wrapper is an iterator: every chunk pulled from the wrapped stream is
    returned to the caller unchanged. While iterating it records the model,
    message id, token counts, finish reason and accumulated text, and when the
    ``message_stop`` chunk arrives it posts the assembled response to log10.

    Args:
        completion_url: Base URL of the log10 completions endpoint.
        completionID: Id of the completion record to update.
        response: The underlying stream; must support ``next()``.
        partial_log_row: Log row to send; its ``response`` and ``duration``
            entries are filled in when the stream finishes.
    """

    # Same translation the non-streaming path applies, so streamed and
    # non-streamed records carry comparable finish reasons.
    _FINISH_REASONS = {"stop_sequence": "stop", "end_turn": "stop", "max_tokens": "length"}

    def __init__(self, completion_url, completionID, response, partial_log_row):
        self.completionID = completionID
        self.completion_url = completion_url
        self.partial_log_row = partial_log_row
        self.response = response
        self.final_result = ""
        # Duration is measured from wrapper creation to the message_stop chunk.
        self.start_time = time.perf_counter()
        self.message_id = None
        self.model = None
        self.finish_reason = None
        self.input_tokens = 0
        self.output_tokens = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next chunk of the wrapped stream, recording what is needed for the log."""
        chunk = next(self.response)
        if chunk.type == "message_start":
            self.model = chunk.message.model
            self.message_id = chunk.message.id
            self.input_tokens = chunk.message.usage.input_tokens
        elif chunk.type == "message_delta":
            stop_reason = chunk.delta.stop_reason
            # Unknown stop reasons are passed through unchanged.
            self.finish_reason = self._FINISH_REASONS.get(stop_reason, stop_reason)
            self.output_tokens = chunk.usage.output_tokens
        elif chunk.type == "content_block_delta":
            # Deltas without a `text` attribute contribute nothing to the logged
            # text instead of raising and breaking the caller's stream.
            self.final_result += getattr(chunk.delta, "text", None) or ""
        elif chunk.type == "message_stop":
            self._log_final_result()

        return chunk

    def _log_final_result(self):
        """Assemble the OpenAI-style response from the recorded state and post it to log10."""
        response = {
            "id": self.message_id,
            "object": "chat",
            "model": self.model,
            "choices": [
                {
                    "index": 0,
                    "finish_reason": self.finish_reason,
                    "message": {
                        "role": "assistant",
                        "content": self.final_result,
                    },
                }
            ],
            "usage": {
                "prompt_tokens": self.input_tokens,
                "completion_tokens": self.output_tokens,
                "total_tokens": self.input_tokens + self.output_tokens,
            },
        }
        self.partial_log_row["response"] = json.dumps(response)
        self.partial_log_row["duration"] = int((time.perf_counter() - self.start_time) * 1000)

        res = post_request(self.completion_url + "/" + self.completionID, self.partial_log_row)
        if res.status_code != 200:
            logger.error(f"Failed to insert in log10: {self.partial_log_row} with error {res.text}. Skipping")


def flatten_messages(messages):
flat_messages = []
for message in messages:
Expand Down Expand Up @@ -491,30 +559,50 @@ def wrapper(*args, **kwargs):
response = output
# Adjust the Anthropic output to match OAI completion output
if "anthropic" in type(output).__module__:
if type(output).__name__ == "Stream":
kind = "chat"
return AnthropicStreamingResponseWrapper(
completion_url=completion_url,
completionID=completionID,
response=response,
partial_log_row={
"response": response,
"status": "finished",
"stacktrace": json.dumps(stacktrace),
"kind": kind,
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
},
)
from log10.anthropic import Anthropic

response = Anthropic.prepare_response(kwargs["prompt"], output, "text")
kind = "completion"
elif type(output).__name__ == "Stream":
kind = "chat" # Should be "stream", but we don't have that kind yet.
return StreamingResponseWrapper(
completion_url=completion_url,
completionID=completionID,
response=response,
partial_log_row={
"response": response,
"status": "finished",
"stacktrace": json.dumps(stacktrace),
"kind": kind,
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
},
)
kind = "chat" if response.type == "message" else "completion"
response = Anthropic.prepare_response(output, input_prompt=kwargs.get("prompt", ""))

if "system" in kwargs:
kwargs["messages"].insert(0, {"role": "system", "content": kwargs["system"]})
else:
if type(output).__name__ == "Stream":
kind = "chat"
return StreamingResponseWrapper(
completion_url=completion_url,
completionID=completionID,
response=response,
partial_log_row={
"response": response,
"status": "finished",
"stacktrace": json.dumps(stacktrace),
"kind": kind,
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
},
)
response = output
kind = "chat" if output.object == "chat.completion" else "completion"

Expand Down Expand Up @@ -692,6 +780,11 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True):
attr = module.resources.completions.Completions
method = getattr(attr, "create")
setattr(attr, "create", intercepting_decorator(method))

# anthropic Messages completion
attr = module.resources.messages.Messages
method = getattr(attr, "create")
setattr(attr, "create", intercepting_decorator(method))
elif module.__name__ == "openai":
openai_version = parse(version("openai"))
global OPENAI_V1
Expand Down Expand Up @@ -784,3 +877,36 @@ def __init__(self, *args, **kwargs):
if not getattr(openai, "_log10_patched", False):
log10(openai)
openai._log10_patched = True


try:
    import anthropic
except ImportError:
    logger.warning("Anthropic not found. Skipping defining log10.load.Anthropic client.")
else:

    class Anthropic(anthropic.Anthropic):
        """
        Anthropic client that patches the anthropic module with log10 on first construction.

        An optional ``tags`` keyword argument is removed from the constructor
        arguments and stored as the module-wide ``global_tags``; all remaining
        arguments are forwarded to ``anthropic.Anthropic``.

        Example:
            >>> from log10.load import Anthropic
            >>> client = Anthropic(tags=["test", "load_anthropic"])
            >>> message = client.messages.create(
            ...     model="claude-3-haiku-20240307",
            ...     max_tokens=100,
            ...     temperature=0.9,
            ...     system="Respond only in Yoda-speak.",
            ...     messages=[{"role": "user", "content": "How are you today?"}],
            ... )
            >>> print(message.content[0].text)
        """

        def __init__(self, *args, **kwargs):
            global global_tags

            # `tags` is a log10 option, so strip it before calling the parent constructor.
            if "tags" in kwargs:
                global_tags = kwargs.pop("tags")
            super().__init__(*args, **kwargs)

            # Patch the anthropic module only once; the flag is stored on the module itself.
            already_patched = getattr(anthropic, "_log10_patched", False)
            if not already_patched:
                log10(anthropic)
                anthropic._log10_patched = True
21 changes: 13 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ google-search-results = "^2.4.2"
wikipedia = "^1.4.0"
faker = "^19.2.0"
backoff = "^2.2.1"
anthropic = "^0.3.11"
anthropic = "<1"
mosaicml-cli = "^0.5.30"
together = "^0.2.7"
magentic = "^0.17.0"
Expand Down

0 comments on commit 1b7b4ef

Please sign in to comment.