app.py
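"""Chainlit app that streams voice conversations through the OpenAI Realtime API.

It registers Realtime event handlers for audio streaming, interruption, and errors,
registers the tools defined in realtime.tools, and forwards typed messages once the
realtime session is connected. It assumes an OPENAI_API_KEY environment variable
and is typically launched with `chainlit run app.py`.
"""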
import os
import asyncio
from openai import AsyncOpenAI
import chainlit as cl
from uuid import uuid4
from chainlit.logger import logger
from realtime import RealtimeClient
from realtime.tools import tools

client = AsyncOpenAI()


async def setup_openai_realtime():
    """Instantiate and configure the OpenAI Realtime Client"""
    openai_realtime = RealtimeClient(api_key=os.getenv("OPENAI_API_KEY"))
    cl.user_session.set("track_id", str(uuid4()))

    async def handle_conversation_updated(event):
        """Currently used to stream audio back to the client."""
        item = event.get("item")
        delta = event.get("delta")
        if delta:
            # Only one of the following will be populated for any given event
            if 'audio' in delta:
                audio = delta['audio']  # Int16Array, audio added
                await cl.context.emitter.send_audio_chunk(
                    cl.OutputAudioChunk(
                        mimeType="pcm16",
                        data=audio,
                        track=cl.user_session.get("track_id"),
                    )
                )
            if 'transcript' in delta:
                transcript = delta['transcript']  # string, transcript added
                pass
            if 'arguments' in delta:
                arguments = delta['arguments']  # string, function arguments added
                pass

    async def handle_item_completed(item):
        """Used to populate the chat context with transcription once an item is completed."""
        # print(item) # TODO
        pass

    async def handle_conversation_interrupt(event):
        """Used to cancel the client's previous audio playback."""
        cl.user_session.set("track_id", str(uuid4()))
        await cl.context.emitter.send_audio_interrupt()

    async def handle_error(event):
        logger.error(event)

    openai_realtime.on('conversation.updated', handle_conversation_updated)
    openai_realtime.on('conversation.item.completed', handle_item_completed)
    openai_realtime.on('conversation.interrupted', handle_conversation_interrupt)
    openai_realtime.on('error', handle_error)

    cl.user_session.set("openai_realtime", openai_realtime)

    # Register every (tool definition, handler) pair from realtime.tools concurrently
    coros = [openai_realtime.add_tool(tool_def, tool_handler) for tool_def, tool_handler in tools]
    await asyncio.gather(*coros)
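
# The tools imported from realtime.tools are assumed to be (definition, handler)
# pairs consumed by RealtimeClient.add_tool above. A minimal hypothetical sketch
# (not the actual contents of realtime/tools.py) might look like:
#
#     get_time_def = {
#         "name": "get_current_time",
#         "description": "Return the current server time as an ISO 8601 string.",
#         "parameters": {"type": "object", "properties": {}},
#     }
#
#     async def get_time_handler():
#         from datetime import datetime, timezone
#         return {"time": datetime.now(timezone.utc).isoformat()}
#
#     tools = [(get_time_def, get_time_handler)]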

@cl.on_chat_start
async def start():
    # Greeting (German): "Welcome to DialogAI! I would love to talk with you.
    # Press AltGr+P to turn on the microphone."
    await cl.Message(
        content="Willkommen zur DialogAI! Ich würde am liebsten mit dir sprechen. Drücke AltGr+P um das Mikrophon einzuschalten."
    ).send()
    await setup_openai_realtime()


@cl.on_message
async def on_message(message: cl.Message):
    openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
    if openai_realtime and openai_realtime.is_connected():
        # TODO: Try image processing with message.elements
        await openai_realtime.send_user_message_content(
            [{"type": 'input_text', "text": message.content}]
        )
    else:
        # Error (German): "Please activate audio mode first before sending text messages."
        await cl.Message(content="Bitte aktiviere zunächst den Audio-Modus, bevor du Text-Nachrichten schickst.").send()

@cl.on_audio_start
async def on_audio_start():
    try:
        openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
        await openai_realtime.connect()
        logger.info("Connected to OpenAI realtime")
        # TODO: might want to recreate items to restore context
        # openai_realtime.create_conversation_item(item)
        return True
    except Exception as e:
        await cl.ErrorMessage(content=f"Failed to connect to OpenAI realtime: {e}").send()
        return False

@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
    openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
    # Guard against a missing client as well as a dropped connection
    if openai_realtime and openai_realtime.is_connected():
        await openai_realtime.append_input_audio(chunk.data)
    else:
        logger.info("RealtimeClient is not connected")

@cl.on_audio_end
@cl.on_chat_end
@cl.on_stop
async def on_end():
    openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
    if openai_realtime and openai_realtime.is_connected():
        await openai_realtime.disconnect()