Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify about DataBroker Disconnect #14

Merged
merged 7 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.kuksa.CoroutineCallback;
import org.eclipse.kuksa.DataBrokerConnection;
import org.eclipse.kuksa.DataBrokerConnector;
import org.eclipse.kuksa.DisconnectListener;
import org.eclipse.kuksa.PropertyObserver;
import org.eclipse.kuksa.TimeoutConfig;
import org.eclipse.kuksa.model.Property;
Expand All @@ -41,7 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand All @@ -62,6 +65,8 @@ public class JavaDataBrokerEngine implements DataBrokerEngine {
@Nullable
private DataBrokerConnection dataBrokerConnection = null;

private final Set<DisconnectListener> disconnectListeners = new HashSet<>();

public JavaDataBrokerEngine(@NonNull AssetManager assetManager) {
this.assetManager = assetManager;
}
Expand Down Expand Up @@ -135,7 +140,12 @@ private void connect(
connector.connect(new CoroutineCallback<>() {
@Override
public void onSuccess(@Nullable DataBrokerConnection result) {
if (result == null) return;

JavaDataBrokerEngine.this.dataBrokerConnection = result;
for (DisconnectListener listener : disconnectListeners) {
result.getDisconnectListeners().register(listener);
}

callback.onSuccess(result);
}
Expand Down Expand Up @@ -186,6 +196,7 @@ public void disconnect() {
}

dataBrokerConnection.disconnect();
dataBrokerConnection = null;
}

@Nullable
Expand All @@ -198,4 +209,20 @@ public DataBrokerConnection getDataBrokerConnection() {
public void setDataBrokerConnection(@Nullable DataBrokerConnection dataBrokerConnection) {
this.dataBrokerConnection = dataBrokerConnection;
}

@Override
public void registerDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.add(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().register(listener);
}
}

@Override
public void unregisterDisconnectListener(@NonNull DisconnectListener listener) {
disconnectListeners.remove(listener);
if (dataBrokerConnection != null) {
dataBrokerConnection.getDisconnectListeners().unregister(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import androidx.compose.ui.Modifier
import androidx.lifecycle.lifecycleScope
import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.extension.metadata
import org.eclipse.kuksa.model.Property
Expand Down Expand Up @@ -61,20 +62,27 @@ class KuksaDataBrokerActivity : ComponentActivity() {

private val dataBrokerConnectionCallback = object : CoroutineCallback<DataBrokerConnection>() {
override fun onSuccess(result: DataBrokerConnection?) {
outputViewModel.appendOutput("Connection to data broker was successful")
outputViewModel.appendOutput("Connection to DataBroker successful established")
connectionViewModel.updateConnectionState(ConnectionViewState.CONNECTED)
}

override fun onError(error: Throwable) {
outputViewModel.appendOutput("Connection to data broker failed: ${error.message}")
outputViewModel.appendOutput("Connection to DataBroker failed: ${error.message}")
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
}
}

private val onDisconnectListener = DisconnectListener {
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
outputViewModel.clear()
outputViewModel.appendOutput("DataBroker disconnected")
}

private lateinit var dataBrokerEngine: DataBrokerEngine
private val kotlinDataBrokerEngine by lazy {
KotlinDataBrokerEngine(lifecycleScope, assets)
}

private val javaDataBrokerEngine by lazy {
JavaDataBrokerEngine(assets)
}
Expand Down Expand Up @@ -125,20 +133,26 @@ class KuksaDataBrokerActivity : ComponentActivity() {
}
}

override fun onDestroy() {
super.onDestroy()

dataBrokerEngine.unregisterDisconnectListener(onDisconnectListener)
}

private fun connect(connectionInfo: ConnectionInfo) {
Log.d(TAG, "Connecting to DataBroker: $connectionInfo")

outputViewModel.appendOutput("Connecting to data broker - Please wait")
connectionViewModel.updateConnectionState(ConnectionViewState.CONNECTING)

dataBrokerEngine.registerDisconnectListener(onDisconnectListener)
dataBrokerEngine.connect(connectionInfo, dataBrokerConnectionCallback)
}

private fun disconnect() {
Log.d(TAG, "Disconnecting from DataBroker")
dataBrokerEngine.disconnect()
outputViewModel.clear()
connectionViewModel.updateConnectionState(ConnectionViewState.DISCONNECTED)
dataBrokerEngine.unregisterDisconnectListener(onDisconnectListener)
}

private fun fetchProperty(property: Property) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.kuksa.testapp.databroker

import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.proto.v1.KuksaValV1
Expand All @@ -41,4 +42,8 @@ interface DataBrokerEngine {

fun subscribe(property: Property, propertyObserver: PropertyObserver)
fun disconnect()

fun registerDisconnectListener(listener: DisconnectListener)

fun unregisterDisconnectListener(listener: DisconnectListener)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.eclipse.kuksa.CoroutineCallback
import org.eclipse.kuksa.DataBrokerConnection
import org.eclipse.kuksa.DataBrokerConnector
import org.eclipse.kuksa.DataBrokerException
import org.eclipse.kuksa.DisconnectListener
import org.eclipse.kuksa.PropertyObserver
import org.eclipse.kuksa.TimeoutConfig
import org.eclipse.kuksa.model.Property
Expand All @@ -47,6 +48,8 @@ class KotlinDataBrokerEngine(
) : DataBrokerEngine {
override var dataBrokerConnection: DataBrokerConnection? = null

private val disconnectListeners = mutableSetOf<DisconnectListener>()

override fun connect(
connectionInfo: ConnectionInfo,
callback: CoroutineCallback<DataBrokerConnection>,
Expand Down Expand Up @@ -118,6 +121,10 @@ class KotlinDataBrokerEngine(
lifecycleScope.launch {
try {
dataBrokerConnection = connector.connect()
.also { connection ->
disconnectListeners.forEach { listener -> connection.disconnectListeners.register(listener) }
}

callback.onSuccess(dataBrokerConnection)
} catch (e: DataBrokerException) {
callback.onError(e)
Expand Down Expand Up @@ -158,6 +165,17 @@ class KotlinDataBrokerEngine(

override fun disconnect() {
dataBrokerConnection?.disconnect()
dataBrokerConnection = null
}

override fun registerDisconnectListener(listener: DisconnectListener) {
disconnectListeners.add(listener)
dataBrokerConnection?.disconnectListeners?.register(listener)
}

override fun unregisterDisconnectListener(listener: DisconnectListener) {
disconnectListeners.remove(listener)
dataBrokerConnection?.disconnectListeners?.unregister(listener)
}

companion object {
Expand Down
12 changes: 12 additions & 0 deletions docs/kuksa-sdk_class-diagram.puml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package kuksa {
DataBrokerConnection -down-> PropertyObserver
DataBrokerConnection -down-> Property
DataBrokerConnection -left-> DataBrokerException
DataBrokerConnection -up-> MultiListener
MultiListener -right-> DisconnectListener
TimeoutConfig -left-* DataBrokerConnector

class DataBrokerConnector {
Expand All @@ -38,6 +40,7 @@ package kuksa {
}

class DataBrokerConnection {
+ disconnectListeners: MultiListener<DisconnectListener>
+ subscribe(List<Property>, PropertyObserver)
+ fetchProperty(Property): GetResponse
+ updateProperty(Property, Datapoint): SetResponse
Expand All @@ -54,6 +57,15 @@ package kuksa {
}

class DataBrokerException

abstract class MultiListener<T> {
+ register(T)
+ unregister(T)
}

interface DisconnectListener {
+ onDisconnect()
}
}

DataBrokerConnector -up-> ManagedChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package org.eclipse.kuksa

import android.util.Log
import io.grpc.ConnectivityState
import io.grpc.ManagedChannel
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.pattern.listener.MultiListener
import org.eclipse.kuksa.proto.v1.KuksaValV1
import org.eclipse.kuksa.proto.v1.KuksaValV1.GetResponse
import org.eclipse.kuksa.proto.v1.KuksaValV1.SetResponse
Expand All @@ -45,6 +47,22 @@ class DataBrokerConnection internal constructor(
private val managedChannel: ManagedChannel,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
val disconnectListeners = MultiListener<DisconnectListener>()

init {
val state = managedChannel.getState(false)
managedChannel.notifyWhenStateChanged(state) {
val newState = managedChannel.getState(false)
Log.d(TAG, "DataBrokerConnection state changed: $newState")
if (newState != ConnectivityState.SHUTDOWN) {
managedChannel.shutdownNow()
}

disconnectListeners.forEach { listener ->
listener.onDisconnect()
}
}
}

/**
* Subscribes to the specified vssPath with the provided propertyObserver. Once subscribed the application will be
Expand Down Expand Up @@ -165,7 +183,7 @@ class DataBrokerConnection internal constructor(
*/
fun disconnect() {
Log.d(TAG, "disconnect() called")
managedChannel.shutdown()
managedChannel.shutdownNow()
}

private companion object {
Expand Down
33 changes: 33 additions & 0 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DisconnectListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

import org.eclipse.kuksa.pattern.listener.Listener

/**
* The [DisconnectListener] can be registered to [DataBrokerConnection.disconnectListeners]
* When registered it will notify about manual or unexpected connection disconnects from the DataBroker.
*/
fun interface DisconnectListener : Listener {
/**
* Will be triggered, when the connection to the DataBroker was closed manually or unexpectedly.
*/
fun onDisconnect()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa.pattern.listener

/**
* Marker Interface for generic listeners.
*/
interface Listener
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa.pattern.listener

/**
* The ListenerCollection interface provides methods to register and unregister multiple listeners with a generic type.
* The underlying collection decides if the same listener can be added only once or multiple times.
*/
interface ListenerCollection<T : Listener> : Iterable<T> {
/**
* Adds a new [listener] and returns true if the [listener] was successfully added, returns false otherwise.
*/
fun register(listener: T): Boolean

/**
* Removes a [listener] and returns true if the [listener] was successfully removed, returns false otherwise.
*/
fun unregister(listener: T): Boolean
}
Loading