Skip to content

Commit

Permalink
Update index.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani authored Dec 14, 2024
1 parent fb80824 commit 7119b3b
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ app.post("/", async (c) => {
);

try {
const newquery = await streamToString(copilotLLMResponse.body);
const newquery = await processLLMStream(copilotLLMResponse.body);
console.log("LLM:", newquery);
} catch (error) {
console.error('Error converting stream to string:', error);
return c.text('Error processing LLM stream.');
return c.text('Error processing LLM stream:', error);
}

// Check if the message contains a SQL query
Expand Down Expand Up @@ -196,45 +196,48 @@ app.post("/", async (c) => {
});

/**
* Utility function to convert a ReadableStream to a string
* Handles both Web Streams API and Node.js streams
* Processes a stream of Server-Sent Events from the Copilot LLM API
* and accumulates the content into a single string
*/
async function streamToString(stream: ReadableStream | NodeJS.ReadableStream): Promise<string> {
// Handle Web Streams API ReadableStream
if (stream instanceof ReadableStream) {
const reader = stream.getReader();
const chunks: Uint8Array[] = [];
async function processLLMStream(stream: ReadableStream): Promise<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let accumulator = '';

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

// Decode the chunk
const chunk = decoder.decode(value);

// Split into individual SSE events
const events = chunk
.split('\n')
.filter(line => line.startsWith('data: '))
.map(line => line.slice(5).trim());

// Process each event
for (const event of events) {
if (event === '[DONE]') continue;

try {
const parsed = JSON.parse(event);
if (parsed.choices?.[0]?.delta?.content) {
accumulator += parsed.choices[0].delta.content;
}
} catch (e) {
// Skip invalid JSON
continue;
}
}
} finally {
reader.releaseLock();
}

// Concatenate chunks and decode
const concatenated = new Uint8Array(chunks.reduce((acc, chunk) => acc + chunk.length, 0));
let offset = 0;
for (const chunk of chunks) {
concatenated.set(chunk, offset);
offset += chunk.length;
}
return new TextDecoder().decode(concatenated);
} finally {
reader.releaseLock();
}

// Handle Node.js Readable streams
if (typeof stream.on === 'function') {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on('data', (chunk: Buffer) => chunks.push(Buffer.from(chunk)));
stream.on('error', (err) => reject(err));
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
});
}
throw new Error('Unsupported stream type');

return accumulator.trim();
}

const port = 3000;
Expand Down

0 comments on commit 7119b3b

Please sign in to comment.