diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 383dc3e5..787103c1 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -14,17 +14,17 @@ object Versions { } const val java = 17 - const val slf4j = "2.0.9" - const val confluent = "7.5.0" - const val kafka = "7.5.0-ce" + const val slf4j = "2.0.13" + const val confluent = "7.6.0" + const val kafka = "${confluent}-ce" const val avro = "1.11.3" - const val jackson = "2.15.2" - const val okhttp = "4.11.0" + const val jackson = "2.15.3" + const val okhttp = "4.12.0" const val junit = "5.10.0" const val mockito = "5.5.0" const val mockitoKotlin = "5.1.0" const val hamcrest = "2.2" - const val radarSchemas = "0.8.4" + const val radarSchemas = "0.8.8" const val opencsv = "5.8" const val ktor = "2.3.4" const val coroutines = "1.7.3" diff --git a/radar-commons-gradle/build.gradle.kts b/radar-commons-gradle/build.gradle.kts index 909d919e..9e8ace5c 100644 --- a/radar-commons-gradle/build.gradle.kts +++ b/radar-commons-gradle/build.gradle.kts @@ -190,17 +190,17 @@ object Versions { } const val java = 17 - const val slf4j = "2.0.9" - const val confluent = "7.5.0" - const val kafka = "7.5.0-ce" + const val slf4j = "2.0.13" + const val confluent = "7.6.0" + const val kafka = "${confluent}-ce" const val avro = "1.11.3" - const val jackson = "2.15.2" - const val okhttp = "4.11.0" - const val junit = "5.10.0" + const val jackson = "2.15.3" + const val okhttp = "4.12.0" + const val junit = "5.10.3" const val mockito = "5.5.0" const val mockitoKotlin = "5.1.0" const val hamcrest = "2.2" - const val radarSchemas = "0.8.4" + const val radarSchemas = "0.8.8" const val opencsv = "5.8" const val ktor = "2.3.4" const val coroutines = "1.7.3" @@ -210,4 +210,3 @@ object Versions { const val gradleVersionsPlugin = "0.50.0" const val ktlint = "12.0.3" } - diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt index 59fa1d11..81b6b9b8 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt @@ -2,13 +2,11 @@ package org.radarbase.producer.schema import io.ktor.client.HttpClient import io.ktor.client.call.body +import io.ktor.client.plugins.auth.* +import io.ktor.client.plugins.auth.providers.* import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.defaultRequest -import io.ktor.client.request.HttpRequestBuilder -import io.ktor.client.request.accept -import io.ktor.client.request.request -import io.ktor.client.request.setBody -import io.ktor.client.request.url +import io.ktor.client.request.* import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.contentType @@ -21,13 +19,15 @@ import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json import org.apache.avro.Schema import org.radarbase.producer.rest.RestException.Companion.toRestException +import org.slf4j.LoggerFactory import java.io.IOException +import java.net.URI import kotlin.coroutines.CoroutineContext /** REST client for Confluent schema registry. */ class SchemaRestClient( httpClient: HttpClient, - baseUrl: String, + private val baseUrl: String, private val ioContext: CoroutineContext = Dispatchers.IO, ) { private val httpClient: HttpClient = httpClient.config { @@ -39,10 +39,6 @@ class SchemaRestClient( }, ) } - defaultRequest { - url(baseUrl) - accept(ContentType.Application.Json) - } } suspend inline fun request( @@ -88,7 +84,7 @@ class SchemaRestClient( @Throws(IOException::class) suspend fun schemaGet(path: String): SchemaMetadata = request { method = HttpMethod.Get - url(path) + url(URI(baseUrl).resolve(path).toString()) } @Throws(IOException::class) @@ -97,7 +93,7 @@ class SchemaRestClient( schema: Schema, ): SchemaMetadata = request { method = HttpMethod.Post - url(path) + url(URI(baseUrl).resolve(path).toString()) contentType(ContentType.Application.Json) setBody(SchemaMetadata(schema = schema.toString())) } @@ -132,4 +128,8 @@ class SchemaRestClient( schemaGet("/schemas/ids/$id") .toParsedSchemaMetadata(id) .schema + + companion object { + private val logger = LoggerFactory.getLogger(SchemaRestClient::class.java) + } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt index 29d792b2..a1ca9761 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt @@ -205,8 +205,8 @@ open class SchemaRetriever(config: Config) { @RadarProducerDsl class Config( val baseUrl: String, - ) { var httpClient: HttpClient? = null + ) { var schemaTimeout: CacheConfig = DEFAULT_SCHEMA_TIMEOUT_CONFIG var ioContext: CoroutineContext = Dispatchers.IO fun httpClient(config: HttpClientConfig<*>.() -> Unit) { diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt new file mode 100644 index 00000000..489cafe8 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.producer.schema + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import okhttp3.mockwebserver.MockWebServer +import org.apache.avro.Schema +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.radarbase.producer.io.timeout +import org.radarbase.producer.rest.RestKafkaSenderTest.Companion.enqueueJson +import java.io.IOException +import kotlin.time.Duration.Companion.seconds +import io.ktor.client.* +import io.ktor.client.request.* +import io.ktor.client.engine.cio.* +import io.ktor.client.plugins.auth.* +import io.ktor.client.plugins.auth.providers.* +import io.ktor.client.statement.* +import org.junit.Ignore + +class SchemaRestClientIntegrationTest { + private lateinit var retriever: SchemaRestClient + + // This testclass requires a functioning confluent cloud provider, therefore all tests here are ignored + + @BeforeEach + fun setUp() { + val apiSecret = "exampleSecret" + val apiKey = "exampleKey" + retriever = SchemaRestClient( + httpClient = HttpClient { + timeout(30.seconds) + install(Auth) { + basic { + sendWithoutRequest { true } + credentials { + BasicAuthCredentials(username = apiKey, password = apiSecret) + } + } + } + }, + baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud", + ) + } + + @Ignore + @AfterEach + @Throws(IOException::class) + fun tearDown() { + } + + @Ignore + @Test + fun testSchemaGet() = runBlocking { + val actualSchemaMetadata = retriever.schemaGet("/schemas/ids/100042") + println(actualSchemaMetadata) + } + + @Ignore + @Test + fun testSchemaPost() = runBlocking { + val postResult = retriever.schemaPost("subjects/d90c8b88-5793-438a-b27d-6c87580cc3d9", Schema.create(Schema.Type.STRING)) + + println(postResult) + } + + @Test + fun testRetrieveSchemaById() = runBlocking { + val res = retriever.retrieveSchemaById(100086) + println(res) + } +} diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt new file mode 100644 index 00000000..a0f8f575 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt @@ -0,0 +1,96 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.producer.schema + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import okhttp3.mockwebserver.MockWebServer +import org.apache.avro.Schema +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.radarbase.producer.io.timeout +import org.radarbase.producer.rest.RestKafkaSenderTest.Companion.enqueueJson +import java.io.IOException +import kotlin.time.Duration.Companion.seconds +import io.ktor.client.* +import io.ktor.client.request.* +import io.ktor.client.engine.cio.* +import io.ktor.client.plugins.auth.* +import io.ktor.client.plugins.auth.providers.* +import io.ktor.client.statement.* +import org.junit.Ignore + +class SchemaRetrieverIntegrationTest { + private lateinit var retriever: SchemaRetriever + + // This testclass requires a functioning confluent cloud provider, therefore all tests here are ignored + + @BeforeEach + fun setUp() { + val apiSecret = "exampleSecret" + val apiKey = "exampleKey" + retriever = SchemaRetriever.schemaRetriever(baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud") + { + httpClient = HttpClient(CIO) { + timeout(30.seconds) + install(Auth) { + basic { + sendWithoutRequest { true } + credentials { + BasicAuthCredentials(username = apiKey, password = apiSecret) + } + } + } + } + } + + + val restClient = SchemaRestClient( + httpClient = HttpClient { + timeout(30.seconds) + install(Auth) { + basic { + sendWithoutRequest { true } + credentials { + BasicAuthCredentials(username = apiKey, password = apiSecret) + } + } + } + }, + baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud", + ) + } + + @AfterEach + @Throws(IOException::class) + fun tearDown() { + } + + @Ignore + @Test + fun testGetById() = runBlocking { + val topic = "android_phone_acceleration" + val id = 100042 + val ofValue = true + val res = retriever.getById(topic, ofValue, id) + println(res) + } +}