Skip to content

Commit

Permalink
Handle latency values from WebRTC worker
Browse files Browse the repository at this point in the history
  • Loading branch information
juberti committed Nov 29, 2023
1 parent 4cd5eaf commit f5d75ac
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 48 deletions.
79 changes: 46 additions & 33 deletions packages/voice/src/app/agent/chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class ChatRequest {
public outMessage = '';
public conversationId?: string;
public done = false;
public onUpdate?: (request: ChatRequest, newText: string) => void;
public onUpdate?: (request: ChatRequest, newText: string, firstToken: boolean) => void;
public onComplete?: (request: ChatRequest) => void;
public startMillis?: number;
public requestLatency?: number;
Expand Down Expand Up @@ -135,8 +135,9 @@ export class ChatRequest {
console.log(`[chat] received agent response, latency=${this.requestLatency.toFixed(0)} ms`);
}

const firstToken = this.outMessage.length === 0;
this.outMessage += newText;
this.onUpdate?.(this, newText);
this.onUpdate?.(this, newText, firstToken);
}
}

Expand Down Expand Up @@ -173,6 +174,7 @@ export class ChatRequest {
}

const reader = response.body!.pipeThrough(new TextDecoderStream()).pipeThrough(jsonLinesTransformer()).getReader();
let firstToken = true;

while (true) {
const { done, value } = await reader.read();
Expand Down Expand Up @@ -214,7 +216,8 @@ export class ChatRequest {
}

this.outMessage = currentMessage;
this.onUpdate?.(this, delta);
this.onUpdate?.(this, delta, firstToken);
firstToken = false;

if (currentTurn.state === 'done') {
this.ensureComplete();
Expand Down Expand Up @@ -259,11 +262,9 @@ export interface ChatManagerInit {
*/
export interface ChatManager {
onStateChange?: (state: ChatManagerState) => void;
onInputChange?: (text: string, final: boolean, latency?: number) => void;
onOutputChange?: (text: string, final: boolean, latency: number) => void;
onAudioGenerate?: (latency: number) => void;
onAudioStart?: (latency: number) => void;
onAudioEnd?: () => void;
onInputChange?: (text: string, final: boolean) => void;
onOutputChange?: (text: string, final: boolean) => void;
onLatencyChange?: (kind: string, latency: number) => void;
onError?: () => void;

state: ChatManagerState;
Expand All @@ -289,11 +290,9 @@ export class LocalChatManager implements ChatManager {
private readonly agentId: string;
private readonly docs: boolean;
onStateChange?: (state: ChatManagerState) => void;
onInputChange?: (text: string, final: boolean, latency?: number) => void;
onOutputChange?: (text: string, final: boolean, latency: number) => void;
onAudioGenerate?: (latency: number) => void;
onAudioStart?: (latency: number) => void;
onAudioEnd?: () => void;
onInputChange?: (text: string, final: boolean) => void;
onOutputChange?: (text: string, final: boolean) => void;
onLatencyChange?: (kind: string, latency: number) => void;
onError?: () => void;
constructor({ asrProvider, asrLanguage, ttsProvider, ttsModel, ttsVoice, model, agentId, docs }: ChatManagerInit) {
this.micManager = new MicManager();
Expand Down Expand Up @@ -402,7 +401,7 @@ export class LocalChatManager implements ChatManager {
final ? ' FINAL' : ''
} latency=${adjustedLatency?.toFixed(0)} ms`
);
this.onInputChange?.(text, final, latency);
this.onInputChange?.(text, final);

// Ignore partial transcripts if VAD indicates the user is still speaking.
if (!final && this.micManager.isVoiceActive) {
Expand All @@ -417,6 +416,7 @@ export class LocalChatManager implements ChatManager {
if (final) {
this.history = newMessages;
this.micManager.isEnabled = false;
this.onLatencyChange?.('asr', adjustedLatency!);
}

// If it doesn't match an existing request, kick off a new one.
Expand All @@ -434,7 +434,7 @@ export class LocalChatManager implements ChatManager {
*/
private dispatchRequest(normalized: string, messages: ChatMessage[], final: boolean) {
const request = new ChatRequest(messages, this.model, this.agentId, this.docs, final);
request.onUpdate = (request, newText) => this.handleRequestUpdate(request, newText);
request.onUpdate = (request, newText, firstToken) => this.handleRequestUpdate(request, newText, firstToken);
request.onComplete = (request) => this.handleRequestDone(request);
this.pendingRequests.set(normalized, request);
request.start();
Expand All @@ -446,7 +446,7 @@ export class LocalChatManager implements ChatManager {
request.active = true;
this.tts.play(request.outMessage);
if (!request.done) {
this.onOutputChange?.(request.outMessage, false, request.requestLatency!);
this.onOutputChange?.(request.outMessage, false);
} else {
this.finishRequest(request);
}
Expand All @@ -464,9 +464,12 @@ export class LocalChatManager implements ChatManager {
* Handle new in-progress responses from the LLM. If the request is not marked
* as active, it's a speculative request that we ignore for now.
*/
private handleRequestUpdate(request: ChatRequest, newText: string) {
private handleRequestUpdate(request: ChatRequest, newText: string, firstToken: boolean) {
if (request.active) {
this.onOutputChange?.(request.outMessage, false, request.requestLatency!);
this.onOutputChange?.(request.outMessage, false);
if (firstToken) {
this.onLatencyChange?.('llm', request.streamLatency!);
}
this.tts.play(newText);
}
}
Expand All @@ -489,29 +492,28 @@ export class LocalChatManager implements ChatManager {
const assistantMessage = new ChatMessage('assistant', request.outMessage, request.conversationId);
this.history.push(assistantMessage);
this.pendingRequests.clear();
this.onOutputChange?.(request.outMessage, true, request.requestLatency!);
this.onOutputChange?.(request.outMessage, true);
}
/**
* Handle the start of generation from the TTS.
*/
private handleGenerationStart() {
if (this._state != ChatManagerState.THINKING) return;
this.onAudioGenerate?.(this.tts.bufferLatency!);
this.onLatencyChange?.('llmt', this.tts.bufferLatency!);
}
/**
* Handle the start of playout from the TTS.
*/
private handlePlaybackStart() {
if (this._state != ChatManagerState.THINKING) return;
this.changeState(ChatManagerState.SPEAKING);
this.onAudioStart?.(this.tts.latency! - this.tts.bufferLatency!);
this.onLatencyChange?.('tts', this.tts.latency! - this.tts.bufferLatency!);
}
/**
* Handle the end of playout from the TTS.
*/
private handlePlaybackComplete() {
if (this._state != ChatManagerState.SPEAKING) return;
this.onAudioEnd?.();
if (this._state != ChatManagerState.SPEAKING) return;
this.micManager.isEnabled = true;
this.changeState(ChatManagerState.LISTENING);
}
Expand Down Expand Up @@ -550,11 +552,9 @@ export class WebRtcChatManager implements ChatManager {
private outAnalyzer?: StreamAnalyzer;
private pinger?: NodeJS.Timer;
onStateChange?: (state: ChatManagerState) => void;
onInputChange?: (text: string, final: boolean, latency?: number) => void;
onOutputChange?: (text: string, final: boolean, latency: number) => void;
onAudioGenerate?: (latency: number) => void;
onAudioStart?: (latency: number) => void;
onAudioEnd?: () => void;
onInputChange?: (text: string, final: boolean) => void;
onOutputChange?: (text: string, final: boolean) => void;
onLatencyChange?: (kind: string, latency: number) => void;
onError?: () => void;

constructor(params: ChatManagerInit) {
Expand All @@ -581,6 +581,7 @@ export class WebRtcChatManager implements ChatManager {
}
async start() {
console.log('[chat] starting');
this.audioContext.resume();
this.audioElement.play();
const localTracks = await createLocalTracks({ audio: true, video: false });
this.localAudioTrack = localTracks[0] as LocalAudioTrack;
Expand Down Expand Up @@ -677,8 +678,6 @@ export class WebRtcChatManager implements ChatManager {
const audioTrack = track as RemoteAudioTrack;
audioTrack.on(TrackEvent.AudioPlaybackStarted, () => console.log(`[chat] audio playback started`));
audioTrack.on(TrackEvent.AudioPlaybackFailed, (err) => console.error(`[chat] audio playback failed`, err));
// TODO Farzad: Figure out why setting audioContext here is necessary.
audioTrack.setAudioContext(this.audioContext);
audioTrack.attach(this.audioElement);
this.outAnalyzer = new StreamAnalyzer(this.audioContext, track.mediaStream!);
if (this.delayedSpeakingState) {
Expand All @@ -701,12 +700,26 @@ export class WebRtcChatManager implements ChatManager {
this.changeState(newState);
}
} else if (data.type === 'transcript') {
const finalText = data.transcript.final ? ' FINAL' : '';
console.log(`[chat] input: ${data.transcript.text}${finalText}`);
this.handleInputChange(data.transcript);
} else if (data.type === 'output') {
console.log(`[chat] output: ${data.text}`);
this.handleOutputChange(data.text, data.final);
} else if (data.type == 'latency') {
this.handleLatency(data.kind, data.latency);
}
}
private handleInputChange(transcript: Transcript) {
const finalText = transcript.final ? ' FINAL' : '';
console.log(`[chat] input: ${transcript.text}${finalText}`);
this.onInputChange?.(transcript.text, transcript.final);
}
private handleOutputChange(text: string, final: boolean) {
console.log(`[chat] output: ${text}`);
this.onOutputChange?.(text, final);
}
private handleLatency(kind: string, latency: number) {
console.log(`[chat] latency: ${kind} ${latency.toFixed(0)} ms`);
this.onLatencyChange?.(kind, latency);
}
}

export function createChatManager(init: ChatManagerInit): ChatManager {
Expand Down
36 changes: 21 additions & 15 deletions packages/voice/src/app/agent/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -265,27 +265,33 @@ const AgentPageComponent: React.FC = () => {
setHelpText(idleText);
}
};
manager.onInputChange = (text, final, latency) => {
setInput(text);
if (final && latency) {
setAsrLatency(latency);
setLlmResponseLatency(0);
setLlmTokenLatency(0);
setTtsLatency(0);
}
manager.onInputChange = (text, final) => {
setInput(text);
};
manager.onOutputChange = (text, final, latency) => {
manager.onOutputChange = (text, final) => {
setOutput(text);
if (final) {
setInput('');
}
setLlmResponseLatency((prev) => (prev ? prev : latency));
};
manager.onAudioGenerate = (latency) => {
setLlmTokenLatency(latency);
};
manager.onAudioStart = (latency) => {
setTtsLatency(latency);
manager.onLatencyChange = (kind, latency) => {
switch (kind) {
case 'asr':
setAsrLatency(latency);
setLlmResponseLatency(0);
setLlmTokenLatency(0);
setTtsLatency(0);
break;
case 'llm':
setLlmResponseLatency(latency);
break;
case 'llmt':
setLlmTokenLatency(latency);
break;
case 'tts':
setTtsLatency(latency);
break;
}
};
manager.onError = () => {
manager.stop();
Expand Down

0 comments on commit f5d75ac

Please sign in to comment.