Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify about DataBroker Disconnect #14

Merged
merged 7 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.kuksa.CoroutineCallback;
import org.eclipse.kuksa.DataBrokerConnection;
import org.eclipse.kuksa.DataBrokerConnector;
import org.eclipse.kuksa.DisconnectListener;
import org.eclipse.kuksa.PropertyObserver;
import org.eclipse.kuksa.TimeoutConfig;
import org.eclipse.kuksa.model.Property;
Expand All @@ -41,7 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand All @@ -62,6 +65,8 @@ public class JavaDataBrokerEngine implements DataBrokerEngine {
@Nullable
private DataBrokerConnection dataBrokerConnection = null;

private final Set<DisconnectListener> disconnectListeners = new HashSet<>();

public JavaDataBrokerEngine(@NonNull AssetManager assetManager) {
this.assetManager = assetManager;
}
Expand Down Expand Up @@ -135,7 +140,12 @@ private void connect(
connector.connect(new CoroutineCallback<>() {
@Override
public void onSuccess(@Nullable DataBrokerConnection result) {
if (result == null) return;

JavaDataBrokerEngine.this.dataBrokerConnection = result;
for (DisconnectListener listener : disconnectListeners) {
result.getDisconnectListeners().register(listener);
}

callback.onSuccess(result);
}
Expand Down Expand Up @@ -186,6 +196,7 @@ public void disconnect() {
}

dataBrokerConnection.disconnect();
dataBrokerConnection = null;
}

@Nullable
Expand All @@ -198,4 +209,20 @@ public DataBrokerConnection getDataBrokerConnection() {
public void setDataBrokerConnection(@Nullable DataBrokerConnection dataBrokerConnection) {
this.dataBrokerConnection = dataBrokerConnection;
}

@Override
public void registerDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.add(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().register(listener);
}
}

@Override
public void unregisterDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.remove(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().unregister(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import androidx.compose.ui.Modifier
import androidx.lifecycle.lifecycleScope
import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.extension.metadata
import org.eclipse.kuksa.model.Property
Expand Down Expand Up @@ -61,20 +62,26 @@ class KuksaDataBrokerActivity : ComponentActivity() {

private val dataBrokerConnectionCallback = object : CoroutineCallback<DataBrokerConnection>() {
override fun onSuccess(result: DataBrokerConnection?) {
outputViewModel.appendOutput("Connection to data broker was successful")
outputViewModel.appendOutput("Connection to DataBroker successful established")
connectionViewModel.updateConnectionState(ConnectionViewState.CONNECTED)
}

override fun onError(error: Throwable) {
outputViewModel.appendOutput("Connection to data broker failed: ${error.message}")
outputViewModel.appendOutput("Connection to DataBroker failed: ${error.message}")
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
}
}

private val onDisconnectListener = DisconnectListener {
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
outputViewModel.appendOutput("DataBroker disconnected")
}

private lateinit var dataBrokerEngine: DataBrokerEngine
private val kotlinDataBrokerEngine by lazy {
KotlinDataBrokerEngine(lifecycleScope, assets)
}

private val javaDataBrokerEngine by lazy {
JavaDataBrokerEngine(assets)
}
Expand Down Expand Up @@ -125,6 +132,18 @@ class KuksaDataBrokerActivity : ComponentActivity() {
}
}

override fun onPostCreate(savedInstanceState: Bundle?) {
super.onPostCreate(savedInstanceState)

dataBrokerEngine.registerDisconnectListener(onDisconnectListener)
}

override fun onDestroy() {
super.onDestroy()

dataBrokerEngine.unregisterDisconnectListener(onDisconnectListener)
}

private fun connect(connectionInfo: ConnectionInfo) {
Log.d(TAG, "Connecting to DataBroker: $connectionInfo")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.kuksa.testapp.databroker

import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.proto.v1.KuksaValV1
Expand All @@ -41,4 +42,8 @@ interface DataBrokerEngine {

fun subscribe(property: Property, propertyObserver: PropertyObserver)
fun disconnect()

fun registerDisconnectListener(listener: DisconnectListener)

fun unregisterDisconnectListener(listener: DisconnectListener)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DataBrokerConnector
import org.eclipse.kuksa.DataBrokerException
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.TimeoutConfig
import org.eclipse.kuksa.model.Property
Expand All @@ -47,6 +48,8 @@ class KotlinDataBrokerEngine(
) : DataBrokerEngine {
override var dataBrokerConnection: DataBrokerConnection? = null

private val disconnectListeners = mutableSetOf<DisconnectListener>()

override fun connect(
connectionInfo: ConnectionInfo,
callback: CoroutineCallback<DataBrokerConnection>,
Expand Down Expand Up @@ -118,6 +121,10 @@ class KotlinDataBrokerEngine(
lifecycleScope.launch {
try {
dataBrokerConnection = connector.connect()
.also { connection ->
disconnectListeners.forEach { listener -> connection.disconnectListeners.register(listener) }
}

callback.onSuccess(dataBrokerConnection)
} catch (e: DataBrokerException) {
callback.onError(e)
Expand Down Expand Up @@ -158,6 +165,17 @@ class KotlinDataBrokerEngine(

override fun disconnect() {
dataBrokerConnection?.disconnect()
dataBrokerConnection = null
}

override fun registerDisconnectListener(listener: DisconnectListener) {
disconnectListeners.add(listener)
dataBrokerConnection?.disconnectListeners?.register(listener)
}

override fun unregisterDisconnectListener(listener: DisconnectListener) {
disconnectListeners.remove(listener)
dataBrokerConnection?.disconnectListeners?.unregister(listener)
}

companion object {
Expand Down
12 changes: 12 additions & 0 deletions docs/kuksa-sdk_class-diagram.puml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package kuksa {
DataBrokerConnection -down-> PropertyObserver
DataBrokerConnection -down-> Property
DataBrokerConnection -left-> DataBrokerException
DataBrokerConnection -up-> MultiListener
MultiListener -right-> DisconnectListener
TimeoutConfig -left-* DataBrokerConnector

class DataBrokerConnector {
Expand All @@ -38,6 +40,7 @@ package kuksa {
}

class DataBrokerConnection {
+ disconnectListeners: MultiListener<DisconnectListener>
+ subscribe(List<Property>, PropertyObserver)
+ fetchProperty(Property): GetResponse
+ updateProperty(Property, Datapoint): SetResponse
Expand All @@ -54,6 +57,15 @@ package kuksa {
}

class DataBrokerException

abstract class MultiListener<T> {
+ register(T)
+ unregister(T)
}

interface DisconnectListener {
+ onDisconnect()
}
}

DataBrokerConnector -up-> ManagedChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class DataBrokerConnection internal constructor(
private val managedChannel: ManagedChannel,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
val disconnectListeners = MultiListener<DisconnectListener>()

init {
val state = managedChannel.getState(false)
managedChannel.notifyWhenStateChanged(state) {
val listeners = disconnectListeners.get()
wba2hi marked this conversation as resolved.
Show resolved Hide resolved
listeners.forEach { listener ->
listener.onDisconnect()
}
}
}

/**
* Subscribes to the specified vssPath with the provided propertyObserver. Once subscribed the application will be
Expand Down
31 changes: 31 additions & 0 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DisconnectListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

/**
* The [DisconnectListener] can be registered to [DataBrokerConnection.disconnectListeners]
* When registered it will notify about manual or unexpected connection disconnects from the DataBroker.
*/
fun interface DisconnectListener {
/**
* Will be triggered, when the connection to the DataBroker was closed manually or unexpectedly.
*/
fun onDisconnect()
}
50 changes: 50 additions & 0 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/MultiListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

/**
* Generic Listener interface, to support multiple listeners.
*/
class MultiListener<T> {
wba2hi marked this conversation as resolved.
Show resolved Hide resolved
private var listeners: MutableSet<T> = mutableSetOf()

/**
* Adds a new [listener] and returns true if the [listener] was successfully added, returns false otherwise.
* A [listener] can only be added once.
*/
fun register(listener: T): Boolean {
return listeners.add(listener)
}

/**
* Removes a [listener] and returns true if the [listener] was successfully removed, returns false otherwise.
*/
fun unregister(listener: T): Boolean {
return listeners.remove(listener)
}

/**
* Retrieves a defensive copy of the underlying list of listeners.
*/
@JvmSynthetic
internal fun get(): List<T> {
wba2hi marked this conversation as resolved.
Show resolved Hide resolved
return listeners.toList()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.kuksa

import io.grpc.ManagedChannel
import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import io.mockk.clearMocks
import io.mockk.mockk
import io.mockk.slot
Expand Down Expand Up @@ -178,6 +179,32 @@ class DataBrokerConnectionTest : BehaviorSpec({
}
}
}

// this test closes the connection, the connection can't be used afterward anymore
`when`("A DisconnectListener is registered successfully") {
val disconnectListener = mockk<DisconnectListener>()
val disconnectListeners = dataBrokerConnection.disconnectListeners
disconnectListeners.register(disconnectListener)

then("The number of registered DisconnectListeners should be 1") {
disconnectListeners.get().size shouldBe 1
}
`when`("Trying to register the same listener again") {
disconnectListeners.register(disconnectListener)

then("It is not added multiple times") {
disconnectListeners.get().size shouldBe 1
}
}
`when`("The Connection is closed manually") {
dataBrokerConnection.disconnect()

then("The DisconnectListener is triggered") {
verify { disconnectListener.onDisconnect() }
}
}
}
// connection is closed at this point
}
given("A DataBrokerConnection with a mocked ManagedChannel") {
val managedChannel = mockk<ManagedChannel>(relaxed = true)
Expand Down
Loading