diff --git a/src/server/event-stream.test.ts b/src/server/event-stream.test.ts index 22811a0..8d78b2a 100644 --- a/src/server/event-stream.test.ts +++ b/src/server/event-stream.test.ts @@ -28,7 +28,7 @@ describe(eventStream.name, () => { test("can send data to the client with the send function", async () => { let controller = new AbortController(); let response = eventStream(controller.signal, (send, _) => { - send({ data: "hello" }); + send({ data: "hello\nworld" }); // biome-ignore lint/suspicious/noEmptyBlockStatements: Test return () => {}; }); @@ -39,11 +39,16 @@ describe(eventStream.name, () => { let reader = response.body.getReader(); + let encoder = new TextEncoder(); + let { value: event } = await reader.read(); - expect(event).toEqual(new TextEncoder().encode("event: message\n")); + expect(event).toEqual(encoder.encode("event: message\n")); + + let { value: line1 } = await reader.read(); + expect(line1).toEqual(encoder.encode("data: hello\n")); - let { value: data } = await reader.read(); - expect(data).toEqual(new TextEncoder().encode("data: hello\n\n")); + let { value: line2 } = await reader.read(); + expect(line2).toEqual(encoder.encode("data: world\n\n")); let { done } = await reader.read(); expect(done).toBe(true); diff --git a/src/server/event-stream.ts b/src/server/event-stream.ts index 75ef490..7cce44e 100644 --- a/src/server/event-stream.ts +++ b/src/server/event-stream.ts @@ -31,19 +31,24 @@ export function eventStream( let stream = new ReadableStream({ start(controller) { let encoder = new TextEncoder(); + let closed = false; function send({ event = "message", data }: SendFunctionArgs) { + if (closed) return; // If already closed, not enqueue anything controller.enqueue(encoder.encode(`event: ${event}\n`)); - data.split("\n").forEach((line) => { - controller.enqueue(encoder.encode(`data: ${line}\n`)); - }) - controller.enqueue(encoder.encode("\n")); + + if (closed) return; // If already closed, not enqueue anything + + data.split("\n").forEach((line, index, array) => { + if (closed) return; // If already closed, not enqueue anything + let value = `data: ${line}\n`; + if (index === array.length - 1) value += "\n"; + controller.enqueue(encoder.encode(value)); + }); } let cleanup = init(send, close); - let closed = false; - function close() { if (closed) return; cleanup();