Skip to content

Commit

Permalink
chore: Add #onError Method to PropertyObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Oct 19, 2023
1 parent a46835e commit 5369dfa
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import androidx.lifecycle.lifecycleScope
import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.VssSpecificationObserver
import org.eclipse.kuksa.extension.metadata
import org.eclipse.kuksa.extension.valueType
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.proto.v1.KuksaValV1
import org.eclipse.kuksa.proto.v1.KuksaValV1.GetResponse
import org.eclipse.kuksa.proto.v1.Types
import org.eclipse.kuksa.proto.v1.Types.Datapoint
import org.eclipse.kuksa.testapp.databroker.DataBrokerEngine
import org.eclipse.kuksa.testapp.databroker.JavaDataBrokerEngine
Expand Down Expand Up @@ -85,6 +88,27 @@ class KuksaDataBrokerActivity : ComponentActivity() {
outputViewModel.appendOutput("DataBroker disconnected")
}

private val propertyObserver = object : PropertyObserver {
override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) {
Log.d(TAG, "onPropertyChanged path: vssPath = $vssPath, changedValue = $updatedValue")
outputViewModel.appendOutput("Updated value: $updatedValue")
}

override fun onError(throwable: Throwable) {
outputViewModel.appendOutput("${throwable.message}")
}
}

private val specificationObserver = object : VssSpecificationObserver<VssSpecification> {
override fun onSpecificationChanged(vssSpecification: VssSpecification) {
outputViewModel.appendOutput("Updated specification: $vssSpecification")
}

override fun onError(throwable: Throwable) {
outputViewModel.appendOutput("Updated specification: ${throwable.message}")
}
}

private lateinit var dataBrokerEngine: DataBrokerEngine
private val kotlinDataBrokerEngine by lazy {
KotlinDataBrokerEngine(lifecycleScope)
Expand Down Expand Up @@ -236,10 +260,8 @@ class KuksaDataBrokerActivity : ComponentActivity() {

private fun subscribeProperty(property: Property) {
Log.d(TAG, "Subscribing to property: $property")
dataBrokerEngine.subscribe(property) { vssPath, updatedValue ->
Log.d(TAG, "onPropertyChanged path: vssPath = $vssPath, changedValue = $updatedValue")
outputViewModel.appendOutput("Updated value: $updatedValue")
}

dataBrokerEngine.subscribe(property, propertyObserver)
}

private fun fetchSpecification(specification: VssSpecification) {
Expand All @@ -259,8 +281,6 @@ class KuksaDataBrokerActivity : ComponentActivity() {
}

private fun subscribeSpecification(specification: VssSpecification) {
dataBrokerEngine.subscribe(specification) { updatedSpecification ->
outputViewModel.appendOutput("Updated specification: $updatedSpecification")
}
dataBrokerEngine.subscribe(specification, specificationObserver)
}
}
1 change: 1 addition & 0 deletions docs/kuksa-sdk_class-diagram.puml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ package kuksa {

interface PropertyObserver {
+ onPropertyChanged(vssPath: String, Types.DataEntry)
+ onError(Throwable)
}

class Property {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class DataBrokerConnection internal constructor(

override fun onError(t: Throwable?) {
Log.e(TAG, "onError() called with: t = $t, cause: ${t?.cause}")
t?.let { propertyObserver.onError(t) }
}

override fun onCompleted() {
Expand Down Expand Up @@ -160,18 +161,25 @@ class DataBrokerConnection internal constructor(
// This is currently needed because we get multiple subscribe responses for every heir. Otherwise we
// would override the last heir value with every new response.
var updatedVssSpecification = specification
subscribe(leafProperties) { vssPath, updatedValue ->
Log.v(TAG, "Update from subscribed property: $vssPath - $updatedValue")
val propertyObserver = object : PropertyObserver {
override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) {
Log.v(TAG, "Update from subscribed property: $vssPath - $updatedValue")

updatedVssSpecification = updatedVssSpecification.copy(vssPath, updatedValue.value)
updatedVssSpecification = updatedVssSpecification.copy(vssPath, updatedValue.value)

initialSubscriptionUpdates[vssPath] = true
val isInitialSubscriptionComplete = initialSubscriptionUpdates.values.all { it }
if (isInitialSubscriptionComplete) {
Log.d(TAG, "Initial update for subscribed property complete: $vssPath - $updatedValue")
observer.onSpecificationChanged(updatedVssSpecification)
initialSubscriptionUpdates[vssPath] = true
val isInitialSubscriptionComplete = initialSubscriptionUpdates.values.all { it }
if (isInitialSubscriptionComplete) {
Log.d(TAG, "Initial update for subscribed property complete: $vssPath - $updatedValue")
observer.onSpecificationChanged(updatedVssSpecification)
}
}

override fun onError(throwable: Throwable) {
observer.onError(throwable)
}
}
subscribe(leafProperties, propertyObserver)
} catch (e: Exception) {
throw DataBrokerException(e.message, e)
}
Expand Down
14 changes: 12 additions & 2 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/PropertyObserver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,31 @@ import org.eclipse.kuksa.vsscore.model.VssSpecification
/**
* The Observer is used to notify about changes to subscribed properties.
*/
fun interface PropertyObserver {
interface PropertyObserver {
/**
* Will be triggered with the [updatedValue] when the underlying [vssPath] changed it's value.
*/
fun onPropertyChanged(vssPath: String, updatedValue: DataEntry)

/**
* Will be triggered when an error happens during subscription and forwards the [throwable].
*/
fun onError(throwable: Throwable)
}

/**
* The Observer is used to notify about subscribed [VssSpecification]. If a [VssSpecification] has children
* then [onSpecificationChanged] will be called on every value change for every children.
*/
fun interface VssSpecificationObserver<T : VssSpecification> {
interface VssSpecificationObserver<T : VssSpecification> {
/**
* Will be triggered with the [vssSpecification] when the underlying vssPath changed it's value or to inform about
* the initial state.
*/
fun onSpecificationChanged(vssSpecification: T)

/**
* Will be triggered when an error happens during subscription and forwards the [throwable].
*/
fun onError(throwable: Throwable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package org.eclipse.kuksa
import io.grpc.ManagedChannel
import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.string.shouldContain
import io.mockk.clearMocks
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.eclipse.kuksa.databroker.DataBrokerConnectorProvider
import org.eclipse.kuksa.model.Property
Expand Down Expand Up @@ -162,6 +164,8 @@ class DataBrokerConnectionTest : BehaviorSpec({
val propertyObserver = mockk<VssSpecificationObserver<VssDriver>>(relaxed = true)
dataBrokerConnection.subscribe(specification, observer = propertyObserver)

delay(100)

then("The #onSpecificationChanged method is triggered") {
verify { propertyObserver.onSpecificationChanged(any()) }
}
Expand Down Expand Up @@ -214,8 +218,13 @@ class DataBrokerConnectionTest : BehaviorSpec({
val propertyObserver = mockk<PropertyObserver>(relaxed = true)
dataBrokerConnection.subscribe(properties, propertyObserver)

then("No crash should happen") {
// ignored
delay(100)

then("The PropertyObserver#onError method should be triggered with 'NOT_FOUND' (Path not found)") {
val capturingSlot = slot<Throwable>()
verify { propertyObserver.onError(capture(capturingSlot)) }
val capturedThrowable = capturingSlot.captured
capturedThrowable.message shouldContain "NOT_FOUND"
}
}

Expand Down
12 changes: 10 additions & 2 deletions samples/src/main/kotlin/com/example/sample/KotlinActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.eclipse.kuksa.DataBrokerException
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.proto.v1.Types
import org.eclipse.kuksa.proto.v1.Types.Datapoint
import java.io.IOException

Expand Down Expand Up @@ -118,9 +119,16 @@ class KotlinActivity : AppCompatActivity() {
}

fun subscribeProperty(property: Property) {
val propertyObserver = PropertyObserver { vssPath, updatedValue ->
// handle property change
val propertyObserver = object : PropertyObserver {
override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) {
// handle property change
}

override fun onError(throwable: Throwable) {
// handle error
}
}

val properties = listOf(property)
dataBrokerConnection?.subscribe(properties, propertyObserver)
}
Expand Down

0 comments on commit 5369dfa

Please sign in to comment.