Skip to content

Commit

Permalink
Add initial support for jsonrpc 2.0
Browse files Browse the repository at this point in the history
This separates out an API for channels that can handle subservices (like
http, sockets, etc.) and those that can't (like jsonrpc). Bump the minor
version of this so it can be pushed out.

Squashed commit of the following:

commit 8b80111
Author: Jason Monk <jmonk@dream-crushed.com>
Date:   Wed Feb 16 20:12:51 2022 -0500

    Fix last couple API tweaks and run auto-formatter

    Also bump the version

commit 931fa02
Author: Jason Monk <jmonk@dream-crushed.com>
Date:   Mon Feb 14 20:35:23 2022 -0500

    Allow bidirectional jsonrpc

    Since LSP operates this way, assuming the protocol generally supports
    it.

commit 6e06e1f
Author: Jason Monk <jmonk@dream-crushed.com>
Date:   Sat Feb 12 10:52:19 2022 -0500

    Working json rpc implementation with all the tests passing

    Need to do some e2e tests to verify before pushing to main

commit 796f5ce
Author: Jason Monk <monk@fb.com>
Date:   Mon Jan 10 10:03:58 2022 -0500

    WIP Stuff

commit 98ebd77
Author: Jason Monk <monk@fb.com>
Date:   Mon Jan 10 10:03:58 2022 -0500

    WIP Stuff
  • Loading branch information
Monkopedia committed Feb 17, 2022
1 parent 15def2f commit bdc5bbf
Show file tree
Hide file tree
Showing 30 changed files with 1,377 additions and 183 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
build/
local.properties
**/build
**/*.swp

# Eclipse
.classpath
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=0.5.0
version=0.5.1
plugin_version=0.5.0
signing.gnupg.keyName=5B83421E2338B907
14 changes: 12 additions & 2 deletions ksrpc/src/commonMain/kotlin/RpcMethod.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import kotlinx.serialization.StringFormat
import kotlinx.serialization.builtins.serializer

internal sealed interface Transformer<T> {
val hasContent: Boolean
get() = true

suspend fun transform(input: T, channel: SerializedService): CallData
suspend fun untransform(data: CallData, channel: SerializedService): T

Expand All @@ -44,6 +47,9 @@ internal sealed interface Transformer<T> {
}

internal class SerializerTransformer<I>(private val serializer: KSerializer<I>) : Transformer<I> {
override val hasContent: Boolean
get() = serializer != Unit.serializer()

override suspend fun transform(input: I, channel: SerializedService): CallData {
return CallData.create(channel.env.serialization.encodeToString(serializer, input))
}
Expand Down Expand Up @@ -95,11 +101,15 @@ internal interface ServiceExecutor {
* A wrapper around calling into or from stubs/serialization.
*/
class RpcMethod<T : RpcService, I, O> internal constructor(
private val endpoint: String,
val endpoint: String,
private val inputTransform: Transformer<I>,
private val outputTransform: Transformer<O>,
private val method: ServiceExecutor
) {

internal val hasReturnType: Boolean
get() = outputTransform.hasContent

internal suspend fun call(
channel: SerializedService,
service: RpcService,
Expand All @@ -115,7 +125,7 @@ class RpcMethod<T : RpcService, I, O> internal constructor(
internal suspend fun callChannel(channel: SerializedService, input: Any?): Any? {
return withContext(channel.context) {
val input = inputTransform.transform(input as I, channel)
val transformedOutput = channel.call(endpoint, input)
val transformedOutput = channel.call(this@RpcMethod, input)
outputTransform.untransform(transformedOutput, channel)
}
}
Expand Down
32 changes: 25 additions & 7 deletions ksrpc/src/commonMain/kotlin/channels/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package com.monkopedia.ksrpc.channels
import com.monkopedia.ksrpc.RpcService
import com.monkopedia.ksrpc.serialized
import com.monkopedia.ksrpc.toStub
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.jvm.JvmName

internal interface SuspendInit {
Expand All @@ -29,7 +32,7 @@ internal interface SuspendInit {
*
* (Meaning @KsServices can be used for both input and output of any @KsMethod)
*/
interface Connection : ChannelHost, ChannelClient
interface Connection : ChannelHost, ChannelClient, SingleChannelConnection

internal interface ConnectionInternal :
Connection,
Expand All @@ -40,6 +43,11 @@ internal interface ConnectionInternal :

internal interface ConnectionProvider : ChannelHostProvider, ChannelClientProvider

/**
* A bidirectional channel that can host one service in each direction (1 host and 1 client).
*/
interface SingleChannelConnection : SingleChannelHost, SingleChannelClient

// Problems with JS compiler and serialization
data class ChannelId(val id: String)

Expand All @@ -55,19 +63,29 @@ internal expect interface VoidService : RpcService
* This is equivalent to calling [registerDefault] for [T] instance and using
* [defaultChannel] and [toStub] to create [R].
*/
suspend inline fun <reified T : RpcService, reified R : RpcService> Connection.connect(
crossinline host: (R) -> T
) = connect { channel ->
host(channel.toStub()).serialized(env)
@OptIn(ExperimentalContracts::class)
suspend inline fun <reified T : RpcService, reified R : RpcService> SingleChannelConnection.connect(
crossinline host: suspend (R) -> T
) {
contract {
callsInPlace(host, InvocationKind.EXACTLY_ONCE)
}
connect { channel ->
host(channel.toStub()).serialized(env)
}
}

/**
* Raw version of [connect], performing the same functionality with [SerializedService] directly.
*/
@JvmName("connectSerialized")
suspend fun Connection.connect(
host: (SerializedService) -> SerializedService
@OptIn(ExperimentalContracts::class)
suspend fun SingleChannelConnection.connect(
host: suspend (SerializedService) -> SerializedService
) {
contract {
callsInPlace(host, InvocationKind.EXACTLY_ONCE)
}
val recv = defaultChannel()
val serializedHost = host(recv)
registerDefault(serializedHost)
Expand Down
4 changes: 2 additions & 2 deletions ksrpc/src/commonMain/kotlin/channels/HttpChannels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ suspend fun HttpClient.asWebsocketConnection(baseUrl: String, env: KsrpcEnvironm
url.takeFrom(baseUrl.trimEnd('/'))
url.protocol = URLProtocol.WS
}
return threadSafe { context ->
return threadSafe<Connection> { context ->
WebsocketPacketChannel(
CoroutineScope(context),
context,
session,
env
)
}.also {
it.init()
(it as SuspendInit).init()
}
}
41 changes: 41 additions & 0 deletions ksrpc/src/commonMain/kotlin/channels/JsonRpcChannels.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Jason Monk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.monkopedia.ksrpc.channels

import com.monkopedia.ksrpc.KsrpcEnvironment
import com.monkopedia.ksrpc.internal.ThreadSafeManager.threadSafe
import com.monkopedia.ksrpc.internal.jsonrpc.JsonRpcWriterBase
import com.monkopedia.ksrpc.internal.jsonrpc.jsonHeader
import com.monkopedia.ksrpc.internal.jsonrpc.jsonLine
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import kotlinx.coroutines.CoroutineScope

suspend fun Pair<ByteReadChannel, ByteWriteChannel>.asJsonRpcConnection(
env: KsrpcEnvironment,
includeContentHeaders: Boolean = true
): SingleChannelConnection {
return threadSafe<SingleChannelConnection> { context ->
JsonRpcWriterBase(
CoroutineScope(context),
context,
env,
if (includeContentHeaders) jsonHeader(env) else jsonLine(env)
)
}.also {
(it as? SuspendInit)?.init()
}
}
2 changes: 2 additions & 0 deletions ksrpc/src/commonMain/kotlin/channels/ReadWriteChannels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import io.ktor.utils.io.ByteWriteChannel
import kotlinx.coroutines.CoroutineScope

internal const val CONTENT_LENGTH = "Content-Length"
internal const val CONTENT_TYPE = "Content-Type"
internal const val DEFAULT_CONTENT_TYPE = "application/vscode-jsonrpc; charset=utf-8"
internal const val METHOD = "Method"
internal const val INPUT = "Input"
internal const val TYPE = "Type"
Expand Down
55 changes: 49 additions & 6 deletions ksrpc/src/commonMain/kotlin/channels/SerializedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package com.monkopedia.ksrpc.channels

import com.monkopedia.ksrpc.KsrpcEnvironment
import com.monkopedia.ksrpc.RpcMethod
import com.monkopedia.ksrpc.RpcObject
import com.monkopedia.ksrpc.RpcService
import com.monkopedia.ksrpc.SuspendCloseableObservable
import com.monkopedia.ksrpc.annotation.KsMethod
import com.monkopedia.ksrpc.channels.ChannelClient.Companion.DEFAULT
import com.monkopedia.ksrpc.internal.HostSerializedServiceImpl
import com.monkopedia.ksrpc.rpcObject
import io.ktor.utils.io.ByteReadChannel
Expand All @@ -37,7 +40,7 @@ suspend inline fun <reified T : RpcService> ChannelHost.registerHost(
/**
* Register a service to be hosted on the default channel.
*/
suspend inline fun <reified T : RpcService> ChannelHost.registerDefault(
suspend inline fun <reified T : RpcService> SingleChannelHost.registerDefault(
service: T
) = registerDefault(service, rpcObject())

Expand All @@ -56,7 +59,7 @@ suspend fun <T : RpcService> ChannelHost.registerHost(
/**
* Register a service to be hosted on the default channel.
*/
suspend fun <T : RpcService> ChannelHost.registerDefault(
suspend fun <T : RpcService> SingleChannelHost.registerDefault(
service: T,
obj: RpcObject<T>
) {
Expand All @@ -67,15 +70,35 @@ internal interface ChannelHostProvider {
val host: ChannelHost?
}

/**
* A wrapper around a communication pathway that can be turned into a primary
* SerializedService.
*/
interface SingleChannelHost : KsrpcEnvironment.Element {
/**
* Register the primary service to be hosted on this communication channel.
*
* The coroutine context and dispatcher on which calls are executed in on depends
* on the construction of the host.
*/
suspend fun registerDefault(service: SerializedService)
}

/**
* A [SerializedChannel] that can host sub-services.
*
* This could be a bidirectional conduit like a [Connection], or it could be a hosting only
* service such as http hosting.
*/
interface ChannelHost : SerializedChannel, KsrpcEnvironment.Element {
interface ChannelHost : SerializedChannel, SingleChannelHost, KsrpcEnvironment.Element {
/**
* Add a serialized service that can receive calls on this channel with the returned
* [ChannelId]. The calls will be allowed until [close] is called.
*
* Generally this shouldn't need to be called directly, as services returned from
* [KsMethod]s are automatically registered and translated across a channel.
*/
suspend fun registerHost(service: SerializedService): ChannelId
suspend fun registerDefault(service: SerializedService)
}

internal interface ChannelHostInternal : ChannelHost, ChannelHostProvider {
Expand All @@ -87,20 +110,38 @@ internal interface ChannelClientProvider {
val client: ChannelClient?
}

/**
* A wrapper around a communication pathway that can be turned into a primary
* SerializedService.
*/
interface SingleChannelClient {

/**
* Get a [SerializedService] that is the default on this client
*/
suspend fun defaultChannel(): SerializedService
}

/**
* A [SerializedChannel] that can call into sub-services.
*
* This could be a bidirectional conduit like a [Connection], or it could be a client only
* service such as http client.
*/
interface ChannelClient : SerializedChannel, KsrpcEnvironment.Element {
interface ChannelClient : SerializedChannel, SingleChannelClient, KsrpcEnvironment.Element {
/**
* Takes a given channel id and creates a service wrapper to make calls on that channel.
*
* Generally this shouldn't be called directly, as services returned from [KsMethod]s
* will automatically be wrapped before being returned from stubs.
*/
suspend fun wrapChannel(channelId: ChannelId): SerializedService

/**
* Get a [SerializedService] that is the default on this client
* (i.e. using [DEFAULT] channel id). This should act as the root service for most scenarios.
*/
suspend fun defaultChannel() = wrapChannel(ChannelId(DEFAULT))
override suspend fun defaultChannel() = wrapChannel(ChannelId(DEFAULT))

companion object {
/**
Expand Down Expand Up @@ -146,6 +187,8 @@ interface SerializedService :
ContextContainer,
KsrpcEnvironment.Element {
suspend fun call(endpoint: String, input: CallData): CallData
suspend fun call(endpoint: RpcMethod<*, *, *>, input: CallData): CallData =
call(endpoint.endpoint, input)
}

internal expect fun randomUuid(): String
Expand Down
16 changes: 16 additions & 0 deletions ksrpc/src/commonMain/kotlin/channels/SingleServiceChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2021 Jason Monk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.monkopedia.ksrpc.channels
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ internal class ReadWritePacketChannel(
}
}

private suspend fun ByteWriteChannel.appendLine(s: String = "") = writeStringUtf8("$s\n")
internal suspend fun ByteWriteChannel.appendLine(s: String = "") = writeStringUtf8("$s\n")

@OptIn(InternalAPI::class)
private suspend fun ByteWriteChannel.send(
Expand Down Expand Up @@ -113,7 +113,7 @@ private suspend fun ByteReadChannel.readContent(
}
}

private suspend fun ByteReadChannel.readFields(): Map<String, String> {
internal suspend fun ByteReadChannel.readFields(): Map<String, String> {
val fields = mutableListOf<String>()
var line = readUTF8Line()
while (line == null || line.isNotEmpty()) {
Expand Down
5 changes: 4 additions & 1 deletion ksrpc/src/commonMain/kotlin/internal/ThreadUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.monkopedia.ksrpc.internal

import com.monkopedia.ksrpc.KsrpcEnvironment
import com.monkopedia.ksrpc.channels.ChannelClientInternal
import com.monkopedia.ksrpc.channels.ChannelClientProvider
import com.monkopedia.ksrpc.channels.ChannelHostInternal
Expand All @@ -37,7 +38,9 @@ internal interface ThreadSafeKeyedHost : ThreadSafeKeyed, ChannelHostInternal
internal expect object ThreadSafeManager {
inline fun createKey(): Any
inline fun <reified T : Any> T.threadSafe(): T
inline fun <reified T : Any> threadSafe(creator: (CoroutineContext) -> T): T
inline fun <reified T : KsrpcEnvironment.Element> threadSafe(
creator: (CoroutineContext) -> T
): T

inline fun ThreadSafeKeyedConnection.threadSafeProvider(): ConnectionProvider
inline fun ThreadSafeKeyedClient.threadSafeProvider(): ChannelClientProvider
Expand Down
Loading

0 comments on commit bdc5bbf

Please sign in to comment.