Skip to content

Commit

Permalink
Enhance eventStream to handle multi-line data and improve stream clos…
Browse files Browse the repository at this point in the history
…ure logic in tests
  • Loading branch information
sergiodxa committed Dec 12, 2024
1 parent 559b7d6 commit 28ba9ba
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
13 changes: 9 additions & 4 deletions src/server/event-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe(eventStream.name, () => {
test("can send data to the client with the send function", async () => {
let controller = new AbortController();
let response = eventStream(controller.signal, (send, _) => {
send({ data: "hello" });
send({ data: "hello\nworld" });
// biome-ignore lint/suspicious/noEmptyBlockStatements: Test
return () => {};
});
Expand All @@ -39,11 +39,16 @@ describe(eventStream.name, () => {

let reader = response.body.getReader();

let encoder = new TextEncoder();

let { value: event } = await reader.read();
expect(event).toEqual(new TextEncoder().encode("event: message\n"));
expect(event).toEqual(encoder.encode("event: message\n"));

let { value: line1 } = await reader.read();
expect(line1).toEqual(encoder.encode("data: hello\n"));

let { value: data } = await reader.read();
expect(data).toEqual(new TextEncoder().encode("data: hello\n\n"));
let { value: line2 } = await reader.read();
expect(line2).toEqual(encoder.encode("data: world\n\n"));

let { done } = await reader.read();
expect(done).toBe(true);
Expand Down
17 changes: 11 additions & 6 deletions src/server/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,24 @@ export function eventStream(
let stream = new ReadableStream({
start(controller) {
let encoder = new TextEncoder();
let closed = false;

function send({ event = "message", data }: SendFunctionArgs) {
if (closed) return; // If already closed, not enqueue anything
controller.enqueue(encoder.encode(`event: ${event}\n`));
data.split("\n").forEach((line) => {
controller.enqueue(encoder.encode(`data: ${line}\n`));
})
controller.enqueue(encoder.encode("\n"));

if (closed) return; // If already closed, not enqueue anything

data.split("\n").forEach((line, index, array) => {
if (closed) return; // If already closed, not enqueue anything
let value = `data: ${line}\n`;
if (index === array.length - 1) value += "\n";
controller.enqueue(encoder.encode(value));
});
}

let cleanup = init(send, close);

let closed = false;

function close() {
if (closed) return;
cleanup();
Expand Down

0 comments on commit 28ba9ba

Please sign in to comment.