diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 383dc3e5..787103c1 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -14,17 +14,17 @@ object Versions { } const val java = 17 - const val slf4j = "2.0.9" - const val confluent = "7.5.0" - const val kafka = "7.5.0-ce" + const val slf4j = "2.0.13" + const val confluent = "7.6.0" + const val kafka = "${confluent}-ce" const val avro = "1.11.3" - const val jackson = "2.15.2" - const val okhttp = "4.11.0" + const val jackson = "2.15.3" + const val okhttp = "4.12.0" const val junit = "5.10.0" const val mockito = "5.5.0" const val mockitoKotlin = "5.1.0" const val hamcrest = "2.2" - const val radarSchemas = "0.8.4" + const val radarSchemas = "0.8.8" const val opencsv = "5.8" const val ktor = "2.3.4" const val coroutines = "1.7.3" diff --git a/radar-commons-gradle/build.gradle.kts b/radar-commons-gradle/build.gradle.kts index 909d919e..9e8ace5c 100644 --- a/radar-commons-gradle/build.gradle.kts +++ b/radar-commons-gradle/build.gradle.kts @@ -190,17 +190,17 @@ object Versions { } const val java = 17 - const val slf4j = "2.0.9" - const val confluent = "7.5.0" - const val kafka = "7.5.0-ce" + const val slf4j = "2.0.13" + const val confluent = "7.6.0" + const val kafka = "${confluent}-ce" const val avro = "1.11.3" - const val jackson = "2.15.2" - const val okhttp = "4.11.0" - const val junit = "5.10.0" + const val jackson = "2.15.3" + const val okhttp = "4.12.0" + const val junit = "5.10.3" const val mockito = "5.5.0" const val mockitoKotlin = "5.1.0" const val hamcrest = "2.2" - const val radarSchemas = "0.8.4" + const val radarSchemas = "0.8.8" const val opencsv = "5.8" const val ktor = "2.3.4" const val coroutines = "1.7.3" @@ -210,4 +210,3 @@ object Versions { const val gradleVersionsPlugin = "0.50.0" const val ktlint = "12.0.3" } - diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt index 59fa1d11..1e85dab0 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt @@ -2,13 +2,10 @@ package org.radarbase.producer.schema import io.ktor.client.HttpClient import io.ktor.client.call.body +import io.ktor.client.plugins.auth.* +import io.ktor.client.plugins.auth.providers.* import io.ktor.client.plugins.contentnegotiation.ContentNegotiation -import io.ktor.client.plugins.defaultRequest -import io.ktor.client.request.HttpRequestBuilder -import io.ktor.client.request.accept -import io.ktor.client.request.request -import io.ktor.client.request.setBody -import io.ktor.client.request.url +import io.ktor.client.request.* import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.contentType @@ -21,13 +18,15 @@ import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json import org.apache.avro.Schema import org.radarbase.producer.rest.RestException.Companion.toRestException +import org.slf4j.LoggerFactory import java.io.IOException +import java.net.URI import kotlin.coroutines.CoroutineContext /** REST client for Confluent schema registry. */ class SchemaRestClient( httpClient: HttpClient, - baseUrl: String, + private val baseUrl: String, private val ioContext: CoroutineContext = Dispatchers.IO, ) { private val httpClient: HttpClient = httpClient.config { @@ -39,10 +38,6 @@ class SchemaRestClient( }, ) } - defaultRequest { - url(baseUrl) - accept(ContentType.Application.Json) - } } suspend inline fun request( @@ -88,7 +83,7 @@ class SchemaRestClient( @Throws(IOException::class) suspend fun schemaGet(path: String): SchemaMetadata = request { method = HttpMethod.Get - url(path) + url(URI(baseUrl).resolve(path).toString()) } @Throws(IOException::class) @@ -97,7 +92,7 @@ class SchemaRestClient( schema: Schema, ): SchemaMetadata = request { method = HttpMethod.Post - url(path) + url(URI(baseUrl).resolve(path).toString()) contentType(ContentType.Application.Json) setBody(SchemaMetadata(schema = schema.toString())) } @@ -132,4 +127,8 @@ class SchemaRestClient( schemaGet("/schemas/ids/$id") .toParsedSchemaMetadata(id) .schema + + companion object { + private val logger = LoggerFactory.getLogger(SchemaRestClient::class.java) + } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt index 29d792b2..82824b2c 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt @@ -205,8 +205,8 @@ open class SchemaRetriever(config: Config) { @RadarProducerDsl class Config( val baseUrl: String, + var httpClient: HttpClient? = null, ) { - var httpClient: HttpClient? = null var schemaTimeout: CacheConfig = DEFAULT_SCHEMA_TIMEOUT_CONFIG var ioContext: CoroutineContext = Dispatchers.IO fun httpClient(config: HttpClientConfig<*>.() -> Unit) { diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt new file mode 100644 index 00000000..75555442 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientIntegrationTest.kt @@ -0,0 +1,83 @@ +// /* +// * Copyright 2017 The Hyve and King's College London +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package org.radarbase.producer.schema +// +// import io.ktor.client.* +// import io.ktor.client.engine.cio.* +// import io.ktor.client.plugins.auth.* +// import io.ktor.client.plugins.auth.providers.* +// import io.ktor.client.request.* +// import io.ktor.client.statement.* +// import io.ktor.http.* +// import kotlinx.coroutines.runBlocking +// import org.apache.avro.Schema +// import org.junit.Ignore +// import org.junit.jupiter.api.AfterEach +// import org.junit.jupiter.api.BeforeEach +// import org.junit.jupiter.api.Test +// import org.radarbase.producer.io.timeout +// import java.io.IOException +// import kotlin.time.Duration.Companion.seconds +// +// class SchemaRestClientIntegrationTest { +// private lateinit var retriever: SchemaRestClient +// +// // This testclass requires a functioning confluent cloud provider, therefore all tests here are ignored +// +// @BeforeEach +// fun setUp() { +// val apiSecret = "exampleSecret" +// val apiKey = "exampleKey" +// retriever = SchemaRestClient( +// httpClient = HttpClient { +// timeout(30.seconds) +// install(Auth) { +// basic { +// sendWithoutRequest { true } +// credentials { +// BasicAuthCredentials(username = apiKey, password = apiSecret) +// } +// } +// } +// }, +// baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud", +// ) +// } +// +// @AfterEach +// @Throws(IOException::class) +// fun tearDown() { +// } +// +// @Test +// fun testSchemaGet() = runBlocking { +// val actualSchemaMetadata = retriever.schemaGet("/schemas/ids/100042") +// println(actualSchemaMetadata) +// } +// +// @Test +// fun testSchemaPost() = runBlocking { +// val postResult = retriever.schemaPost("subjects/d90c8b88-5793-438a-b27d-6c87580cc3d9", Schema.create(Schema.Type.STRING)) +// +// println(postResult) +// } +// +// @Test +// fun testRetrieveSchemaById() = runBlocking { +// val res = retriever.retrieveSchemaById(100086) +// println(res) +// } +// } diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt new file mode 100644 index 00000000..91ff643a --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverIntegrationTest.kt @@ -0,0 +1,86 @@ +// /* +// * Copyright 2017 The Hyve and King's College London +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package org.radarbase.producer.schema +// +// import io.ktor.client.* +// import io.ktor.client.engine.cio.* +// import io.ktor.client.plugins.auth.* +// import io.ktor.client.plugins.auth.providers.* +// import io.ktor.client.request.* +// import io.ktor.client.statement.* +// import io.ktor.http.* +// import kotlinx.coroutines.runBlocking +// import org.junit.Ignore +// import org.junit.jupiter.api.AfterEach +// import org.junit.jupiter.api.BeforeEach +// import org.junit.jupiter.api.Test +// import org.radarbase.producer.io.timeout +// import java.io.IOException +// import kotlin.time.Duration.Companion.seconds +// +// class SchemaRetrieverIntegrationTest { +// private lateinit var retriever: SchemaRetriever +// +// // This testclass requires a functioning confluent cloud provider, therefore all tests here are ignored +// +// @BeforeEach +// fun setUp() { +// val apiSecret = "exampleSecret" +// val apiKey = "exampleKey" +// retriever = SchemaRetriever.schemaRetriever(baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud") { +// httpClient = HttpClient(CIO) { +// timeout(30.seconds) +// install(Auth) { +// basic { +// sendWithoutRequest { true } +// credentials { +// BasicAuthCredentials(username = apiKey, password = apiSecret) +// } +// } +// } +// } +// } +// +// val restClient = SchemaRestClient( +// httpClient = HttpClient { +// timeout(30.seconds) +// install(Auth) { +// basic { +// sendWithoutRequest { true } +// credentials { +// BasicAuthCredentials(username = apiKey, password = apiSecret) +// } +// } +// } +// }, +// baseUrl = "https://SOME_EXAMPLE.westeurope.azure.confluent.cloud", +// ) +// } +// +// @AfterEach +// @Throws(IOException::class) +// fun tearDown() { +// } +// +// @Test +// fun testGetById() = runBlocking { +// val topic = "android_phone_acceleration" +// val id = 100042 +// val ofValue = true +// val res = retriever.getById(topic, ofValue, id) +// println(res) +// } +// }