Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Optional Control for Participant Auto-Linking in PipelineAgent #997

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions livekit-agents/livekit/agents/pipeline/pipeline_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def __init__(
before_tts_cb: BeforeTTSCallback = _default_before_tts_cb,
plotting: bool = False,
loop: asyncio.AbstractEventLoop | None = None,
auto_link_on_connect: bool = True,
# backward compatibility
will_synthesize_assistant_reply: WillSynthesizeAssistantReply | None = None,
) -> None:
Expand Down Expand Up @@ -203,6 +204,7 @@ def __init__(
"""
super().__init__()
self._loop = loop or asyncio.get_event_loop()
self._auto_link_on_connect = auto_link_on_connect

if will_synthesize_assistant_reply is not None:
logger.warning(
Expand Down Expand Up @@ -364,11 +366,11 @@ def _on_vad_metrics(vad_metrics: vad.VADMetrics) -> None:
self._link_participant(participant.identity)
else:
self._link_participant(participant)
else:
# no participant provided, try to find the first participant in the room
elif self._auto_link_on_connect:
# Automatically link to the first participant if auto-linking is enabled
for participant in self._room.remote_participants.values():
self._link_participant(participant.identity)
break
break # Only connect to the first participant initially

self._main_atask = asyncio.create_task(self._main_task())

Expand Down Expand Up @@ -441,12 +443,35 @@ async def aclose(self) -> None:
await self._deferred_validation.aclose()

def _on_participant_connected(self, participant: rtc.RemoteParticipant):
if self._human_input is not None:
return
logger.debug("_on_participant_connected called")
# Auto-link the participant if no one is assigned or if auto-linking is enabled
if self._auto_link_on_connect and not self._human_input:
self._link_participant(participant.identity)

def link_participant(self, identity: str) -> None:
"""Link a participant manually by identity, replacing any existing HumanInput if necessary."""
logger.info(f"Attempting to manually link participant: {identity}")

# Ensure the participant exists before attempting to link
participant = self._room.remote_participants.get(identity)

if participant:
# Unassign any existing HumanInput
if self._human_input:
old_identity = self._human_input
logger.info(f"Unassigning previous participant: {old_identity}")
self._human_input = None # Clear the existing HumanInput assignment

# Link the new participant
self._link_participant(identity)
logger.info(f"Participant {identity} successfully linked.")
else:
logger.error(f"Participant with identity {identity} not found.")


self._link_participant(participant.identity)

def _link_participant(self, identity: str) -> None:
logger.debug("_link_participant has been called")
participant = self._room.remote_participants.get(identity)
if participant is None:
logger.error("_link_participant must be called with a valid identity")
Expand Down