Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: support \r and \r\n line separators in Dart sse client #93

Merged
merged 3 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ jobs:
with:
version: 9.6.0
- name: Setup Swift
uses: swift-actions/setup-swift@v2
# uses: swift-actions/setup-swift@v2
uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b992 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591
with:
swift-version: "5.10.1"
swift-version: "5.9"
- name: Install Node.js
uses: actions/setup-node@v4
with:
Expand Down Expand Up @@ -100,9 +101,10 @@ jobs:
with:
version: 9.6.0
- name: Setup Swift
uses: swift-actions/setup-swift@v2
# uses: swift-actions/setup-swift@v2
uses: redsun82/setup-swift@b2b6f77ab14f6a9b136b520dc53ec8eca27d2b99 # temporary work around for https://github.com/swift-actions/setup-swift/issues/591
with:
swift-version: "5.10.1"
swift-version: "5.9"
- name: Install Node.js
uses: actions/setup-node@v4
with:
Expand Down
181 changes: 127 additions & 54 deletions languages/dart/dart-client/lib/sse.dart
Original file line number Diff line number Diff line change
Expand Up @@ -284,107 +284,180 @@ ParseSseEventsResult<T> parseSseEvents<T>(
String input,
T Function(String) dataParser,
) {
final stringParts = input.split("\n\n");
final endingPart = stringParts.removeLast();
final result = <SseEvent<T>>[];
for (final part in stringParts) {
final event = parseSseEvent(part);
if (event == null) {
print("WARN: Invalid message data $event");
continue;
final List<SseEvent<T>> events = [];
String? id;
String? event;
String? data;
int? retry;
final splitter = LineSplitter();
final lines = splitter.convert(input);
int? lastIndex = 0;
for (var i = 0; i < lines.length; i++) {
final line = lines[i];
final isEnd = line == "";
if (line.isNotEmpty) {
final lineResult = SseLineResult.fromString(line);
switch (lineResult.type) {
case SseLineResultType.id:
id = lineResult.value;
break;
case SseLineResultType.event:
event = lineResult.value;
break;
case SseLineResultType.data:
data = lineResult.value;
break;
case SseLineResultType.retry:
retry = int.tryParse(lineResult.value);
break;
case SseLineResultType.none:
break;
}
}
if (isEnd) {
if (data != null) {
events.add(
SseEvent.fromSseRawEvent(
SseRawEvent(
id: id,
event: event ?? "message",
data: data,
retry: retry,
),
dataParser,
),
);
}
id = null;
event = null;
data = null;
retry = null;
lastIndex = i + 1 < lines.length ? i + 1 : null;
}
result.add(SseEvent<T>.fromString(part, dataParser));
}

return ParseSseEventsResult(
events: result,
leftoverData: endingPart,
events: events,
leftoverData: lastIndex != null
? lines.getRange(lastIndex, lines.length).join("\n")
: "",
);
}

SseRawEvent<TData>? parseSseEvent<TData>(String input) {
try {
return SseRawEvent.fromString(input);
} catch (_) {
return null;
class SseLineResult {
final SseLineResultType type;
final String value;
const SseLineResult({required this.type, required this.value});
factory SseLineResult.fromString(String input) {
if (input.startsWith("id:")) {
return SseLineResult(
type: SseLineResultType.id,
value: input.substring(3).trim(),
);
}
if (input.startsWith("event:")) {
return SseLineResult(
type: SseLineResultType.event,
value: input.substring(6).trim(),
);
}
if (input.startsWith("data:")) {
return SseLineResult(
type: SseLineResultType.data,
value: input.substring(5).trim(),
);
}
if (input.startsWith("retry:")) {
return SseLineResult(
type: SseLineResultType.retry,
value: input.substring(6).trim(),
);
}
return SseLineResult(
type: SseLineResultType.none,
value: "",
);
}
}

enum SseLineResultType {
id,
event,
data,
retry,
none;
}

sealed class SseEvent<TData> {
final String? id;
final String? event;
final String event;
final int? retry;
const SseEvent({
this.id,
this.event,
required this.event,
this.retry,
});

factory SseEvent.fromString(
String input,
factory SseEvent.fromSseRawEvent(
SseRawEvent<TData> event,
TData Function(String) parser,
) {
final sse = SseRawEvent<TData>.fromString(input);
try {
switch (sse.event) {
case "message":
return SseMessageEvent.fromRawSseEvent(sse, parser);
switch (event.event) {
case "done":
return SseDoneEvent.fromRawSseEvent(sse);
return SseDoneEvent.fromRawSseEvent(event);
case "message":
return SseMessageEvent.fromRawSseEvent(event, parser);
default:
return SseMessageEvent.fromRawSseEvent(sse, parser);
return event;
}
} catch (err) {
return sse;
return event;
}
}
}

class SseRawEvent<TData> extends SseEvent<TData> {
final String data;
const SseRawEvent({super.id, super.event, required this.data});

factory SseRawEvent.fromString(String input) {
final lines = input.split("\n");
String? id;
String? event;
String data = '';
for (final line in lines) {
if (line.startsWith("id:")) {
id = line.replaceFirst("id:", "").trim();
continue;
}
if (line.startsWith("event:")) {
event = line.replaceFirst("event:", "").trim();
continue;
}
if (line.startsWith("data:")) {
data = line.replaceFirst("data:", "").trim();
continue;
}
}
return SseRawEvent(id: id, event: event, data: data);
}
const SseRawEvent({
super.id,
required super.event,
required this.data,
super.retry,
});
}

class SseMessageEvent<TData> extends SseEvent<TData> {
final TData data;
const SseMessageEvent({super.id, super.event, required this.data});

const SseMessageEvent({
super.id,
required this.data,
super.retry,
}) : super(event: "message");

factory SseMessageEvent.fromRawSseEvent(
SseRawEvent event, TData Function(String) parser) {
return SseMessageEvent(
id: event.id,
event: "message",
data: parser(event.data),
retry: event.retry,
);
}
}

class SseDoneEvent<TData> extends SseEvent<TData> {
final String data = "";
const SseDoneEvent({super.id, super.event});

const SseDoneEvent({
super.id,
super.retry,
}) : super(event: "done");

factory SseDoneEvent.fromRawSseEvent(SseRawEvent event) {
return SseDoneEvent(
id: event.id,
event: "done",
retry: event.retry,
);
}
}
81 changes: 81 additions & 0 deletions languages/dart/dart-client/test/arri_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,87 @@ main() {
expect(closeCount, equals(1));
});

test("SSE Line Result", () {
final idInputs = ["id:1", "id: 1"];
for (final input in idInputs) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.id));
expect(result.value, equals("1"));
}
final eventInputs = ["event:foo", "event: foo"];
for (final input in eventInputs) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.event));
expect(result.value, "foo");
}
final dataResults = ["data:foo", "data: foo"];
for (final input in dataResults) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.data));
expect(result.value, "foo");
}
final retryResults = ["retry:150", "retry: 150"];
for (final input in retryResults) {
final result = SseLineResult.fromString(input);
expect(result.type, equals(SseLineResultType.retry));
expect(result.value, '150');
}
});

group("Parsing SSE Messages", () {
group("Standard SSE messages", () {
final standardSsePayload = [
"id: 1",
"data: hello world",
"",
"id: 2",
"data: hello world",
"",
""
];
test("with \\n separator", () {
final result = parseSseEvents(standardSsePayload.join("\n"), (input) {
return input;
});
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
test("with \r\n separator", () {
final result =
parseSseEvents(standardSsePayload.join("\r\n"), (input) => input);
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
test("with \r separator", () {
final result =
parseSseEvents(standardSsePayload.join("\r"), (input) => input);
expect(result.events.length, equals(2));
expect(result.events[0].id, equals("1"));
expect(result.events[1].id, equals("2"));
expect(
result.events.every((el) =>
el is SseMessageEvent<String> && el.data == "hello world"),
equals(true),
);
expect(result.leftoverData, equals(""));
});
});
});

test("parsing sse messages", () {
final streamedTxt = """id: 1
data: hello world
Expand Down
Loading