Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: added trigger-phrase agent example #800

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
23 changes: 23 additions & 0 deletions examples/trigger-phrase/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Trigger Phrase Initiated Agent

This example demonstrates an agent that responds to user queries only when the user-provided trigger phrase is detected at the beginning of the user's speech.

The trigger phrase can be edited by changing this line:

```
trigger_phrase = "Hi Bob!"
nbsp marked this conversation as resolved.
Show resolved Hide resolved
```

The example uses Deepgram's STT, OpenAI's LLM, and ElevenLabs' TTS, but can be switched to other plugins as well.

## Running the example

```bash
export LIVEKIT_URL=<your LiveKit server URL>
export LIVEKIT_API_KEY=<your API Key>
export LIVEKIT_API_SECRET=<your API Secret>
export DEEPGRAM_API_KEY=<your Deepgram API key>
export OPENAI_API_KEY=<your OpenAI API key>
export ELEVEN_API_KEY=<your ElevenLabs API key>
python3 agent.py start
```
166 changes: 166 additions & 0 deletions examples/trigger-phrase/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Imports sorted and grouped (stdlib / third-party) to satisfy Ruff I001.
import asyncio
import logging
from typing import AsyncIterable, Optional

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    WorkerOptions,
    cli,
    llm,
    stt,
    tokenize,
    tts,
)
from livekit.plugins import deepgram, elevenlabs, openai, silero

load_dotenv()

Check failure on line 19 in examples/trigger-phrase/agent.py

View workflow job for this annotation

GitHub Actions / build

Ruff (I001)

examples/trigger-phrase/agent.py:1:1: I001 Import block is un-sorted or un-formatted

# Module-level logger for this worker.
logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)

# Tokenizer that drops punctuation so the trigger-phrase comparison is not
# affected by punctuation in either the configured phrase or the transcript.
word_tokenizer_without_punctuation: tokenize.WordTokenizer = (
    tokenize.basic.WordTokenizer(ignore_punctuation=True)
)

# The phrase a user must say at the start of their speech before the agent
# responds. Edit this value to change the trigger (see README).
trigger_phrase = "Hi Bob!"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: TRIGGER_PHRASE instead to show that this is a changeable constant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh okay, didn't know that - thanks!

# Pre-tokenized trigger phrase, compared word-by-word against transcripts.
trigger_phrase_words = word_tokenizer_without_punctuation.tokenize(text=trigger_phrase)


async def _playout_task(
    playout_q: asyncio.Queue, audio_source: rtc.AudioSource
) -> None:
    """Drain audio frames from *playout_q* into *audio_source*.

    A ``None`` item on the queue is the sentinel that ends playout.
    """
    while (frame := await playout_q.get()) is not None:
        await audio_source.capture_frame(frame)


async def _respond_to_user(
    stt_stream: stt.SpeechStream,
    tts: tts.TTS,
    agent_audio_source: rtc.AudioSource,
    local_llm: llm.LLM,
    llm_stream: llm.LLMStream,
):
    """Listen to final STT transcripts and answer via the LLM + TTS.

    A transcript is forwarded to the LLM only when it starts with the
    configured trigger phrase (case-insensitive, punctuation ignored);
    all other speech is silently dropped.
    """
    playout_q = asyncio.Queue[Optional[rtc.AudioFrame]]()
    tts_stream = tts.stream()

    async def _synth_task():
        # Forward synthesized audio frames to the playout queue; the trailing
        # None tells _playout_task to stop. This loop only ends once
        # tts_stream is closed.
        async for ev in tts_stream:
            playout_q.put_nowait(ev.frame)

        playout_q.put_nowait(None)

    synth_task = asyncio.create_task(_synth_task())
    playout_task = asyncio.create_task(_playout_task(playout_q, agent_audio_source))

    def _starts_with_trigger(words) -> bool:
        # An empty trigger phrase never matches (mirrors the original
        # index loop, which could not validate a zero-word phrase).
        if not trigger_phrase_words or len(words) < len(trigger_phrase_words):
            return False
        return all(
            expected.lower() == heard.lower()
            for expected, heard in zip(trigger_phrase_words, words)
        )

    async for ev in stt_stream:
        if ev.type != stt.SpeechEventType.FINAL_TRANSCRIPT:
            continue

        transcript = ev.alternatives[0].text
        transcript_words = word_tokenizer_without_punctuation.tokenize(
            text=transcript
        )
        if not _starts_with_trigger(transcript_words):
            # Trigger phrase missing: ignore user speech by not sending it
            # to the LLM.
            continue

        # Trigger phrase validated: ask the LLM and speak the reply.
        new_chat_context = llm_stream.chat_ctx.append(text=transcript)
        llm_stream = local_llm.chat(chat_ctx=new_chat_context)
        async for seg in _llm_stream_to_str_iterable(llm_stream):
            tts_stream.push_text(seg)
        tts_stream.flush()

    # Bug fix: close the TTS input stream *before* awaiting the worker tasks.
    # _synth_task only finishes once tts_stream is closed, so the original
    # gather-then-aclose order could never complete.
    await tts_stream.aclose()
    await asyncio.gather(synth_task, playout_task)


async def _llm_stream_to_str_iterable(stream: llm.LLMStream) -> AsyncIterable[str]:
    """Yield the non-None text deltas of an LLM completion stream."""
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta is not None:
            yield delta


def prewarm(proc):
    """Preload the Silero VAD model once per worker process.

    Loading the model here instead of in the entrypoint keeps job startup
    from being blocked by model loading; wired up via
    WorkerOptions(prewarm_fnc=prewarm) below.
    """
    proc.userdata["vad"] = silero.VAD.load(
        min_speech_duration=0.01,
        min_silence_duration=0.5,
    )


async def entrypoint(ctx: JobContext):
    """Connect to the room and run the trigger-phrase voice pipeline."""
    logger.info("starting trigger-phrase agent example")

    # VAD model was preloaded by prewarm() and is shared via process userdata.
    vad = ctx.proc.userdata["vad"]

    # Boost Deepgram's recognition of the trigger phrase via keywords.
    stt_local = stt.StreamAdapter(
        stt=deepgram.STT(keywords=[(trigger_phrase, 3.5)]), vad=vad
    )
    stt_stream = stt_local.stream()

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    first_participant = await ctx.wait_for_participant()

    # Publish the agent's audio track so its TTS replies are audible.
    tts = elevenlabs.TTS(model_id="eleven_turbo_v2")
    agent_audio_source = rtc.AudioSource(tts.sample_rate, tts.num_channels)
    agent_track = rtc.LocalAudioTrack.create_audio_track(
        "agent-mic", agent_audio_source
    )
    options = rtc.TrackPublishOptions()
    options.source = rtc.TrackSource.SOURCE_MICROPHONE
    # Return value intentionally discarded (fixes Ruff F841).
    await ctx.room.local_participant.publish_track(agent_track, options)

    # Set up the LLM with its system prompt.
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            f"You are {trigger_phrase}, a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )
    local_llm = openai.LLM()
    llm_stream = local_llm.chat(chat_ctx=initial_ctx)

    async def subscribe_track(participant: rtc.RemoteParticipant, track: rtc.Track):
        # Start the response pipeline concurrently, then feed the
        # participant's audio frames into the shared STT stream.
        audio_stream = rtc.AudioStream(track)
        asyncio.create_task(
            _respond_to_user(
                stt_stream=stt_stream,
                tts=tts,
                agent_audio_source=agent_audio_source,
                local_llm=local_llm,
                llm_stream=llm_stream,
            )
        )

        async for ev in audio_stream:
            stt_stream.push_frame(ev.frame)

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        # Only listen to the first participant's microphone audio.
        if (
            track.kind == rtc.TrackKind.KIND_AUDIO
            and participant.identity == first_participant.identity
        ):
            # create_task alone suffices; the original wrapped the task in an
            # un-awaited asyncio.gather, which had no effect.
            asyncio.create_task(subscribe_track(participant, track))


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
Loading