Skip to content

Commit

Permalink
bugfix: support \r and \r\n line separators in Dart sse client (#93)
Browse files Browse the repository at this point in the history
* get dart sse to handle \r and \r\n

* fix swift issue

* change swift version
  • Loading branch information
joshmossas authored Aug 24, 2024
1 parent 2fcdcbe commit 3a744d7
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 58 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ jobs:
with:
version: 9.6.0
- name: Setup Swift
uses: swift-actions/setup-swift@v2
# uses: swift-actions/setup-swift@v2
uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b992 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591
with:
swift-version: "5.10.1"
swift-version: "5.9"
- name: Install Node.js
uses: actions/setup-node@v4
with:
Expand Down Expand Up @@ -100,9 +101,10 @@ jobs:
with:
version: 9.6.0
- name: Setup Swift
uses: swift-actions/setup-swift@v2
# uses: swift-actions/setup-swift@v2
uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b99 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591
with:
swift-version: "5.10.1"
swift-version: "5.9"
- name: Install Node.js
uses: actions/setup-node@v4
with:
Expand Down
181 changes: 127 additions & 54 deletions languages/dart/dart-client/lib/sse.dart
Original file line number Diff line number Diff line change
Expand Up @@ -284,107 +284,180 @@ ParseSseEventsResult<T> parseSseEvents<T>(
String input,
T Function(String) dataParser,
) {
final stringParts = input.split("\n\n");
final endingPart = stringParts.removeLast();
final result = <SseEvent<T>>[];
for (final part in stringParts) {
final event = parseSseEvent(part);
if (event == null) {
print("WARN: Invalid message data $event");
continue;
final List<SseEvent<T>> events = [];
String? id;
String? event;
String? data;
int? retry;
final splitter = LineSplitter();
final lines = splitter.convert(input);
int? lastIndex = 0;
for (var i = 0; i < lines.length; i++) {
final line = lines[i];
final isEnd = line == "";
if (line.isNotEmpty) {
final lineResult = SseLineResult.fromString(line);
switch (lineResult.type) {
case SseLineResultType.id:
id = lineResult.value;
break;
case SseLineResultType.event:
event = lineResult.value;
break;
case SseLineResultType.data:
data = lineResult.value;
break;
case SseLineResultType.retry:
retry = int.tryParse(lineResult.value);
break;
case SseLineResultType.none:
break;
}
}
if (isEnd) {
if (data != null) {
events.add(
SseEvent.fromSseRawEvent(
SseRawEvent(
id: id,
event: event ?? "message",
data: data,
retry: retry,
),
dataParser,
),
);
}
id = null;
event = null;
data = null;
retry = null;
lastIndex = i + 1 < lines.length ? i + 1 : null;
}
result.add(SseEvent<T>.fromString(part, dataParser));
}

return ParseSseEventsResult(
events: result,
leftoverData: endingPart,
events: events,
leftoverData: lastIndex != null
? lines.getRange(lastIndex, lines.length).join("\n")
: "",
);
}

SseRawEvent<TData>? parseSseEvent<TData>(String input) {
try {
return SseRawEvent.fromString(input);
} catch (_) {
return null;
class SseLineResult {
final SseLineResultType type;
final String value;
const SseLineResult({required this.type, required this.value});
factory SseLineResult.fromString(String input) {
if (input.startsWith("id:")) {
return SseLineResult(
type: SseLineResultType.id,
value: input.substring(3).trim(),
);
}
if (input.startsWith("event:")) {
return SseLineResult(
type: SseLineResultType.event,
value: input.substring(6).trim(),
);
}
if (input.startsWith("data:")) {
return SseLineResult(
type: SseLineResultType.data,
value: input.substring(5).trim(),
);
}
if (input.startsWith("retry:")) {
return SseLineResult(
type: SseLineResultType.retry,
value: input.substring(6).trim(),
);
}
return SseLineResult(
type: SseLineResultType.none,
value: "",
);
}
}

enum SseLineResultType {
id,
event,
data,
retry,
none;
}

sealed class SseEvent<TData> {
final String? id;
final String? event;
final String event;
final int? retry;
const SseEvent({
this.id,
this.event,
required this.event,
this.retry,
});

factory SseEvent.fromString(
String input,
factory SseEvent.fromSseRawEvent(
SseRawEvent<TData> event,
TData Function(String) parser,
) {
final sse = SseRawEvent<TData>.fromString(input);
try {
switch (sse.event) {
case "message":
return SseMessageEvent.fromRawSseEvent(sse, parser);
switch (event.event) {
case "done":
return SseDoneEvent.fromRawSseEvent(sse);
return SseDoneEvent.fromRawSseEvent(event);
case "message":
return SseMessageEvent.fromRawSseEvent(event, parser);
default:
return SseMessageEvent.fromRawSseEvent(sse, parser);
return event;
}
} catch (err) {
return sse;
return event;
}
}
}

class SseRawEvent<TData> extends SseEvent<TData> {
final String data;
const SseRawEvent({super.id, super.event, required this.data});

factory SseRawEvent.fromString(String input) {
final lines = input.split("\n");
String? id;
String? event;
String data = '';
for (final line in lines) {
if (line.startsWith("id:")) {
id = line.replaceFirst("id:", "").trim();
continue;
}
if (line.startsWith("event:")) {
event = line.replaceFirst("event:", "").trim();
continue;
}
if (line.startsWith("data:")) {
data = line.replaceFirst("data:", "").trim();
continue;
}
}
return SseRawEvent(id: id, event: event, data: data);
}
const SseRawEvent({
super.id,
required super.event,
required this.data,
super.retry,
});
}

class SseMessageEvent<TData> extends SseEvent<TData> {
final TData data;
const SseMessageEvent({super.id, super.event, required this.data});

const SseMessageEvent({
super.id,
required this.data,
super.retry,
}) : super(event: "message");

factory SseMessageEvent.fromRawSseEvent(
SseRawEvent event, TData Function(String) parser) {
return SseMessageEvent(
id: event.id,
event: "message",
data: parser(event.data),
retry: event.retry,
);
}
}

class SseDoneEvent<TData> extends SseEvent<TData> {
final String data = "";
const SseDoneEvent({super.id, super.event});

const SseDoneEvent({
super.id,
super.retry,
}) : super(event: "done");

factory SseDoneEvent.fromRawSseEvent(SseRawEvent event) {
return SseDoneEvent(
id: event.id,
event: "done",
retry: event.retry,
);
}
}
81 changes: 81 additions & 0 deletions languages/dart/dart-client/test/arri_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,87 @@ main() {
expect(closeCount, equals(1));
});

test("SSE Line Result", () {
final idInputs = ["id:1", "id: 1"];
for (final input in idInputs) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.id));
expect(result.value, equals("1"));
}
final eventInputs = ["event:foo", "event: foo"];
for (final input in eventInputs) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.event));
expect(result.value, "foo");
}
final dataResults = ["data:foo", "data: foo"];
for (final input in dataResults) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.data));
expect(result.value, "foo");
}
final retryResults = ["retry:150", "retry: 150"];
for (final input in retryResults) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.retry));
expect(result.value, '150');
}
});

group("Parsing SSE Messages", () {
group("Standard SSE messages", () {
final standardSsePayload = [
"id: 1",
"data: hello world",
"",
"id: 2",
"data: hello world",
"",
""
];
test("with \\n separator", () {
final result = parseSseEvents(standardSsePayload.join("\n"), (input) {
return input;
});
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
test("with \r\n separator", () {
final result =
parseSseEvents(standardSsePayload.join("\r\n"), (input) => input);
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
test("with \r separator", () {
final result =
parseSseEvents(standardSsePayload.join("\r"), (input) => input);
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
});
});

test("parsing sse messages", () {
final streamedTxt = """id: 1
data: hello world
Expand Down

0 comments on commit 3a744d7

Please sign in to comment.