Skip to content

Commit

Permalink
Split annotation into separate project, add logging API
Browse files Browse the repository at this point in the history
  • Loading branch information
Monkopedia committed Mar 28, 2024
1 parent 362f91a commit 63600fa
Show file tree
Hide file tree
Showing 23 changed files with 167 additions and 106 deletions.
7 changes: 5 additions & 2 deletions compiler/local-plugin/src/main/kotlin/GenerateKsrpcProject.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ fun Project.ksrpcModule(
plugins.apply("org.jetbrains.kotlin.plugin.serialization")

plugins.apply("org.jetbrains.dokka")
plugins.apply("com.monkopedia.ksrpc.plugin")
if (name != "ksrpc-annotation") {
plugins.apply("com.monkopedia.ksrpc.plugin")
}
if (includePublications) {
plugins.apply("org.gradle.maven-publish")
plugins.apply("org.gradle.signing")
Expand Down Expand Up @@ -146,8 +148,9 @@ fun Project.ksrpcModule(
}
applyDefaultHierarchyTemplate()
sourceSets["commonMain"].dependencies {
if (name != "ksrpc-core") {
if (name != "ksrpc-core" && name != "ksrpc-annotation") {
api(project(":ksrpc-core"))
api(project(":ksrpc-annotation"))
}
}
sourceSets["commonTest"].dependencies {
Expand Down
31 changes: 31 additions & 0 deletions ksrpc-annotation/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 Jason Monk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.monkopedia.ksrpc.local.ksrpcModule

plugins {
kotlin("multiplatform")
}

ksrpcModule()

kotlin {
sourceSets["commonMain"].dependencies {
}
sourceSets["jvmMain"].dependencies {
}
sourceSets["jsMain"].dependencies {
}
}
1 change: 1 addition & 0 deletions ksrpc-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ kotlin {
api(libs.kotlinx.coroutines)
api(libs.kotlinx.atomicfu)
api(kotlin("stdlib"))
api(project(":ksrpc-annotation"))
}
sourceSets["jvmMain"].dependencies {
implementation(libs.jnanoid)
Expand Down
6 changes: 4 additions & 2 deletions ksrpc-core/src/commonMain/kotlin/KsrpcEnvironment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ interface KsrpcEnvironment<T> {
val serialization: CallDataSerializer<T>
val defaultScope: CoroutineScope
val errorListener: ErrorListener
val logger: Logger
val coroutineExceptionHandler: CoroutineExceptionHandler

interface Element<T> {
Expand All @@ -53,7 +54,7 @@ fun <T> KsrpcEnvironment<T>.reconfigure(
builder: KsrpcEnvironmentBuilder<T>.() -> Unit
): KsrpcEnvironment<T> {
val b = (this as? KsrpcEnvironmentBuilder)?.copy()
?: KsrpcEnvironmentBuilder(serialization, defaultScope, errorListener)
?: KsrpcEnvironmentBuilder(serialization, defaultScope, logger, errorListener)
b.builder()
return b
}
Expand All @@ -63,7 +64,7 @@ fun <T> KsrpcEnvironment<T>.reconfigure(
*/
fun <T> KsrpcEnvironment<T>.onError(listener: ErrorListener): KsrpcEnvironment<T> {
val b = (this as? KsrpcEnvironmentBuilder)?.copy()
?: KsrpcEnvironmentBuilder(serialization, defaultScope, errorListener)
?: KsrpcEnvironmentBuilder(serialization, defaultScope, logger, errorListener)
b.errorListener = listener
return b
}
Expand Down Expand Up @@ -109,6 +110,7 @@ private class StringSerializer(val stringFormat: StringFormat = Json) : CallData
data class KsrpcEnvironmentBuilder<T> internal constructor(
override var serialization: CallDataSerializer<T>,
override var defaultScope: CoroutineScope = GlobalScope,
override var logger: Logger = object : Logger {},
override var errorListener: ErrorListener = ErrorListener { }
) : KsrpcEnvironment<T> {
override val coroutineExceptionHandler: CoroutineExceptionHandler by lazy {
Expand Down
8 changes: 8 additions & 0 deletions ksrpc-core/src/commonMain/kotlin/Logger.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.monkopedia.ksrpc

interface Logger {
fun debug(tag: String, message: String, throwable: Throwable? = null) = Unit
fun info(tag: String, message: String, throwable: Throwable? = null) = Unit
fun warn(tag: String, message: String, throwable: Throwable? = null) = Unit
fun error(tag: String, message: String, throwable: Throwable? = null) = Unit
}
17 changes: 16 additions & 1 deletion ksrpc-core/src/commonMain/kotlin/RpcMethod.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.monkopedia.ksrpc
import com.monkopedia.ksrpc.channels.CallData
import com.monkopedia.ksrpc.channels.ChannelId
import com.monkopedia.ksrpc.channels.SerializedService
import com.monkopedia.ksrpc.channels.randomUuid
import com.monkopedia.ksrpc.channels.registerHost
import com.monkopedia.ksrpc.internal.client
import com.monkopedia.ksrpc.internal.host
Expand All @@ -35,7 +36,9 @@ internal sealed interface Transformer<T> {

fun <S> unpackError(data: CallData<S>, channel: SerializedService<S>) {
if (!data.isBinary && channel.env.serialization.isError(data)) {
throw channel.env.serialization.decodeErrorCallData(data)
throw channel.env.serialization.decodeErrorCallData(data).also {
channel.env.logger.info("Transformer", "Decoding Throwable form CallData", it)
}
}
}
}
Expand All @@ -45,11 +48,13 @@ internal class SerializerTransformer<I>(private val serializer: KSerializer<I>)
get() = serializer != Unit.serializer()

override suspend fun <T> transform(input: I, channel: SerializedService<T>): CallData<T> {
channel.env.logger.debug("Transformer", "Serializing input to CallData")
return channel.env.serialization.createCallData(serializer, input)
}

override suspend fun <T> untransform(data: CallData<T>, channel: SerializedService<T>): I {
unpackError(data, channel)
channel.env.logger.debug("Transformer", "Deserializing CallData to type")
return channel.env.serialization.decodeCallData(serializer, data)
}
}
Expand All @@ -59,6 +64,7 @@ internal object BinaryTransformer : Transformer<ByteReadChannel> {
input: ByteReadChannel,
channel: SerializedService<T>
): CallData<T> {
channel.env.logger.debug("Transformer", "Serializing ByteReadChannel to CallData")
return CallData.createBinary(input)
}

Expand All @@ -67,6 +73,7 @@ internal object BinaryTransformer : Transformer<ByteReadChannel> {
channel: SerializedService<T>
): ByteReadChannel {
unpackError(data, channel)
channel.env.logger.debug("Transformer", "Deserializing ByteReadChannel to CallData")
return data.readBinary()
}
}
Expand All @@ -77,13 +84,15 @@ internal class SubserviceTransformer<T : RpcService>(
override suspend fun <S> transform(input: T, channel: SerializedService<S>): CallData<S> {
val host = host<S>() ?: error("Cannot transform service type to non-hosting channel")
val serviceId = host.registerHost(input, serviceObj)
channel.env.logger.info("Transformer", "Serializing Service to CallData(${serviceId.id})")
return channel.env.serialization.createCallData(String.serializer(), serviceId.id)
}

override suspend fun <S> untransform(data: CallData<S>, channel: SerializedService<S>): T {
val client = client<S>() ?: error("Cannot untransform service type from non-client channel")
unpackError(data, channel)
val serviceId = channel.env.serialization.decodeCallData(String.serializer(), data)
channel.env.logger.info("Transformer", "Deserializing CallData(${serviceId}) to Stub")
return serviceObj.createStub(client.wrapChannel(ChannelId(serviceId)))
}
}
Expand Down Expand Up @@ -113,7 +122,10 @@ class RpcMethod<T : RpcService, I, O> internal constructor(
): CallData<S> {
return withContext(channel.context) {
val transformedInput = inputTransform.untransform(input, channel)
val id = randomUuid()
channel.env.logger.info("Transformer", "($id) Calling endpoint $endpoint")
val output = method.invoke(service as T, transformedInput)
channel.env.logger.debug("Transformer", "($id) Completed endpoint $endpoint")
outputTransform.transform(output as O, channel)
}
}
Expand All @@ -122,7 +134,10 @@ class RpcMethod<T : RpcService, I, O> internal constructor(
internal suspend fun <S> callChannel(channel: SerializedService<S>, input: Any?): Any? {
return withContext(channel.context) {
val input = inputTransform.transform(input as I, channel)
val id = randomUuid()
channel.env.logger.info("Transformer", "($id) Calling remote endpoint $endpoint")
val transformedOutput = channel.call(this@RpcMethod, input)
channel.env.logger.debug("Transformer", "($id) Completed remote endpoint $endpoint")
outputTransform.untransform(transformedOutput, channel)
}
}
Expand Down
1 change: 0 additions & 1 deletion ksrpc-core/src/commonMain/kotlin/RpcService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.monkopedia.ksrpc

import com.monkopedia.ksrpc.annotation.KsService
import com.monkopedia.ksrpc.channels.SerializedService
import com.monkopedia.ksrpc.internal.HostSerializedServiceImpl

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class HostSerializedChannelImpl<T>(
}
}
} catch (t: Throwable) {
env.logger.info("SerializedChannel", "Exception thrown during dispatching", t)
env.errorListener.onError(t)
env.serialization.createErrorCallData(
RpcFailure.serializer(),
Expand All @@ -76,13 +77,15 @@ class HostSerializedChannelImpl<T>(
}

override suspend fun close(id: ChannelId) {
env.logger.debug("SerializedChannel", "Closing channel ${id.id}")
serviceMap.remove(id.id)?.let {
it.trackingService?.onSerializationClosed(it)
it.close()
}
}

override suspend fun close() {
env.logger.debug("SerializedChannel", "Closing entire channel")
serviceMap.values.forEach {
it.trackingService?.onSerializationClosed(it)
it.close()
Expand All @@ -102,12 +105,14 @@ class HostSerializedChannelImpl<T>(

override suspend fun registerHost(service: SerializedService<T>): ChannelId {
val serviceId = ChannelId(randomUuid())
env.logger.debug("SerializedChannel", "Registered host service ${serviceId.id}")
serviceMap[serviceId.id] = service
service.trackingService?.onSerializationCreated(service)
return serviceId
}

override suspend fun wrapChannel(channelId: ChannelId): SerializedService<T> {
env.logger.debug("SerializedChannel", "Wrapping (unmapping) local channel ${channelId.id}")
return serviceMap[channelId.id] ?: error("Unknown service ${channelId.id}")
}

Expand All @@ -118,6 +123,7 @@ class HostSerializedChannelImpl<T>(
val <T> SerializedChannel<T>.asClient: ChannelClient<T>
get() = object : ChannelClient<T>, SerializedChannel<T> by this {
override suspend fun wrapChannel(channelId: ChannelId): SerializedService<T> {
env.logger.debug("SerializedChannel", "Wrapping channel ${channelId.id}")
return SubserviceChannel(this, channelId)
}
}
Expand Down
18 changes: 7 additions & 11 deletions ksrpc-core/src/commonMain/kotlin/internal/MultiChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.monkopedia.ksrpc.internal

import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
Expand All @@ -26,7 +27,7 @@ class MultiChannel<T> {
private var closeException: Throwable? = null
private val lock = Mutex()
private val pending = mutableListOf<Pair<String, CompletableDeferred<T>>>()
private var id = 1
private val id = atomic(1)

private fun checkClosed() {
if (isClosed) {
Expand All @@ -49,17 +50,12 @@ class MultiChannel<T> {
}
}

suspend fun allocateReceive(): Pair<Int, Deferred<T>> {
fun allocateReceive(): Pair<Int, Deferred<T>> {
checkClosed()
lock.lock()
try {
val id = this.id++
val completable = CompletableDeferred<T>()
pending.add(id.toString() to completable)
return id to completable
} finally {
lock.unlock()
}
val id = this.id.getAndIncrement()
val completable = CompletableDeferred<T>()
pending.add(id.toString() to completable)
return id to completable
}

suspend fun close(t: CancellationException? = null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class SubserviceChannel<T>(
get() = baseChannel.context

override suspend fun call(endpoint: String, input: CallData<T>): CallData<T> {
env.logger.debug("SerializeService", "Calling from subservice ${serviceId.id}")
return baseChannel.call(serviceId, endpoint, input)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class JniConnection(
private val nativeEnvironment: Long
) : PacketChannelBase<JniSerialized>(scope, env) {
private val receiveChannel = Channel<Packet<JniSerialized>>()
private val sendLock = Mutex()
private val receiveLock = Mutex()
private val nativeConnection = createConnection(scope.asNativeScope, nativeEnvironment)

constructor(
Expand All @@ -48,32 +46,22 @@ class JniConnection(
finalize(nativeConnection, nativeEnvironment)
}

override suspend fun send(packet: Packet<JniSerialized>) {
sendLock.lock()
try {
val serialized = env.serialization.createCallData(
Packet.serializer(JniSerialized),
packet
override suspend fun sendLocked(packet: Packet<JniSerialized>) {
val serialized = env.serialization.createCallData(
Packet.serializer(JniSerialized),
packet
)
suspendCoroutine<Int> {
sendSerialized(
nativeConnection,
serialized.readSerialized(),
it.withConverter(newTypeConverter<Any?>().int)
)
suspendCoroutine<Int> {
sendSerialized(
nativeConnection,
serialized.readSerialized(),
it.withConverter(newTypeConverter<Any?>().int)
)
}
} finally {
sendLock.unlock()
}
}

override suspend fun receive(): Packet<JniSerialized> {
receiveLock.lock()
try {
return receiveChannel.receive()
} finally {
receiveLock.unlock()
}
override suspend fun receiveLocked(): Packet<JniSerialized> {
return receiveChannel.receive()
}

fun sendFromNative(packet: JniSerialized, continuation: NativeJniContinuation<Int>) {
Expand Down
Loading

0 comments on commit 63600fa

Please sign in to comment.