diff --git a/app/src/main/kotlin/org/eclipse/kuksa/testapp/KuksaDataBrokerActivity.kt b/app/src/main/kotlin/org/eclipse/kuksa/testapp/KuksaDataBrokerActivity.kt index 706aef7c..d1a382a7 100644 --- a/app/src/main/kotlin/org/eclipse/kuksa/testapp/KuksaDataBrokerActivity.kt +++ b/app/src/main/kotlin/org/eclipse/kuksa/testapp/KuksaDataBrokerActivity.kt @@ -32,11 +32,14 @@ import androidx.lifecycle.lifecycleScope import org.eclipse.kuksa.CoroutineCallback import org.eclipse.kuksa.DataBrokerConnection import org.eclipse.kuksa.DisconnectListener +import org.eclipse.kuksa.PropertyObserver +import org.eclipse.kuksa.VssSpecificationObserver import org.eclipse.kuksa.extension.metadata import org.eclipse.kuksa.extension.valueType import org.eclipse.kuksa.model.Property import org.eclipse.kuksa.proto.v1.KuksaValV1 import org.eclipse.kuksa.proto.v1.KuksaValV1.GetResponse +import org.eclipse.kuksa.proto.v1.Types import org.eclipse.kuksa.proto.v1.Types.Datapoint import org.eclipse.kuksa.testapp.databroker.DataBrokerEngine import org.eclipse.kuksa.testapp.databroker.JavaDataBrokerEngine @@ -85,6 +88,27 @@ class KuksaDataBrokerActivity : ComponentActivity() { outputViewModel.appendOutput("DataBroker disconnected") } + private val propertyObserver = object : PropertyObserver { + override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) { + Log.d(TAG, "onPropertyChanged path: vssPath = $vssPath, changedValue = $updatedValue") + outputViewModel.appendOutput("Updated value: $updatedValue") + } + + override fun onError(throwable: Throwable) { + outputViewModel.appendOutput("${throwable.message}") + } + } + + private val specificationObserver = object : VssSpecificationObserver { + override fun onSpecificationChanged(vssSpecification: VssSpecification) { + outputViewModel.appendOutput("Updated specification: $vssSpecification") + } + + override fun onError(throwable: Throwable) { + outputViewModel.appendOutput("Updated specification: ${throwable.message}") + } + } + private lateinit var dataBrokerEngine: DataBrokerEngine private val kotlinDataBrokerEngine by lazy { KotlinDataBrokerEngine(lifecycleScope) @@ -236,10 +260,8 @@ class KuksaDataBrokerActivity : ComponentActivity() { private fun subscribeProperty(property: Property) { Log.d(TAG, "Subscribing to property: $property") - dataBrokerEngine.subscribe(property) { vssPath, updatedValue -> - Log.d(TAG, "onPropertyChanged path: vssPath = $vssPath, changedValue = $updatedValue") - outputViewModel.appendOutput("Updated value: $updatedValue") - } + + dataBrokerEngine.subscribe(property, propertyObserver) } private fun fetchSpecification(specification: VssSpecification) { @@ -259,8 +281,6 @@ class KuksaDataBrokerActivity : ComponentActivity() { } private fun subscribeSpecification(specification: VssSpecification) { - dataBrokerEngine.subscribe(specification) { updatedSpecification -> - outputViewModel.appendOutput("Updated specification: $updatedSpecification") - } + dataBrokerEngine.subscribe(specification, specificationObserver) } } diff --git a/docs/kuksa-sdk_class-diagram.puml b/docs/kuksa-sdk_class-diagram.puml index a7d064b3..e6365887 100644 --- a/docs/kuksa-sdk_class-diagram.puml +++ b/docs/kuksa-sdk_class-diagram.puml @@ -49,6 +49,7 @@ package kuksa { interface PropertyObserver { + onPropertyChanged(vssPath: String, Types.DataEntry) + + onError(Throwable) } class Property { diff --git a/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DataBrokerConnection.kt b/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DataBrokerConnection.kt index 63d75b02..f3ac9033 100644 --- a/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DataBrokerConnection.kt +++ b/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DataBrokerConnection.kt @@ -107,6 +107,7 @@ class DataBrokerConnection internal constructor( override fun onError(t: Throwable?) { Log.e(TAG, "onError() called with: t = $t, cause: ${t?.cause}") + t?.let { propertyObserver.onError(t) } } override fun onCompleted() { @@ -160,18 +161,25 @@ class DataBrokerConnection internal constructor( // This is currently needed because we get multiple subscribe responses for every heir. Otherwise we // would override the last heir value with every new response. var updatedVssSpecification = specification - subscribe(leafProperties) { vssPath, updatedValue -> - Log.v(TAG, "Update from subscribed property: $vssPath - $updatedValue") + val propertyObserver = object : PropertyObserver { + override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) { + Log.v(TAG, "Update from subscribed property: $vssPath - $updatedValue") - updatedVssSpecification = updatedVssSpecification.copy(vssPath, updatedValue.value) + updatedVssSpecification = updatedVssSpecification.copy(vssPath, updatedValue.value) - initialSubscriptionUpdates[vssPath] = true - val isInitialSubscriptionComplete = initialSubscriptionUpdates.values.all { it } - if (isInitialSubscriptionComplete) { - Log.d(TAG, "Initial update for subscribed property complete: $vssPath - $updatedValue") - observer.onSpecificationChanged(updatedVssSpecification) + initialSubscriptionUpdates[vssPath] = true + val isInitialSubscriptionComplete = initialSubscriptionUpdates.values.all { it } + if (isInitialSubscriptionComplete) { + Log.d(TAG, "Initial update for subscribed property complete: $vssPath - $updatedValue") + observer.onSpecificationChanged(updatedVssSpecification) + } + } + + override fun onError(throwable: Throwable) { + observer.onError(throwable) } } + subscribe(leafProperties, propertyObserver) } catch (e: Exception) { throw DataBrokerException(e.message, e) } diff --git a/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/PropertyObserver.kt b/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/PropertyObserver.kt index 7ce8baf4..c01b547b 100644 --- a/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/PropertyObserver.kt +++ b/kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/PropertyObserver.kt @@ -25,21 +25,31 @@ import org.eclipse.kuksa.vsscore.model.VssSpecification /** * The Observer is used to notify about changes to subscribed properties. */ -fun interface PropertyObserver { +interface PropertyObserver { /** * Will be triggered with the [updatedValue] when the underlying [vssPath] changed it's value. */ fun onPropertyChanged(vssPath: String, updatedValue: DataEntry) + + /** + * Will be triggered when an error happens during subscription and forwards the [throwable]. + */ + fun onError(throwable: Throwable) } /** * The Observer is used to notify about subscribed [VssSpecification]. If a [VssSpecification] has children * then [onSpecificationChanged] will be called on every value change for every children. */ -fun interface VssSpecificationObserver { +interface VssSpecificationObserver { /** * Will be triggered with the [vssSpecification] when the underlying vssPath changed it's value or to inform about * the initial state. */ fun onSpecificationChanged(vssSpecification: T) + + /** + * Will be triggered when an error happens during subscription and forwards the [throwable]. + */ + fun onError(throwable: Throwable) } diff --git a/kuksa-sdk/src/test/kotlin/org/eclipse/kuksa/DataBrokerConnectionTest.kt b/kuksa-sdk/src/test/kotlin/org/eclipse/kuksa/DataBrokerConnectionTest.kt index 248e401a..8d9f9125 100644 --- a/kuksa-sdk/src/test/kotlin/org/eclipse/kuksa/DataBrokerConnectionTest.kt +++ b/kuksa-sdk/src/test/kotlin/org/eclipse/kuksa/DataBrokerConnectionTest.kt @@ -22,10 +22,12 @@ package org.eclipse.kuksa import io.grpc.ManagedChannel import io.kotest.core.spec.style.BehaviorSpec import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain import io.mockk.clearMocks import io.mockk.mockk import io.mockk.slot import io.mockk.verify +import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.eclipse.kuksa.databroker.DataBrokerConnectorProvider import org.eclipse.kuksa.model.Property @@ -162,6 +164,8 @@ class DataBrokerConnectionTest : BehaviorSpec({ val propertyObserver = mockk>(relaxed = true) dataBrokerConnection.subscribe(specification, observer = propertyObserver) + delay(100) + then("The #onSpecificationChanged method is triggered") { verify { propertyObserver.onSpecificationChanged(any()) } } @@ -214,8 +218,13 @@ class DataBrokerConnectionTest : BehaviorSpec({ val propertyObserver = mockk(relaxed = true) dataBrokerConnection.subscribe(properties, propertyObserver) - then("No crash should happen") { - // ignored + delay(100) + + then("The PropertyObserver#onError method should be triggered with 'NOT_FOUND' (Path not found)") { + val capturingSlot = slot() + verify { propertyObserver.onError(capture(capturingSlot)) } + val capturedThrowable = capturingSlot.captured + capturedThrowable.message shouldContain "NOT_FOUND" } } diff --git a/samples/src/main/kotlin/com/example/sample/KotlinActivity.kt b/samples/src/main/kotlin/com/example/sample/KotlinActivity.kt index 5bd20f97..3680d485 100644 --- a/samples/src/main/kotlin/com/example/sample/KotlinActivity.kt +++ b/samples/src/main/kotlin/com/example/sample/KotlinActivity.kt @@ -32,6 +32,7 @@ import org.eclipse.kuksa.DataBrokerException import org.eclipse.kuksa.DisconnectListener import org.eclipse.kuksa.PropertyObserver import org.eclipse.kuksa.model.Property +import org.eclipse.kuksa.proto.v1.Types import org.eclipse.kuksa.proto.v1.Types.Datapoint import java.io.IOException @@ -118,9 +119,16 @@ class KotlinActivity : AppCompatActivity() { } fun subscribeProperty(property: Property) { - val propertyObserver = PropertyObserver { vssPath, updatedValue -> - // handle property change + val propertyObserver = object : PropertyObserver { + override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) { + // handle property change + } + + override fun onError(throwable: Throwable) { + // handle error + } } + val properties = listOf(property) dataBrokerConnection?.subscribe(properties, propertyObserver) }