Skip to content

Commit

Permalink
Continued actual fixes for packet protocols
Browse files Browse the repository at this point in the history
*NOTE*: This is a breaking change to most protocols
  • Loading branch information
Monkopedia committed Sep 4, 2022
1 parent 0ef4097 commit ff7cbc9
Show file tree
Hide file tree
Showing 32 changed files with 535 additions and 274 deletions.
10 changes: 5 additions & 5 deletions ksrpc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ kotlin {
sourceSets["commonMain"].dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-serialization-core:1.3.3")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3-native-mt")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
implementation("org.jetbrains.kotlinx:atomicfu:0.17.1")
implementation("io.ktor:ktor-client-core:2.0.2")
implementation("io.ktor:ktor-client-websockets:2.0.2")
implementation("io.ktor:ktor-http:2.0.2")
implementation("io.ktor:ktor-serialization-kotlinx-json:2.0.2")
}
sourceSets["commonTest"].dependencies {
implementation(kotlin("test"))
Expand All @@ -88,7 +89,7 @@ kotlin {
}
sourceSets["jvmTest"].dependencies {
implementation(kotlin("test-junit"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3-native-mt")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
implementation("io.ktor:ktor-server-core:2.0.2")
implementation("io.ktor:ktor-server-netty:2.0.2")
implementation("io.ktor:ktor-serialization-jackson:2.0.2")
Expand All @@ -97,7 +98,7 @@ kotlin {
implementation("io.ktor:ktor-client-okhttp:2.0.2")
implementation("io.ktor:ktor-server-websockets:2.0.2")
implementation("io.ktor:ktor-client-websockets:2.0.2")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.3-native-mt")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4")
}
sourceSets["jsTest"].dependencies {
implementation(kotlin("test-js"))
Expand All @@ -109,7 +110,7 @@ kotlin {
}
sourceSets["nativeMain"].dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3-native-mt")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
implementation("io.ktor:ktor-client-curl:2.0.2")
}
}
Expand All @@ -128,7 +129,6 @@ kotlin.targets.withType(org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarge
}
}


val dokkaJavadoc = tasks.create("dokkaJavadocCustom", org.jetbrains.dokka.gradle.DokkaTask::class) {
dependencies {
plugins("org.jetbrains.dokka:kotlin-as-java-plugin:1.4.10.2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.monkopedia.ksrpc.internal
package com.monkopedia.ksrpc

import io.ktor.websocket.Frame
import io.ktor.websocket.Frame.Text
import io.ktor.websocket.readText
internal expect fun epochMillis(): Long
12 changes: 5 additions & 7 deletions ksrpc/src/commonMain/kotlin/KsrpcEnvironment.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -28,7 +28,6 @@ interface KsrpcEnvironment {
val serialization: StringFormat
val defaultScope: CoroutineScope
val errorListener: ErrorListener
val maxParallelReceives: Int
val coroutineExceptionHandler: CoroutineExceptionHandler

interface Element {
Expand Down Expand Up @@ -64,12 +63,11 @@ fun ksrpcEnvironment(builder: KsrpcEnvironmentBuilder.() -> Unit): KsrpcEnvironm
data class KsrpcEnvironmentBuilder internal constructor(
override var serialization: StringFormat = Json,
override var defaultScope: CoroutineScope = GlobalScope,
override var errorListener: ErrorListener = ErrorListener { },
override var maxParallelReceives: Int = 5
override var errorListener: ErrorListener = ErrorListener { }
) : KsrpcEnvironment {
override val coroutineExceptionHandler: CoroutineExceptionHandler by lazy {
CoroutineExceptionHandler { _, throwable ->
errorListener.onError(throwable)
}
}
}
}
6 changes: 3 additions & 3 deletions ksrpc/src/commonMain/kotlin/channels/HttpChannels.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
8 changes: 4 additions & 4 deletions ksrpc/src/commonMain/kotlin/channels/JsonRpcChannels.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,8 +21,8 @@ import com.monkopedia.ksrpc.internal.jsonrpc.jsonHeader
import com.monkopedia.ksrpc.internal.jsonrpc.jsonLine
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope

suspend fun Pair<ByteReadChannel, ByteWriteChannel>.asJsonRpcConnection(
env: KsrpcEnvironment,
Expand Down
8 changes: 4 additions & 4 deletions ksrpc/src/commonMain/kotlin/channels/ReadWriteChannels.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,8 +19,8 @@ import com.monkopedia.ksrpc.KsrpcEnvironment
import com.monkopedia.ksrpc.internal.ReadWritePacketChannel
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope

internal const val CONTENT_LENGTH = "Content-Length"
internal const val CONTENT_TYPE = "Content-Type"
Expand Down
15 changes: 10 additions & 5 deletions ksrpc/src/commonMain/kotlin/internal/HostSerializedServiceImpl.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -30,9 +30,9 @@ import com.monkopedia.ksrpc.channels.Connection
import com.monkopedia.ksrpc.channels.SerializedChannel
import com.monkopedia.ksrpc.channels.SerializedService
import com.monkopedia.ksrpc.channels.randomUuid
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

internal class HostSerializedChannelImpl(
override val env: KsrpcEnvironment,
Expand All @@ -55,7 +55,12 @@ internal class HostSerializedChannelImpl(
serviceMap[channelId.id] ?: error("Cannot find service ${channelId.id}")
}
withContext(context) {
channel.call(endpoint, data)
if (endpoint.isEmpty()) {
close(channelId)
CallData.create("{}")
} else {
channel.call(endpoint, data)
}
}
} catch (t: Throwable) {
env.errorListener.onError(t)
Expand Down
17 changes: 10 additions & 7 deletions ksrpc/src/commonMain/kotlin/internal/HttpSerializedChannel.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,6 +19,7 @@ import com.monkopedia.ksrpc.ERROR_PREFIX
import com.monkopedia.ksrpc.KSRPC_BINARY
import com.monkopedia.ksrpc.KSRPC_CHANNEL
import com.monkopedia.ksrpc.KsrpcEnvironment
import com.monkopedia.ksrpc.RpcEndpointException
import com.monkopedia.ksrpc.RpcFailure
import com.monkopedia.ksrpc.channels.CallData
import com.monkopedia.ksrpc.channels.ChannelClient
Expand All @@ -35,8 +36,8 @@ import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.http.encodeURLPath
import io.ktor.utils.io.ByteReadChannel
import kotlinx.serialization.json.Json
import kotlin.coroutines.CoroutineContext
import kotlinx.serialization.json.Json

internal class HttpSerializedChannel(
private val httpClient: HttpClient,
Expand All @@ -48,14 +49,14 @@ internal class HttpSerializedChannel(
override val context: CoroutineContext =
ClientChannelContext(this) + env.coroutineExceptionHandler

override suspend fun call(channelId: ChannelId, endpoint: String, input: CallData): CallData {
override suspend fun call(channelId: ChannelId, endpoint: String, data: CallData): CallData {
val response = httpClient.post(
"$baseStripped/call/${endpoint.encodeURLPath()}"
) {
accept(ContentType.Application.Json)
headers[KSRPC_BINARY] = input.isBinary.toString()
headers[KSRPC_BINARY] = data.isBinary.toString()
headers[KSRPC_CHANNEL] = channelId.id
setBody(if (input.isBinary) input.readBinary() else input.readSerialized())
setBody(if (data.isBinary) data.readBinary() else data.readSerialized())
}
response.checkErrors()
if (response.headers[KSRPC_BINARY]?.toBoolean() == true) {
Expand Down Expand Up @@ -92,5 +93,7 @@ internal suspend fun HttpResponse.checkErrors() {
} else {
throw IllegalStateException("Can't parse error $this")
}
} else if (status == HttpStatusCode.NotFound) {
throw RpcEndpointException("Url not found $this")
}
}
84 changes: 84 additions & 0 deletions ksrpc/src/commonMain/kotlin/internal/MultiChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2021 Jason Monk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.monkopedia.ksrpc.internal

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.sync.Mutex

internal class MultiChannel<T> {

private var isClosed: Boolean = false
private val lock = Mutex()
private val pending = mutableListOf<Pair<String, CompletableDeferred<T>>>()
private var id = 1

private fun checkClosed() {
require(!isClosed) {
"$this has already been closed"
}
}

suspend fun send(id: String, response: T) {
checkClosed()
lock.lock()
try {
val hasPending = pending.consume(matcher = { it.first == id }) { (_, pendingItem) ->
pendingItem.complete(response)
}
if (!hasPending) {
error("No pending receiver for $id and $response")
}
} finally {
lock.unlock()
}
}

suspend fun allocateReceive(): Pair<Int, Deferred<T>> {
checkClosed()
lock.lock()
try {
val id = this.id++
val completable = CompletableDeferred<T>()
pending.add(id.toString() to completable)
return id to completable
} finally {
lock.unlock()
}
}

suspend fun close(t: CancellationException? = null) {
lock.lock()
isClosed = true
pending.forEach {
it.second.completeExceptionally(t ?: CancellationException("Closing MultiChannel"))
}
lock.unlock()
}
}

internal inline fun <T> MutableList<T>.consume(
crossinline matcher: (T) -> Boolean,
crossinline consumer: (T) -> Unit
): Boolean {
return removeAll {
if (matcher(it)) {
consumer(it)
true
} else false
}
}
6 changes: 3 additions & 3 deletions ksrpc/src/commonMain/kotlin/internal/Packet.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2021 Jason Monk
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Loading

0 comments on commit ff7cbc9

Please sign in to comment.