diff --git a/android/app/src/main/kotlin/io/rebble/cobble/bluetooth/DeviceTransport.kt b/android/app/src/main/kotlin/io/rebble/cobble/bluetooth/DeviceTransport.kt index b343b9ce..39bf0919 100644 --- a/android/app/src/main/kotlin/io/rebble/cobble/bluetooth/DeviceTransport.kt +++ b/android/app/src/main/kotlin/io/rebble/cobble/bluetooth/DeviceTransport.kt @@ -45,7 +45,6 @@ class DeviceTransport @Inject constructor( val driver = getTargetTransport(bluetoothDevice) this@DeviceTransport.driver = driver - return driver.startSingleWatchConnection(bluetoothDevice) } @@ -59,7 +58,7 @@ class DeviceTransport @Inject constructor( incomingPacketsListener.receivedPackets ) } - btDevice?.type == BluetoothDevice.DEVICE_TYPE_LE -> { // LE only device + btDevice?.type == BluetoothDevice.DEVICE_TYPE_LE || btDevice?.type == BluetoothDevice.DEVICE_TYPE_DUAL -> { // LE device BlueLEDriver( context = context, protocolHandler = protocolHandler diff --git a/android/app/src/main/kotlin/io/rebble/cobble/service/ServiceLifecycleControl.kt b/android/app/src/main/kotlin/io/rebble/cobble/service/ServiceLifecycleControl.kt index df204e73..32ac6af4 100644 --- a/android/app/src/main/kotlin/io/rebble/cobble/service/ServiceLifecycleControl.kt +++ b/android/app/src/main/kotlin/io/rebble/cobble/service/ServiceLifecycleControl.kt @@ -27,7 +27,8 @@ class ServiceLifecycleControl @Inject constructor( connectionLooper.connectionState.collect { Timber.d("Watch connection status %s", it) - val shouldServiceBeRunning = it !is ConnectionState.Disconnected + //val shouldServiceBeRunning = it !is ConnectionState.Disconnected + val shouldServiceBeRunning = true if (shouldServiceBeRunning != serviceRunning) { if (shouldServiceBeRunning) { diff --git a/android/app/src/main/kotlin/io/rebble/cobble/service/WatchService.kt b/android/app/src/main/kotlin/io/rebble/cobble/service/WatchService.kt index de15fbbb..0733b1bc 100644 --- a/android/app/src/main/kotlin/io/rebble/cobble/service/WatchService.kt +++ b/android/app/src/main/kotlin/io/rebble/cobble/service/WatchService.kt @@ -2,6 +2,7 @@ package io.rebble.cobble.service import android.app.PendingIntent import android.bluetooth.BluetoothAdapter +import android.bluetooth.BluetoothManager import android.content.Intent import android.os.Build import androidx.annotation.DrawableRes @@ -11,11 +12,17 @@ import androidx.lifecycle.lifecycleScope import io.rebble.cobble.* import io.rebble.cobble.bluetooth.ConnectionLooper import io.rebble.cobble.bluetooth.ConnectionState +import io.rebble.cobble.bluetooth.ble.DummyService +import io.rebble.cobble.bluetooth.ble.GattServerImpl +import io.rebble.cobble.bluetooth.ble.GattServerManager +import io.rebble.cobble.bluetooth.ble.PPoGService import io.rebble.cobble.handlers.CobbleHandler import io.rebble.libpebblecommon.ProtocolHandler import io.rebble.libpebblecommon.services.notification.NotificationService import kotlinx.coroutines.* import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import timber.log.Timber import javax.inject.Provider @@ -66,6 +73,11 @@ class WatchService : LifecycleService() { return START_STICKY } + override fun onDestroy() { + GattServerManager.close() + super.onDestroy() + } + private fun startNotificationLoop() { coroutineScope.launch { Timber.d("Notification Loop start") diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt index 4f625b57..c638d921 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt @@ -10,11 +10,8 @@ import io.rebble.cobble.bluetooth.SingleConnectionStatus import io.rebble.cobble.bluetooth.workarounds.UnboundWatchBeforeConnecting import io.rebble.cobble.bluetooth.workarounds.WorkaroundDescriptor import io.rebble.libpebblecommon.ProtocolHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.withTimeout import timber.log.Timber import java.io.IOException import kotlin.coroutines.CoroutineContext @@ -38,12 +35,12 @@ class BlueLEDriver( require(!device.emulated) require(device.bluetoothDevice != null) return flow { + GattServerManager.initIfNeeded(context, scope) val gatt = device.bluetoothDevice.connectGatt(context, workaroundResolver(UnboundWatchBeforeConnecting)) ?: throw IOException("Failed to connect to device") emit(SingleConnectionStatus.Connecting(device)) val connector = PebbleLEConnector(gatt, context, scope) var success = false - check(PPoGLinkStateManager.getState(device.address).value == PPoGLinkState.Closed) { "Device is already connected" } connector.connect().collect { when (it) { PebbleLEConnector.ConnectorState.CONNECTING -> Timber.d("PebbleLEConnector is connecting") @@ -56,10 +53,41 @@ class BlueLEDriver( } } check(success) { "Failed to connect to watch" } - withTimeout(10000) { - PPoGLinkStateManager.getState(device.address).first { it == PPoGLinkState.SessionOpen } + GattServerManager.getGattServer()?.getServer()?.connect(device.bluetoothDevice, true) + try { + withTimeout(10000) { + val result = PPoGLinkStateManager.getState(device.address).first { it != PPoGLinkState.ReadyForSession } + if (result == PPoGLinkState.SessionOpen) { + Timber.d("Session established") + emit(SingleConnectionStatus.Connected(device)) + } else { + throw IOException("Failed to establish session") + } + } + } catch (e: TimeoutCancellationException) { + throw IOException("Failed to establish session, timeout") + } + + val sendLoop = scope.launch { + protocolHandler.startPacketSendingLoop { + Timber.v("Sending packet") + GattServerManager.ppogService!!.emitPacket(device.bluetoothDevice, it.asByteArray()) + Timber.v("Sent packet") + return@startPacketSendingLoop true + } + } + GattServerManager.ppogService?.rxFlowFor(device.bluetoothDevice)!!.collect { + when (it) { + is PPoGService.PPoGConnectionEvent.PacketReceived -> { + protocolHandler.receivePacket(it.packet.asUByteArray()) + } + is PPoGService.PPoGConnectionEvent.LinkError -> { + Timber.e(it.error, "Link error") + throw it.error + } + } } - emit(SingleConnectionStatus.Connected(device)) + sendLoop.cancel() } } } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt index e0791a45..6ce1e3cd 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt @@ -4,9 +4,11 @@ import android.bluetooth.BluetoothDevice import android.bluetooth.BluetoothGattCharacteristic import android.bluetooth.BluetoothGattServer import kotlinx.coroutines.flow.Flow +import java.io.Closeable -interface GattServer { +interface GattServer: Closeable { fun getServer(): BluetoothGattServer? fun getFlow(): Flow + fun isOpened(): Boolean suspend fun notifyCharacteristicChanged(device: BluetoothDevice, characteristic: BluetoothGattCharacteristic, confirm: Boolean, value: ByteArray) } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerImpl.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerImpl.kt index 49f17754..21a9793d 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerImpl.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerImpl.kt @@ -5,10 +5,7 @@ import android.bluetooth.* import android.content.Context import android.os.Build import androidx.annotation.RequiresPermission -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.channels.awaitClose @@ -40,13 +37,16 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onConnectionStateChange") return } - trySend(ConnectionStateEvent(device, status, newState)) + val newStateDecoded = GattConnectionState.fromInt(newState) + Timber.v("onConnectionStateChange: $device, $status, $newStateDecoded") + trySend(ConnectionStateEvent(device, status, newStateDecoded)) } override fun onCharacteristicReadRequest(device: BluetoothDevice, requestId: Int, offset: Int, characteristic: BluetoothGattCharacteristic) { if (!listeningEnabled) { Timber.w("Event received while listening disabled: onCharacteristicReadRequest") return } + Timber.v("onCharacteristicReadRequest: $device, $requestId, $offset, ${characteristic.uuid}") trySend(CharacteristicReadEvent(device, requestId, offset, characteristic) { data -> try { openServer?.sendResponse(device, requestId, data.status, data.offset, data.value) @@ -61,6 +61,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onCharacteristicWriteRequest") return } + Timber.v("onCharacteristicWriteRequest: $device, $requestId, ${characteristic.uuid}, $preparedWrite, $responseNeeded, $offset, $value") trySend(CharacteristicWriteEvent(device, requestId, characteristic, preparedWrite, responseNeeded, offset, value) { status -> try { openServer?.sendResponse(device, requestId, status, offset, null) @@ -75,6 +76,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onDescriptorReadRequest") return } + Timber.v("onDescriptorReadRequest: $device, $requestId, $offset, ${descriptor?.characteristic?.uuid}->${descriptor?.uuid}") trySend(DescriptorReadEvent(device!!, requestId, offset, descriptor!!) { data -> try { openServer?.sendResponse(device, requestId, data.status, data.offset, data.value) @@ -89,6 +91,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onDescriptorWriteRequest") return } + Timber.v("onDescriptorWriteRequest: $device, $requestId, ${descriptor?.characteristic?.uuid}->${descriptor?.uuid}, $preparedWrite, $responseNeeded, $offset, $value") trySend(DescriptorWriteEvent(device!!, requestId, descriptor!!, offset, value ?: byteArrayOf()) { status -> try { openServer?.sendResponse(device, requestId, status, offset, null) @@ -103,6 +106,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onNotificationSent") return } + Timber.v("onNotificationSent: $device, $status") trySend(NotificationSentEvent(device!!, status)) } @@ -111,14 +115,17 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val Timber.w("Event received while listening disabled: onMtuChanged") return } + Timber.v("onMtuChanged: $device, $mtu") trySend(MtuChangedEvent(device!!, mtu)) } override fun onServiceAdded(status: Int, service: BluetoothGattService?) { + Timber.v("onServiceAdded: $status, ${service?.uuid}") serviceAddedChannel.trySend(ServiceAddedEvent(status, service)) } } openServer = bluetoothManager.openGattServer(context, callbacks) + openServer.clearServices() services.forEach { check(serviceAddedChannel.isEmpty) { "Service added event not consumed" } val service = it.register(serverFlow) @@ -132,7 +139,10 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val server = openServer send(ServerInitializedEvent(this@GattServerImpl)) listeningEnabled = true - awaitClose { openServer.close() } + awaitClose { + openServer.close() + server = null + } } private val serverActor = scope.actor { @@ -169,4 +179,14 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val override fun getFlow(): Flow { return serverFlow } + + override fun isOpened(): Boolean { + return server != null + } + + override fun close() { + scope.cancel("GattServerImpl closed") + server?.close() + serverActor.close() + } } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerManager.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerManager.kt new file mode 100644 index 00000000..4df86ecf --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerManager.kt @@ -0,0 +1,45 @@ +package io.rebble.cobble.bluetooth.ble + +import android.bluetooth.BluetoothManager +import android.content.Context +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import timber.log.Timber + +object GattServerManager { + private var gattServer: GattServer? = null + private var gattServerJob: Job? = null + private var _ppogService: PPoGService? = null + val ppogService: PPoGService? + get() = _ppogService + + fun getGattServer(): GattServer? { + return gattServer + } + + fun initIfNeeded(context: Context, scope: CoroutineScope): GattServer { + if (gattServer?.isOpened() != true || gattServerJob?.isActive != true) { + gattServer?.close() + _ppogService = PPoGService(scope) + gattServer = GattServerImpl( + context.getSystemService(BluetoothManager::class.java)!!, + context, + listOf(ppogService!!, DummyService()) + ) + } + gattServerJob = gattServer!!.getFlow().onEach { + Timber.v("Server state: $it") + }.launchIn(scope) + return gattServer!! + } + + fun close() { + gattServer?.close() + gattServerJob?.cancel() + gattServer = null + gattServerJob = null + } + +} \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerTypes.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerTypes.kt index ae9a116b..38b7ef0d 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerTypes.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServerTypes.kt @@ -11,7 +11,19 @@ class ServiceAddedEvent(val status: Int, val service: BluetoothGattService?) : S class ServerInitializedEvent(val server: GattServer) : ServerEvent open class ServiceEvent(val device: BluetoothDevice) : ServerEvent -class ConnectionStateEvent(device: BluetoothDevice, val status: Int, val newState: Int) : ServiceEvent(device) +class ConnectionStateEvent(device: BluetoothDevice, val status: Int, val newState: GattConnectionState) : ServiceEvent(device) +enum class GattConnectionState(val value: Int) { + Disconnected(BluetoothGatt.STATE_DISCONNECTED), + Connecting(BluetoothGatt.STATE_CONNECTING), + Connected(BluetoothGatt.STATE_CONNECTED), + Disconnecting(BluetoothGatt.STATE_DISCONNECTING); + + companion object { + fun fromInt(value: Int): GattConnectionState { + return entries.firstOrNull { it.value == value } ?: throw IllegalArgumentException("Unknown connection state: $value") + } + } +} class CharacteristicReadEvent(device: BluetoothDevice, val requestId: Int, val offset: Int, val characteristic: BluetoothGattCharacteristic, val respond: (CharacteristicResponse) -> Unit) : ServiceEvent(device) class CharacteristicWriteEvent(device: BluetoothDevice, val requestId: Int, val characteristic: BluetoothGattCharacteristic, val preparedWrite: Boolean, val responseNeeded: Boolean, val offset: Int, val value: ByteArray, val respond: (Int) -> Unit) : ServiceEvent(device) class CharacteristicResponse(val status: Int, val offset: Int, val value: ByteArray) { diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt index d72c0d52..95319e63 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt @@ -14,8 +14,8 @@ import kotlin.jvm.Throws class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManager: PPoGSession.StateManager, private val onTimeout: () -> Unit): Closeable { private var metaWaitingToSend: GATTPacket? = null - private val dataWaitingToSend: LinkedList = LinkedList() - private val inflightPackets: LinkedList = LinkedList() + val dataWaitingToSend: LinkedList = LinkedList() + val inflightPackets: LinkedList = LinkedList() var txWindow = 1 private var timeoutJob: Job? = null private val _packetWriteFlow = MutableSharedFlow() @@ -55,22 +55,23 @@ class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManag break } } - if (!inflightPackets.contains(packet)) { + if (inflightPackets.find { it.sequence == packet.sequence } == null) { Timber.w("Received ACK for packet not in flight") return } var ackedPacket: GATTPacket? = null // remove packets until the acked packet - while (ackedPacket != packet) { + while (ackedPacket?.sequence != packet.sequence) { ackedPacket = inflightPackets.poll() + check(ackedPacket != null) { "Polled inflightPackets to empty" } } sendNextPacket() rescheduleTimeout() } @Throws(SecurityException::class) - private suspend fun sendNextPacket() { + suspend fun sendNextPacket() { if (metaWaitingToSend == null && dataWaitingToSend.isEmpty()) { return } diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt index bc299277..d6eca6b2 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt @@ -34,7 +34,7 @@ class PPoGPebblePacketAssembler { if (!data!!.hasRemaining()) { data!!.flip() - val packet = PebblePacket.deserialize(data!!.array().asUByteArray()) + val packet = data!!.array().clone() emit(packet) clear() } diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt index f9cf996d..083e3ec2 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt @@ -46,7 +46,7 @@ class PPoGService(private val scope: CoroutineScope) : GattService { private val ppogConnections = mutableMapOf() private var gattServer: GattServer? = null private val deviceRxFlow = MutableSharedFlow(replay = 1) - private val deviceTxFlow = MutableSharedFlow>() + private val deviceTxFlow = MutableSharedFlow>() /** * Filter flow for events related to a specific device @@ -62,7 +62,7 @@ class PPoGService(private val scope: CoroutineScope) : GattService { open class PPoGConnectionEvent(val device: BluetoothDevice) { class LinkError(device: BluetoothDevice, val error: Throwable) : PPoGConnectionEvent(device) - class PacketReceived(device: BluetoothDevice, val packet: PebblePacket) : PPoGConnectionEvent(device) + class PacketReceived(device: BluetoothDevice, val packet: ByteArray) : PPoGConnectionEvent(device) } private suspend fun runService(eventFlow: SharedFlow) { @@ -78,11 +78,12 @@ class PPoGService(private val scope: CoroutineScope) : GattService { return@collect } Timber.d("Connection state changed: ${it.newState} for device ${it.device.address}") - if (it.newState == BluetoothGatt.STATE_CONNECTED) { + if (it.newState == GattConnectionState.Connected) { check(ppogConnections[it.device.address] == null) { "Connection already exists for device ${it.device.address}" } if (ppogConnections.isEmpty()) { Timber.d("Creating new connection for device ${it.device.address}") - val connectionScope = CoroutineScope(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) + val supervisor = SupervisorJob(scope.coroutineContext[Job]) + val connectionScope = CoroutineScope(scope.coroutineContext + supervisor) val connection = PPoGServiceConnection( connectionScope, this@PPoGService, @@ -110,7 +111,6 @@ class PPoGService(private val scope: CoroutineScope) : GattService { } } connection.start().collect { packet -> - Timber.v("RX ${packet.endpoint}") deviceRxFlow.emit(PPoGConnectionEvent.PacketReceived(it.device, packet)) } } @@ -119,7 +119,7 @@ class PPoGService(private val scope: CoroutineScope) : GattService { //TODO: Handle multiple connections Timber.w("Multiple connections not supported yet") } - } else if (it.newState == BluetoothGatt.STATE_DISCONNECTED) { + } else if (it.newState == GattConnectionState.Disconnected) { ppogConnections[it.device.address]?.close() ppogConnections.remove(it.device.address) } @@ -159,12 +159,10 @@ class PPoGService(private val scope: CoroutineScope) : GattService { } fun rxFlowFor(device: BluetoothDevice): Flow { - return deviceRxFlow.onEach { - Timber.d("RX ${it.device.address} ${it::class.simpleName}") - }.filter { it.device.address == device.address } + return deviceRxFlow.filter { it.device.address == device.address } } - suspend fun emitPacket(device: BluetoothDevice, packet: PebblePacket) { + suspend fun emitPacket(device: BluetoothDevice, packet: ByteArray) { deviceTxFlow.emit(Pair(device, packet)) } } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt index a61d35df..93bda519 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt @@ -13,14 +13,13 @@ import java.io.Closeable import java.util.UUID class PPoGServiceConnection(val connectionScope: CoroutineScope, private val ppogService: PPoGService, val device: BluetoothDevice, private val deviceEventFlow: Flow): Closeable { - private val ppogSession = PPoGSession(connectionScope, this, 23) + private val ppogSession = PPoGSession(connectionScope, device, LEConstants.DEFAULT_MTU) companion object { val metaCharacteristicUUID = UUID.fromString(LEConstants.UUIDs.META_CHARACTERISTIC_SERVER) val ppogCharacteristicUUID = UUID.fromString(LEConstants.UUIDs.PPOGATT_DEVICE_CHARACTERISTIC_SERVER) val configurationDescriptorUUID = UUID.fromString(LEConstants.UUIDs.CHARACTERISTIC_CONFIGURATION_DESCRIPTOR) } private suspend fun runConnection() = deviceEventFlow.onEach { - Timber.d("Event: $it") when (it) { is CharacteristicReadEvent -> { if (it.characteristic.uuid == metaCharacteristicUUID) { @@ -32,7 +31,7 @@ class PPoGServiceConnection(val connectionScope: CoroutineScope, private val ppo } is CharacteristicWriteEvent -> { if (it.characteristic.uuid == ppogCharacteristicUUID) { - ppogSession.handleData(it.value) + ppogSession.handlePacket(it.value) } else { Timber.w("Unknown characteristic write request: ${it.characteristic.uuid}") it.respond(BluetoothGatt.GATT_FAILURE) @@ -47,7 +46,7 @@ class PPoGServiceConnection(val connectionScope: CoroutineScope, private val ppo } } is MtuChangedEvent -> { - ppogSession.mtu = it.mtu + ppogSession.setMTU(it.mtu) } } }.catch { @@ -59,9 +58,17 @@ class PPoGServiceConnection(val connectionScope: CoroutineScope, private val ppo return CharacteristicResponse(BluetoothGatt.GATT_SUCCESS, 0, LEConstants.SERVER_META_RESPONSE) } - suspend fun start(): Flow { + /** + * Start the connection and return a flow of received data (pebble packets) + * @return Flow of received serialized pebble packets + */ + suspend fun start(): Flow { runConnection() - return ppogSession.openPacketFlow() + return ppogSession.flow().onEach { + if (it is PPoGSession.PPoGSessionResponse.WritePPoGCharacteristic) { + it.result.complete(ppogService.sendData(device, it.data)) + } + }.filterIsInstance().map { it.packet } } @RequiresPermission("android.permission.BLUETOOTH_CONNECT") @@ -69,9 +76,8 @@ class PPoGServiceConnection(val connectionScope: CoroutineScope, private val ppo return ppogService.sendData(device, data) } - suspend fun sendPebblePacket(packet: PebblePacket) { - val data = packet.serialize().asByteArray() - ppogSession.sendData(data) + suspend fun sendPebblePacket(packet: ByteArray) { + ppogSession.sendMessage(packet) } override fun close() { connectionScope.cancel() diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt index 24b1a9e9..af3393f4 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt @@ -1,19 +1,19 @@ package io.rebble.cobble.bluetooth.ble +import android.bluetooth.BluetoothDevice import io.rebble.cobble.bluetooth.ble.util.chunked import io.rebble.libpebblecommon.ble.GATTPacket import io.rebble.libpebblecommon.protocolhelpers.PebblePacket import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.* import timber.log.Timber import java.io.Closeable +import java.util.LinkedList import kotlin.math.min -class PPoGSession(private val scope: CoroutineScope, private val serviceConnection: PPoGServiceConnection, var mtu: Int): Closeable { +class PPoGSession(private val scope: CoroutineScope, val device: BluetoothDevice, var mtu: Int): Closeable { class PPoGSessionException(message: String) : Exception(message) private val pendingPackets = mutableMapOf() @@ -24,23 +24,99 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti private var sequenceInCursor = 0 private var sequenceOutCursor = 0 private var lastAck: GATTPacket? = null - private var delayedAckJob: Job? = null - private var delayedNACKJob: Job? = null + private val delayedAckScope = scope + Job() + private var delayedNACKScope = scope + Job() private var resetAckJob: Job? = null private var writerJob: Job? = null private var failedResetAttempts = 0 private val pebblePacketAssembler = PPoGPebblePacketAssembler() - private val rxPebblePacketChannel = Channel(Channel.BUFFERED) + private val sessionFlow = MutableSharedFlow() + private val packetRetries: MutableMap = mutableMapOf() - private val jobActor = scope.actor Unit> { - for (job in channel) { - job() + open class PPoGSessionResponse { + class PebblePacket(val packet: ByteArray) : PPoGSessionResponse() + class WritePPoGCharacteristic(val data: ByteArray, val result: CompletableDeferred) : PPoGSessionResponse() + } + open class SessionCommand { + class SendMessage(val data: ByteArray) : SessionCommand() + class HandlePacket(val packet: ByteArray) : SessionCommand() + class SetMTU(val mtu: Int) : SessionCommand() + class OnUnblocked : SessionCommand() + class DelayedAck : SessionCommand() + class DelayedNack : SessionCommand() + } + + private val sessionActor = scope.actor(capacity = 8) { + for (command in channel) { + when (command) { + is SessionCommand.SendMessage -> { + if (stateManager.state != State.Open) { + throw PPoGSessionException("Session not open") + } + val dataChunks = command.data.chunked(stateManager.mtuSize - 3) + for (chunk in dataChunks) { + val packet = GATTPacket(GATTPacket.PacketType.DATA, sequenceOutCursor, chunk) + packetWriter.sendOrQueuePacket(packet) + sequenceOutCursor = incrementSequence(sequenceOutCursor) + } + } + is SessionCommand.HandlePacket -> { + val ppogPacket = GATTPacket(command.packet) + try { + withTimeout(1000L) { + when (ppogPacket.type) { + GATTPacket.PacketType.RESET -> onResetRequest(ppogPacket) + GATTPacket.PacketType.RESET_ACK -> onResetAck(ppogPacket) + GATTPacket.PacketType.ACK -> onAck(ppogPacket) + GATTPacket.PacketType.DATA -> { + Timber.v("-> DATA ${ppogPacket.sequence}") + pendingPackets[ppogPacket.sequence] = ppogPacket + processDataQueue() + } + } + } + } catch (e: TimeoutCancellationException) { + Timber.e("Timeout while processing packet ${ppogPacket.type} ${ppogPacket.sequence}") + } + } + is SessionCommand.SetMTU -> { + mtu = command.mtu + } + is SessionCommand.OnUnblocked -> { + packetWriter.sendNextPacket() + } + is SessionCommand.DelayedAck -> { + delayedAckScope.coroutineContext.job.cancelChildren() + delayedAckScope.launch { + delay(COALESCED_ACK_DELAY_MS) + sendAck() + }.join() + } + is SessionCommand.DelayedNack -> { + delayedNACKScope.coroutineContext.job.cancelChildren() + delayedNACKScope.launch { + delay(OUT_OF_ORDER_MAX_DELAY_MS) + sendAck() + }.join() + } + } } } + fun sendMessage(data: ByteArray): Boolean = sessionActor.trySend(SessionCommand.SendMessage(data)).isSuccess + fun handlePacket(packet: ByteArray): Boolean = sessionActor.trySend(SessionCommand.HandlePacket(packet)).isSuccess + fun setMTU(mtu: Int): Boolean = sessionActor.trySend(SessionCommand.SetMTU(mtu)).isSuccess + fun onUnblocked(): Boolean = sessionActor.trySend(SessionCommand.OnUnblocked()).isSuccess + inner class StateManager { - var state: State = State.Closed + private var _state = State.Closed + var state: State + get() = _state + set(value) { + Timber.d("State changed from ${_state.name} to ${value.name}") + _state = value + } var mtuSize: Int get() = mtu set(value) {} } @@ -55,6 +131,7 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti private const val MAX_FAILED_RESETS = 3 private const val MAX_SUPPORTED_WINDOW_SIZE = 25 private const val MAX_SUPPORTED_WINDOW_SIZE_V0 = 4 + private const val MAX_NUM_RETRIES = 2 } enum class State(val allowedRxTypes: List, val allowedTxTypes: List) { @@ -67,38 +144,17 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti private fun makePacketWriter(): PPoGPacketWriter { val writer = PPoGPacketWriter(scope, stateManager) { onTimeout() } writerJob = writer.packetWriteFlow.onEach { - packetWriter.setPacketSendStatus(it, serviceConnection.writeDataRaw(it.toByteArray())) + Timber.v("<- ${it.type.name} ${it.sequence}") + val resultCompletable = CompletableDeferred() + sessionFlow.emit(PPoGSessionResponse.WritePPoGCharacteristic(it.toByteArray(), resultCompletable)) + packetWriter.setPacketSendStatus(it, resultCompletable.await()) }.launchIn(scope) return writer } - suspend fun handleData(value: ByteArray) { - val ppogPacket = GATTPacket(value) - when (ppogPacket.type) { - GATTPacket.PacketType.RESET -> onResetRequest(ppogPacket) - GATTPacket.PacketType.RESET_ACK -> onResetAck(ppogPacket) - GATTPacket.PacketType.ACK -> onAck(ppogPacket) - GATTPacket.PacketType.DATA -> { - pendingPackets[ppogPacket.sequence] = ppogPacket - processDataQueue() - } - } - } - - suspend fun sendData(data: ByteArray) { - if (stateManager.state != State.Open) { - throw PPoGSessionException("Session not open") - } - val dataChunks = data.chunked(stateManager.mtuSize - 3) - for (chunk in dataChunks) { - val packet = GATTPacket(GATTPacket.PacketType.DATA, sequenceOutCursor, data) - packetWriter.sendOrQueuePacket(packet) - sequenceOutCursor = incrementSequence(sequenceOutCursor) - } - } - private suspend fun onResetRequest(packet: GATTPacket) { require(packet.type == GATTPacket.PacketType.RESET) + Timber.v("-> RESET ${packet.sequence}") if (packet.sequence != 0) { throw PPoGSessionException("Reset packet must have sequence 0") } @@ -108,7 +164,8 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti packetWriter.rescheduleTimeout(true) resetState() val resetAckPacket = makeResetAck(sequenceOutCursor, MAX_SUPPORTED_WINDOW_SIZE, MAX_SUPPORTED_WINDOW_SIZE, ppogVersion) - sendResetAck(resetAckPacket).join() + packetWriter.sendOrQueuePacket(resetAckPacket) + stateManager.state = State.AwaitingResetAck } @@ -120,24 +177,9 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti }) } - private suspend fun sendResetAck(packet: GATTPacket): Job { - val job = scope.launch(start = CoroutineStart.LAZY) { - packetWriter.sendOrQueuePacket(packet) - } - resetAckJob = job - jobActor.send { - job.start() - try { - job.join() - } catch (e: CancellationException) { - Timber.v("Reset ACK job cancelled") - } - } - return job - } - private suspend fun onResetAck(packet: GATTPacket) { require(packet.type == GATTPacket.PacketType.RESET_ACK) + Timber.v("-> RESET_ACK ${packet.sequence}") if (packet.sequence != 0) { throw PPoGSessionException("Reset ACK packet must have sequence 0") } @@ -160,11 +202,12 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti packetWriter.txWindow = packet.getMaxTXWindow().toInt() } stateManager.state = State.Open - PPoGLinkStateManager.updateState(serviceConnection.device.address, PPoGLinkState.SessionOpen) + PPoGLinkStateManager.updateState(device.address, PPoGLinkState.SessionOpen) } private suspend fun onAck(packet: GATTPacket) { require(packet.type == GATTPacket.PacketType.ACK) + Timber.v("-> ACK ${packet.sequence}") packetWriter.onAck(packet) } @@ -186,31 +229,20 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti scheduleDelayedAck() } - private suspend fun scheduleDelayedAck() { - delayedAckJob?.cancel() - val job = scope.launch(start = CoroutineStart.LAZY) { - delay(COALESCED_ACK_DELAY_MS) - sendAck() - } - delayedAckJob = job - jobActor.send { - job.start() - try { - job.join() - } catch (e: CancellationException) { - Timber.v("Delayed ACK job cancelled") - } - } - } + private fun scheduleDelayedAck() = sessionActor.trySend(SessionCommand.DelayedAck()).isSuccess + private fun scheduleDelayedNACK() = sessionActor.trySend(SessionCommand.DelayedNack()).isSuccess /** * Send an ACK cancelling the delayed ACK job if present */ private suspend fun sendAckCancelling() { - delayedAckJob?.cancel() + delayedAckScope.coroutineContext.job.cancelChildren() sendAck() } + + var dbgLastAckSeq = -1 + /** * Send the last ACK packet */ @@ -218,6 +250,9 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti // Send ack lastAck?.let { packetsSinceLastAck = 0 + check(it.sequence != dbgLastAckSeq) { "Sending duplicate ACK for sequence ${it.sequence}" } + dbgLastAckSeq = it.sequence + Timber.d("Writing ACK for sequence ${it.sequence}") packetWriter.sendOrQueuePacket(it) } } @@ -226,13 +261,13 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti * Process received packet(s) in the queue */ private suspend fun processDataQueue() { - delayedNACKJob?.cancel() + delayedNACKScope.coroutineContext.job.cancelChildren() while (sequenceInCursor in pendingPackets) { val packet = pendingPackets.remove(sequenceInCursor)!! ack(packet.sequence) val pebblePacket = packet.data.sliceArray(1 until packet.data.size) pebblePacketAssembler.assemble(pebblePacket).collect { - rxPebblePacketChannel.send(it) + sessionFlow.emit(PPoGSessionResponse.PebblePacket(it)) } sequenceInCursor = incrementSequence(sequenceInCursor) } @@ -242,34 +277,14 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti } } - private suspend fun scheduleDelayedNACK() { - delayedNACKJob?.cancel() - val job = scope.launch(start = CoroutineStart.LAZY) { - delay(OUT_OF_ORDER_MAX_DELAY_MS) - if (pendingPackets.isNotEmpty()) { - pendingPackets.clear() - sendAck() - } - } - delayedNACKJob = job - jobActor.send { - job.start() - try { - job.join() - } catch (e: CancellationException) { - Timber.v("Delayed NACK job cancelled") - } - } - } - private fun resetState() { sequenceInCursor = 0 sequenceOutCursor = 0 packetWriter.close() writerJob?.cancel() packetWriter = makePacketWriter() - delayedNACKJob?.cancel() - delayedAckJob?.cancel() + delayedNACKScope.coroutineContext.job.cancelChildren() + delayedAckScope.coroutineContext.job.cancelChildren() } private suspend fun requestReset() { @@ -288,11 +303,25 @@ class PPoGSession(private val scope: CoroutineScope, private val serviceConnecti } requestReset() } - //TODO: handle data timeout + val packetsToResend = LinkedList() + while (true) { + val packet = packetWriter.inflightPackets.poll() ?: break + if ((packetRetries[packet] ?: 0) <= MAX_NUM_RETRIES) { + Timber.w("Packet ${packet.sequence} timed out, resending") + packetsToResend.add(packet) + packetRetries[packet] = (packetRetries[packet] ?: 0) + 1 + } else { + Timber.w("Packet ${packet.sequence} timed out too many times, resetting") + requestReset() + } + } + + for (packet in packetsToResend.reversed()) { + packetWriter.dataWaitingToSend.addFirst(packet) + } } } - - fun openPacketFlow() = rxPebblePacketChannel.consumeAsFlow() + fun flow() = sessionFlow.asSharedFlow() override fun close() { resetState() diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PebbleLEConnector.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PebbleLEConnector.kt index ebd9f358..fba70f90 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PebbleLEConnector.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PebbleLEConnector.kt @@ -46,7 +46,15 @@ class PebbleLEConnector(private val connection: BlueGATTConnection, private val throw IOException("Failed to discover services") } emit(ConnectorState.CONNECTING) - + success = connection.requestMtu(LEConstants.TARGET_MTU)?.isSuccess() == true + if (!success) { + throw IOException("Failed to request MTU") + } + val paramManager = ConnectionParamManager(connection) + success = paramManager.subscribe() + if (!success) { + Timber.w("Continuing without connection parameters management") + } val connectivityWatcher = ConnectivityWatcher(connection) success = connectivityWatcher.subscribe() if (!success) {