From 7119b3b057bd8af2a874cf4a794613a12c42aecc Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Sat, 14 Dec 2024 16:05:55 +0100 Subject: [PATCH] Update index.ts --- src/index.ts | 75 +++++++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/src/index.ts b/src/index.ts index c6b635b..71eefa2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -144,11 +144,11 @@ app.post("/", async (c) => { ); try { - const newquery = await streamToString(copilotLLMResponse.body); + const newquery = await processLLMStream(copilotLLMResponse.body); console.log("LLM:", newquery); } catch (error) { console.error('Error converting stream to string:', error); - return c.text('Error processing LLM stream.'); + return c.text('Error processing LLM stream:', error); } // Check if the message contains a SQL query @@ -196,45 +196,48 @@ app.post("/", async (c) => { }); /** - * Utility function to convert a ReadableStream to a string - * Handles both Web Streams API and Node.js streams + * Processes a stream of Server-Sent Events from the Copilot LLM API + * and accumulates the content into a single string */ -async function streamToString(stream: ReadableStream | NodeJS.ReadableStream): Promise { - // Handle Web Streams API ReadableStream - if (stream instanceof ReadableStream) { - const reader = stream.getReader(); - const chunks: Uint8Array[] = []; +async function processLLMStream(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let accumulator = ''; - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - chunks.push(value); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + // Decode the chunk + const chunk = decoder.decode(value); + + // Split into individual SSE events + const events = chunk + .split('\n') + .filter(line => line.startsWith('data: ')) + .map(line => line.slice(5).trim()); + + // Process each event + for (const event of events) { + if (event === '[DONE]') continue; + + try { + const parsed = JSON.parse(event); + if (parsed.choices?.[0]?.delta?.content) { + accumulator += parsed.choices[0].delta.content; + } + } catch (e) { + // Skip invalid JSON + continue; + } } - } finally { - reader.releaseLock(); - } - - // Concatenate chunks and decode - const concatenated = new Uint8Array(chunks.reduce((acc, chunk) => acc + chunk.length, 0)); - let offset = 0; - for (const chunk of chunks) { - concatenated.set(chunk, offset); - offset += chunk.length; } - return new TextDecoder().decode(concatenated); + } finally { + reader.releaseLock(); } - - // Handle Node.js Readable streams - if (typeof stream.on === 'function') { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - stream.on('data', (chunk: Buffer) => chunks.push(Buffer.from(chunk))); - stream.on('error', (err) => reject(err)); - stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); - }); - } - throw new Error('Unsupported stream type'); + + return accumulator.trim(); } const port = 3000;