Wenzhe/test non blocking async post (#179)
* create async tasks instead of awaiting log10 post calls; all pending tasks must be gathered at the end

* update function name to finalize to gather all tasks

* use finalize in the test

* use fixture scope module for asyncio tests

* update async logging examples with finalize at the end

* ruff check fix

* update doc with async logging
wenzhe-log10 authored Jun 7, 2024
1 parent 59a22e5 commit 70c939f
Showing 19 changed files with 100 additions and 17 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -99,6 +99,30 @@ from log10.anthropic import Anthropic
llm = Anthropic({"model": "claude-2"}, log10_config=Log10Config())
```

#### Asynchronous LLM calls
We support the OpenAI and Anthropic async clients (e.g. `AsyncOpenAI` and `AsyncAnthropic`) in their Python SDKs.
You can use the same `log10(openai)` or `log10(anthropic)` call and then use the async client to start logging in asynchronous mode (including streaming).

Release `0.9.0` significantly improves how we handle concurrency when calling LLMs in asynchronous streaming mode.
This update ensures that logging at steady state incurs no overhead (previously up to 1-2 seconds), providing a smoother and more efficient experience in latency-critical settings.

__Important Considerations for Short-Lived Scripts__:
> 💡 For short-lived scripts using asynchronous streaming, you may need to wait until all logging requests have completed before terminating your script.
We have provided a convenient method called `finalize()` to handle this.
Here's how you can implement this in your code:

```python
from log10._httpx_utils import finalize

...

await finalize()
```
Call `finalize()` once, at the very end of your event loop, to guarantee that all pending logging requests are processed before the script exits.
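
A complete short-lived script then looks like this (a minimal sketch modeled on the `openai_async_logging.py` example in this commit; the model name is a placeholder):

```python
import asyncio

import openai
from openai import AsyncOpenAI

from log10._httpx_utils import finalize
from log10.load import log10

log10(openai)

client = AsyncOpenAI()


async def main():
    completion = await client.chat.completions.create(
        model="gpt-4o",  # placeholder; use any chat model
        messages=[{"role": "user", "content": "Say this is a test"}],
    )
    print(completion.choices[0].message.content)
    # Drain all pending logging tasks before the event loop shuts down.
    await finalize()


asyncio.run(main())
```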


For more details, check [async logging examples](./examples/logging/).

#### Open-source LLMs
Log open-source LLM calls, e.g. Llama-2, Mistral, etc., from providers.
Currently we support inference endpoints on Together.AI and MosaicML (ranked at the top in our [benchmarking](https://arjunbansal.substack.com/p/which-llama-2-inference-api-should-i-use) of Llama-2 inference providers).
2 changes: 2 additions & 0 deletions examples/logging/anthropic_async_messages.py
@@ -2,6 +2,7 @@

import anthropic

from log10._httpx_utils import finalize
from log10.load import log10


@@ -18,6 +19,7 @@ async def main() -> None:
)

print(message)
await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/anthropic_async_messages_stream.py
@@ -2,6 +2,7 @@

import anthropic

from log10._httpx_utils import finalize
from log10.load import log10


@@ -30,6 +31,7 @@ async def main() -> None:
# inside of the context manager
accumulated = await stream.get_final_message()
print("accumulated message: ", accumulated.to_json())
await finalize()


asyncio.run(main())
3 changes: 3 additions & 0 deletions examples/logging/anthropic_async_messages_stream_handler.py
@@ -5,6 +5,7 @@
from anthropic.types import MessageStreamEvent
from typing_extensions import override

from log10._httpx_utils import finalize
from log10.load import log10


@@ -34,5 +35,7 @@ async def main() -> None:
accumulated = await stream.get_final_message()
print("accumulated message: ", accumulated.to_json())

await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/anthropic_async_tools_stream.py
@@ -3,6 +3,7 @@
import anthropic
from anthropic import AsyncAnthropic

from log10._httpx_utils import finalize
from log10.load import log10


@@ -42,6 +43,7 @@ async def run_conversation():
max_tokens=1024,
) as stream:
await stream.until_done()
await finalize()


asyncio.run(run_conversation())
2 changes: 2 additions & 0 deletions examples/logging/anthropic_async_tools_stream_handler.py
@@ -5,6 +5,7 @@
from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream
from typing_extensions import override

from log10._httpx_utils import finalize
from log10.load import log10


@@ -49,6 +50,7 @@ async def main() -> None:
) as stream:
await stream.until_done()

await finalize()
print()


2 changes: 2 additions & 0 deletions examples/logging/magentic_async_chat_prompt.py
@@ -4,6 +4,7 @@
from magentic import UserMessage, chatprompt
from magentic.chat_model.anthropic_chat_model import AnthropicChatModel

from log10._httpx_utils import finalize
from log10.load import log10


@@ -18,6 +19,7 @@ async def main(topic: str) -> str:
async def tell_joke(topic: str) -> str: ...

print(await tell_joke(topic))
await finalize()


asyncio.run(main("cats"))
3 changes: 3 additions & 0 deletions examples/logging/magentic_async_multi_session_tags.py
@@ -3,6 +3,7 @@
import openai
from magentic import AsyncStreamedStr, OpenaiChatModel, prompt

from log10._httpx_utils import finalize
from log10.load import log10, log10_session


@@ -29,5 +30,7 @@ async def main():
async for chunk in result:
print(chunk, end="", flush=True)

await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/magentic_async_parallel_function_call.py
@@ -3,6 +3,7 @@
import openai
from magentic import AsyncParallelFunctionCall, prompt

from log10._httpx_utils import finalize
from log10.load import log10


@@ -25,6 +26,7 @@ async def main():
output = await plus_and_minus(2, 3)
async for chunk in output:
print(chunk)
await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/magentic_async_stream_logging.py
@@ -3,6 +3,7 @@
import openai
from magentic import AsyncStreamedStr, prompt

from log10._httpx_utils import finalize
from log10.load import log10, log10_session


@@ -19,6 +20,7 @@ async def main():
output = await tell_story("Europe.")
async for chunk in output:
print(chunk, end="", flush=True)
await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/magentic_async_widget.py
@@ -5,6 +5,7 @@
from magentic import OpenaiChatModel, prompt
from pydantic import BaseModel

from log10._httpx_utils import finalize
from log10.load import log10


@@ -31,6 +32,7 @@ async def _generate_title_and_description(query: str, widget_data: str) -> Widge
async def main():
r = await _generate_title_and_description(query="Give me a summary of AAPL", widget_data="<the summary>")
rich.print(r)
await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/openai_async_logging.py
@@ -3,6 +3,7 @@
import openai
from openai import AsyncOpenAI

from log10._httpx_utils import finalize
from log10.load import log10


@@ -17,6 +18,7 @@ async def main():
messages=[{"role": "user", "content": "Say this is a test"}],
)
print(completion.choices[0].message.content)
await finalize()


asyncio.run(main())
4 changes: 3 additions & 1 deletion examples/logging/openai_async_stream_logging.py
@@ -3,6 +3,7 @@
import openai
from openai import AsyncOpenAI

from log10._httpx_utils import finalize
from log10.load import log10


@@ -14,11 +15,12 @@
async def main():
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Count to 50."}],
messages=[{"role": "user", "content": "Count to 20."}],
stream=True,
)
async for chunk in stream:
print(chunk.choices[0].delta.content or "", end="", flush=True)
await finalize()


asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/logging/openai_async_tools_stream.py
@@ -5,6 +5,7 @@
from openai import AsyncOpenAI
from rich import print

from log10._httpx_utils import finalize
from log10.load import log10


@@ -72,6 +73,7 @@ async def run_conversation():
else:
tool_calls[-1].function.arguments += tc[0].function.arguments
print(tool_calls)
await finalize()
return


18 changes: 15 additions & 3 deletions log10/_httpx_utils.py
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
import time
@@ -220,7 +221,7 @@ async def log_request(request: Request):
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
asyncio.create_task(_try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row))


class _LogResponse(Response):
@@ -304,7 +305,10 @@ async def aiter_bytes(self, *args, **kwargs):
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
asyncio.create_task(
_try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
)

yield chunk

def is_response_end_reached(self, text: str):
@@ -502,7 +506,9 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
}
if get_log10_session_tags():
log_row["tags"] = get_log10_session_tags()
await _try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
asyncio.create_task(
_try_post_request_async(url=f"{base_url}/api/completions/{completion_id}", payload=log_row)
)
return response
elif response.headers.get("content-type").startswith("text/event-stream"):
return _LogResponse(
@@ -515,3 +521,9 @@

# In case of an error, get out of the way
return response


async def finalize():
pending = asyncio.all_tasks()
pending.remove(asyncio.current_task())
await asyncio.gather(*pending)
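
The pattern above is fire-and-forget logging: each log10 post is scheduled with `asyncio.create_task()` instead of being awaited, and `finalize()` gathers every task still pending on the loop, excluding itself, before shutdown. A standalone sketch of the same idea (`_post` is a hypothetical stand-in for `_try_post_request_async`):

```python
import asyncio


async def _post(payload: dict) -> None:
    # Stand-in for _try_post_request_async: simulates a slow logging request.
    await asyncio.sleep(0.1)
    print("logged:", payload)


async def main() -> None:
    # Schedule the post without awaiting it; the caller is not blocked.
    asyncio.create_task(_post({"completion_id": "123"}))

    # ... latency-critical work continues immediately ...

    # Await everything still pending, excluding the current task itself,
    # otherwise gather would wait on the coroutine that is running it.
    pending = asyncio.all_tasks() - {asyncio.current_task()}
    await asyncio.gather(*pending)


asyncio.run(main())
```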
14 changes: 10 additions & 4 deletions tests/test_anthropic.py
@@ -8,6 +8,7 @@
from anthropic.lib.streaming.beta import AsyncToolsBetaMessageStream
from typing_extensions import override

from log10._httpx_utils import finalize
from log10.load import log10
from tests.utils import _LogAssertion

@@ -34,7 +35,7 @@ def test_messages_create(session, anthropic_model):

@pytest.mark.chat
@pytest.mark.async_client
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_messages_create_async(session, anthropic_model):
client = anthropic.AsyncAnthropic()

Expand All @@ -48,6 +49,8 @@ async def test_messages_create_async(session, anthropic_model):

text = message.content[0].text
assert isinstance(text, str)

await finalize()
_LogAssertion(completion_id=session.last_completion_id(), message_content=text).assert_chat_response()


@@ -152,7 +155,7 @@ def test_beta_tools_messages_create(session, anthropic_model):

@pytest.mark.chat
@pytest.mark.async_client
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_beta_tools_messages_create_async(session, anthropic_model):
client = anthropic.AsyncAnthropic()

@@ -163,6 +166,7 @@ async def test_beta_tools_messages_create_async(session, anthropic_model):
)

text = message.content[0].text
await finalize()
_LogAssertion(completion_id=session.last_completion_id(), message_content=text).assert_chat_response()


@@ -196,7 +200,7 @@ def test_messages_stream_context_manager(session, anthropic_model):
@pytest.mark.stream
@pytest.mark.context_manager
@pytest.mark.async_client
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_messages_stream_context_manager_async(session, anthropic_model):
client = anthropic.AsyncAnthropic()

@@ -214,6 +218,7 @@ async def test_messages_stream_context_manager_async(session, anthropic_model):
async for text in stream.text_stream:
output += text

await finalize()
_LogAssertion(completion_id=session.last_completion_id(), message_content=output).assert_chat_response()


@@ -261,7 +266,7 @@ def test_tools_messages_stream_context_manager(session, anthropic_model):
@pytest.mark.stream
@pytest.mark.context_manager
@pytest.mark.async_client
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_tools_messages_stream_context_manager_async(session, anthropic_model):
client = anthropic.AsyncAnthropic()
json_snapshot = None
@@ -307,6 +312,7 @@ async def on_input_json(self, delta: str, snapshot: object) -> None:
if json_snapshot:
output += json.dumps(json_snapshot)

await finalize()
assert output, "No output from the model."
assert session.last_completion_id(), "No completion ID found."
## TODO fix this test after the anthropic fixes for the tool_calls
6 changes: 4 additions & 2 deletions tests/test_litellm.py
@@ -37,7 +37,7 @@ def test_completion_stream(session, openai_model):
@pytest.mark.async_client
@pytest.mark.chat
@pytest.mark.stream
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_completion_async_stream(anthropic_model):
response = await litellm.acompletion(
model=anthropic_model, messages=[{"role": "user", "content": "count to 8"}], stream=True
@@ -78,6 +78,8 @@ def test_image(session, openai_vision_model):
content = resp.choices[0].message.content
assert isinstance(content, str)

# Wait for the completion to be logged
time.sleep(3)
_LogAssertion(completion_id=session.last_completion_id(), message_content=content).assert_chat_response()


@@ -117,7 +119,7 @@ def test_image_stream(session, anthropic_model):
@pytest.mark.async_client
@pytest.mark.stream
@pytest.mark.vision
@pytest.mark.asyncio
@pytest.mark.asyncio(scope="module")
async def test_image_async_stream(session, anthropic_model):
image_url = "https://upload.wikimedia.org/wikipedia/commons/e/e8/Log10.png"
image_media_type = "image/png"