diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7b30e45d..6f94ff02 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -59,9 +59,10 @@ jobs: with: version: 9.6.0 - name: Setup Swift - uses: swift-actions/setup-swift@v2 + # uses: swift-actions/setup-swift@v2 + uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b992 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591 with: - swift-version: "5.10.1" + swift-version: "5.9" - name: Install Node.js uses: actions/setup-node@v4 with: @@ -100,9 +101,10 @@ jobs: with: version: 9.6.0 - name: Setup Swift - uses: swift-actions/setup-swift@v2 + # uses: swift-actions/setup-swift@v2 + uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b99 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591 with: - swift-version: "5.10.1" + swift-version: "5.9" - name: Install Node.js uses: actions/setup-node@v4 with: diff --git a/languages/dart/dart-client/lib/sse.dart b/languages/dart/dart-client/lib/sse.dart index 7e67ca54..7acc2d24 100644 --- a/languages/dart/dart-client/lib/sse.dart +++ b/languages/dart/dart-client/lib/sse.dart @@ -284,107 +284,180 @@ ParseSseEventsResult parseSseEvents( String input, T Function(String) dataParser, ) { - final stringParts = input.split("\n\n"); - final endingPart = stringParts.removeLast(); - final result = >[]; - for (final part in stringParts) { - final event = parseSseEvent(part); - if (event == null) { - print("WARN: Invalid message data $event"); - continue; + final List> events = []; + String? id; + String? event; + String? data; + int? retry; + final splitter = LineSplitter(); + final lines = splitter.convert(input); + int? lastIndex = 0; + for (var i = 0; i < lines.length; i++) { + final line = lines[i]; + final isEnd = line == ""; + if (line.isNotEmpty) { + final lineResult = SseLineResult.fromString(line); + switch (lineResult.type) { + case SseLineResultType.id: + id = lineResult.value; + break; + case SseLineResultType.event: + event = lineResult.value; + break; + case SseLineResultType.data: + data = lineResult.value; + break; + case SseLineResultType.retry: + retry = int.tryParse(lineResult.value); + break; + case SseLineResultType.none: + break; + } + } + if (isEnd) { + if (data != null) { + events.add( + SseEvent.fromSseRawEvent( + SseRawEvent( + id: id, + event: event ?? "message", + data: data, + retry: retry, + ), + dataParser, + ), + ); + } + id = null; + event = null; + data = null; + retry = null; + lastIndex = i + 1 < lines.length ? i + 1 : null; } - result.add(SseEvent.fromString(part, dataParser)); } + return ParseSseEventsResult( - events: result, - leftoverData: endingPart, + events: events, + leftoverData: lastIndex != null + ? lines.getRange(lastIndex, lines.length).join("\n") + : "", ); } -SseRawEvent? parseSseEvent(String input) { - try { - return SseRawEvent.fromString(input); - } catch (_) { - return null; +class SseLineResult { + final SseLineResultType type; + final String value; + const SseLineResult({required this.type, required this.value}); + factory SseLineResult.fromString(String input) { + if (input.startsWith("id:")) { + return SseLineResult( + type: SseLineResultType.id, + value: input.substring(3).trim(), + ); + } + if (input.startsWith("event:")) { + return SseLineResult( + type: SseLineResultType.event, + value: input.substring(6).trim(), + ); + } + if (input.startsWith("data:")) { + return SseLineResult( + type: SseLineResultType.data, + value: input.substring(5).trim(), + ); + } + if (input.startsWith("retry:")) { + return SseLineResult( + type: SseLineResultType.retry, + value: input.substring(6).trim(), + ); + } + return SseLineResult( + type: SseLineResultType.none, + value: "", + ); } } +enum SseLineResultType { + id, + event, + data, + retry, + none; +} + sealed class SseEvent { final String? id; - final String? event; + final String event; + final int? retry; const SseEvent({ this.id, - this.event, + required this.event, + this.retry, }); - factory SseEvent.fromString( - String input, + factory SseEvent.fromSseRawEvent( + SseRawEvent event, TData Function(String) parser, ) { - final sse = SseRawEvent.fromString(input); try { - switch (sse.event) { - case "message": - return SseMessageEvent.fromRawSseEvent(sse, parser); + switch (event.event) { case "done": - return SseDoneEvent.fromRawSseEvent(sse); + return SseDoneEvent.fromRawSseEvent(event); + case "message": + return SseMessageEvent.fromRawSseEvent(event, parser); default: - return SseMessageEvent.fromRawSseEvent(sse, parser); + return event; } } catch (err) { - return sse; + return event; } } } class SseRawEvent extends SseEvent { final String data; - const SseRawEvent({super.id, super.event, required this.data}); - - factory SseRawEvent.fromString(String input) { - final lines = input.split("\n"); - String? id; - String? event; - String data = ''; - for (final line in lines) { - if (line.startsWith("id:")) { - id = line.replaceFirst("id:", "").trim(); - continue; - } - if (line.startsWith("event:")) { - event = line.replaceFirst("event:", "").trim(); - continue; - } - if (line.startsWith("data:")) { - data = line.replaceFirst("data:", "").trim(); - continue; - } - } - return SseRawEvent(id: id, event: event, data: data); - } + const SseRawEvent({ + super.id, + required super.event, + required this.data, + super.retry, + }); } class SseMessageEvent extends SseEvent { final TData data; - const SseMessageEvent({super.id, super.event, required this.data}); + + const SseMessageEvent({ + super.id, + required this.data, + super.retry, + }) : super(event: "message"); factory SseMessageEvent.fromRawSseEvent( SseRawEvent event, TData Function(String) parser) { return SseMessageEvent( id: event.id, - event: "message", data: parser(event.data), + retry: event.retry, ); } } class SseDoneEvent extends SseEvent { final String data = ""; - const SseDoneEvent({super.id, super.event}); + + const SseDoneEvent({ + super.id, + super.retry, + }) : super(event: "done"); + factory SseDoneEvent.fromRawSseEvent(SseRawEvent event) { return SseDoneEvent( id: event.id, - event: "done", + retry: event.retry, ); } } diff --git a/languages/dart/dart-client/test/arri_client_test.dart b/languages/dart/dart-client/test/arri_client_test.dart index 9168649a..7419b957 100644 --- a/languages/dart/dart-client/test/arri_client_test.dart +++ b/languages/dart/dart-client/test/arri_client_test.dart @@ -34,6 +34,87 @@ main() { expect(closeCount, equals(1)); }); + test("SSE Line Result", () { + final idInputs = ["id:1", "id: 1"]; + for (final input in idInputs) { + final result = SseLineResult.fromString(input); + expect(result.type, equals(SseLineResultType.id)); + expect(result.value, equals("1")); + } + final eventInputs = ["event:foo", "event: foo"]; + for (final input in eventInputs) { + final result = SseLineResult.fromString(input); + expect(result.type, equals(SseLineResultType.event)); + expect(result.value, "foo"); + } + final dataResults = ["data:foo", "data: foo"]; + for (final input in dataResults) { + final result = SseLineResult.fromString(input); + expect(result.type, equals(SseLineResultType.data)); + expect(result.value, "foo"); + } + final retryResults = ["retry:150", "retry: 150"]; + for (final input in retryResults) { + final result = SseLineResult.fromString(input); + expect(result.type, equals(SseLineResultType.retry)); + expect(result.value, '150'); + } + }); + + group("Parsing SSE Messages", () { + group("Standard SSE messages", () { + final standardSsePayload = [ + "id: 1", + "data: hello world", + "", + "id: 2", + "data: hello world", + "", + "" + ]; + test("with \\n separator", () { + final result = parseSseEvents(standardSsePayload.join("\n"), (input) { + return input; + }); + expect(result.events.length, equals(2)); + expect(result.events[0].id, equals("1")); + expect(result.events[1].id, equals("2")); + expect( + result.events.every((el) => + el is SseMessageEvent && el.data == "hello world"), + equals(true), + ); + expect(result.leftoverData, equals("")); + }); + test("with \r\n separator", () { + final result = + parseSseEvents(standardSsePayload.join("\r\n"), (input) => input); + expect(result.events.length, equals(2)); + expect(result.events[0].id, equals("1")); + expect(result.events[1].id, equals("2")); + expect( + result.events.every((el) => + el is SseMessageEvent && el.data == "hello world"), + equals(true), + ); + expect(result.leftoverData, equals("")); + }); + test("with \r separator", () { + final result = + parseSseEvents(standardSsePayload.join("\r"), (input) => input); + expect(result.events.length, equals(2)); + expect(result.events[0].id, equals("1")); + expect(result.events[1].id, equals("2")); + expect( + result.events.every((el) => + el is SseMessageEvent && el.data == "hello world"), + equals(true), + ); + expect(result.leftoverData, equals("")); + }); + }); + }); + test("parsing sse messages", () { final streamedTxt = """id: 1 data: hello world