Skip to content

Commit

Permalink
fix: support multiple websocket connections
Browse files Browse the repository at this point in the history
  • Loading branch information
d1snin committed Sep 2, 2023
1 parent cdc7a82 commit d2d1dec
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dev.d1s.ktor.events.server
import io.ktor.server.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import org.lighthousegames.logging.logging
Expand Down Expand Up @@ -46,15 +47,7 @@ internal class DefaultWebSocketEventConsumer : WebSocketEventConsumer {
}

eventReceivingScope.launch {
for (event in channel) {
log.d {
"Consumed event $event"
}

val connection = connectionPool[event.reference]

connection.sendEvent(event)
}
handleEvents(eventReceivingScope, channel)
}
}

Expand All @@ -66,27 +59,61 @@ internal class DefaultWebSocketEventConsumer : WebSocketEventConsumer {
connectionPool -= connection
}

@OptIn(DelicateCoroutinesApi::class)
private suspend fun WebSocketEventSendingConnection?.sendEvent(event: ServerWebSocketEvent) {
private suspend fun handleEvents(eventReceivingScope: CoroutineScope, channel: WebSocketEventReceiver) {
for (event in channel) {
log.d {
"Consumed event $event"
}

processEvent(eventReceivingScope, event)
}
}

private fun processEvent(eventReceivingScope: CoroutineScope, event: ServerWebSocketEvent) {
val connections = connectionPool[event.reference]

connections.parallelStream().forEach { connection ->
eventReceivingScope.launch {
connection.sendEvent(event)
}
}
}

private suspend fun WebSocketEventSendingConnection.sendEvent(event: ServerWebSocketEvent) {
log.d {
"Sending event... Connection: $this"
}

(this?.session as? WebSocketServerSession)?.let { session ->
if (!session.outgoing.isClosedForSend) {
val dto = WebSocketEventDto(
reference = reference,
data = event.dataSupplier(reference.parameters)
)

session.sendSerialized(dto)
} else {
log.d {
"Couldn't send event. Connection is closed."
}
(session as? WebSocketServerSession)?.let { session ->
processSession(session, event)
}
}

connectionPool -= reference
@OptIn(DelicateCoroutinesApi::class)
private suspend fun WebSocketEventSendingConnection.processSession(
session: WebSocketServerSession,
event: ServerWebSocketEvent
) {
if (!session.outgoing.isClosedForSend) {
sendEventDto(session, event)
} else {
log.d {
"Couldn't send event. Connection is closed."
}

connectionPool -= reference
}
}

private suspend fun WebSocketEventSendingConnection.sendEventDto(
session: WebSocketServerSession,
event: ServerWebSocketEvent
) {
val dto = WebSocketEventDto(
reference = reference,
data = event.dataSupplier(reference.parameters)
)

session.sendSerialized(dto)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal interface WebSocketEventSendingConnectionPool {

operator fun minusAssign(connection: WebSocketEventSendingConnection)

operator fun get(reference: EventReference): WebSocketEventSendingConnection?
operator fun get(reference: EventReference): List<WebSocketEventSendingConnection>

operator fun minusAssign(reference: EventReference)
}
Expand Down Expand Up @@ -54,24 +54,26 @@ internal class DefaultWebSocketEventSendingConnectionPool : WebSocketEventSendin
}

override fun minusAssign(reference: EventReference) {
this[reference]?.let {
this -= it
val connections = this[reference]

connections.forEach { connection ->
this -= connection
}
}

override fun get(reference: EventReference): WebSocketEventSendingConnection? {
override fun get(reference: EventReference): List<WebSocketEventSendingConnection> {
log.d {
"Finding connection in $connections by reference $reference"
"Filtering connections $connections by reference $reference"
}

val connection = connections.find {
val connections = connections.filter {
it.reference == reference
}

log.d {
"Found connection $connection. Wanted a connection with reference $reference"
"Found connections $connections. Wanted connections with reference $reference"
}

return connection
return connections
}
}

0 comments on commit d2d1dec

Please sign in to comment.