diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt index e96bdfdd..ca312f58 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/BlueLEDriver.kt @@ -12,8 +12,8 @@ import io.rebble.cobble.bluetooth.workarounds.WorkaroundDescriptor import io.rebble.libpebblecommon.ProtocolHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.withTimeout import timber.log.Timber import java.io.IOException @@ -27,7 +27,6 @@ class BlueLEDriver( private val context: Context, private val protocolHandler: ProtocolHandler, private val scope: CoroutineScope, - private val ppogServer: PPoGService, private val workaroundResolver: (WorkaroundDescriptor) -> Boolean ): BlueIO { @OptIn(FlowPreview::class) @@ -40,13 +39,23 @@ class BlueLEDriver( ?: throw IOException("Failed to connect to device") emit(SingleConnectionStatus.Connecting(device)) val connector = PebbleLEConnector(gatt, context, scope) + var success = false connector.connect().collect { when (it) { PebbleLEConnector.ConnectorState.CONNECTING -> Timber.d("PebbleLEConnector is connecting") PebbleLEConnector.ConnectorState.PAIRING -> Timber.d("PebbleLEConnector is pairing") - PebbleLEConnector.ConnectorState.CONNECTED -> Timber.d("PebbleLEConnector connected watch, waiting for watch") + PebbleLEConnector.ConnectorState.CONNECTED -> { + Timber.d("PebbleLEConnector connected watch, waiting for watch") + PPoGLinkStateManager.updateState(device.address, PPoGLinkState.ReadyForSession) + success = true + } } } + check(success) { "Failed to connect to watch" } + withTimeout(10000) { + PPoGLinkStateManager.getState(device.address).first { it == PPoGLinkState.SessionOpen } + } + emit(SingleConnectionStatus.Connected(device)) } } } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt index 305c03e0..81d3c151 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/GattServer.kt @@ -1,17 +1,27 @@ package io.rebble.cobble.bluetooth.ble +import android.annotation.SuppressLint import android.bluetooth.* import android.content.Context import androidx.annotation.RequiresPermission +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.shareIn import timber.log.Timber import java.util.UUID -class GattServer(private val bluetoothManager: BluetoothManager, private val context: Context, private val services: List) { +class GattServer(private val bluetoothManager: BluetoothManager, private val context: Context, private val services: List) { + private val scope = CoroutineScope(Dispatchers.Default) class GattServerException(message: String) : Exception(message) + + @SuppressLint("MissingPermission") + val serverFlow: SharedFlow = openServer().shareIn(scope, SharingStarted.Lazily, replay = 1) @OptIn(ExperimentalCoroutinesApi::class) @RequiresPermission("android.permission.BLUETOOTH_CONNECT") fun openServer() = callbackFlow { @@ -105,7 +115,8 @@ class GattServer(private val bluetoothManager: BluetoothManager, private val con openServer = bluetoothManager.openGattServer(context, callbacks) services.forEach { check(serviceAddedChannel.isEmpty) { "Service added event not consumed" } - if (!openServer.addService(it)) { + val service = it.register(serverFlow) + if (!openServer.addService(service)) { throw GattServerException("Failed to request add service") } if (serviceAddedChannel.receive().status != BluetoothGatt.GATT_SUCCESS) { diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGLinkStateManager.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGLinkStateManager.kt new file mode 100644 index 00000000..c166aed1 --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGLinkStateManager.kt @@ -0,0 +1,31 @@ +package io.rebble.cobble.bluetooth.ble + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow + +object PPoGLinkStateManager { + private val states = mutableMapOf>() + + fun getState(deviceAddress: String): Flow { + return states.getOrPut(deviceAddress) { + Channel(Channel.BUFFERED) + }.consumeAsFlow() + } + + fun removeState(deviceAddress: String) { + states.remove(deviceAddress) + } + + fun updateState(deviceAddress: String, state: PPoGLinkState) { + states.getOrPut(deviceAddress) { + Channel(Channel.BUFFERED) + }.trySend(state) + } +} + +enum class PPoGLinkState { + Closed, + ReadyForSession, + SessionOpen +} \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt new file mode 100644 index 00000000..dad27b7e --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPacketWriter.kt @@ -0,0 +1,118 @@ +package io.rebble.cobble.bluetooth.ble + +import androidx.annotation.RequiresPermission +import io.rebble.libpebblecommon.ble.GATTPacket +import kotlinx.coroutines.* +import timber.log.Timber +import java.io.Closeable +import java.util.LinkedList +import kotlin.jvm.Throws + +class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManager: PPoGSession.StateManager, private val serviceConnection: PPoGServiceConnection, private val onTimeout: () -> Unit): Closeable { + private var metaWaitingToSend: GATTPacket? = null + private val dataWaitingToSend: LinkedList = LinkedList() + private val inflightPackets: LinkedList = LinkedList() + var txWindow = 1 + private var timeoutJob: Job? = null + + companion object { + private const val PACKET_ACK_TIMEOUT_MILLIS = 10_000L + } + + suspend fun sendOrQueuePacket(packet: GATTPacket) { + if (packet.type == GATTPacket.PacketType.DATA) { + dataWaitingToSend.add(packet) + } else { + metaWaitingToSend = packet + } + sendNextPacket() + } + + fun cancelTimeout() { + timeoutJob?.cancel() + } + + suspend fun onAck(packet: GATTPacket) { + require(packet.type == GATTPacket.PacketType.ACK) + for (waitingPacket in dataWaitingToSend.iterator()) { + if (waitingPacket.sequence == packet.sequence) { + dataWaitingToSend.remove(waitingPacket) + break + } + } + if (!inflightPackets.contains(packet)) { + Timber.w("Received ACK for packet not in flight") + return + } + var ackedPacket: GATTPacket? = null + + // remove packets until the acked packet + while (ackedPacket != packet) { + ackedPacket = inflightPackets.poll() + } + sendNextPacket() + rescheduleTimeout() + } + + @Throws(SecurityException::class) + private suspend fun sendNextPacket() { + if (metaWaitingToSend == null && dataWaitingToSend.isEmpty()) { + return + } + + val packet = if (metaWaitingToSend != null) { + metaWaitingToSend + } else { + if (inflightPackets.size > txWindow) { + return + } else { + dataWaitingToSend.peek() + } + } + + if (packet == null) { + return + } + + if (packet.type !in stateManager.state.allowedTxTypes) { + Timber.e("Attempted to send packet of type ${packet.type} in state ${stateManager.state}") + return + } + + if (!sendPacket(packet)) { + return + } + + if (packet.type == GATTPacket.PacketType.DATA) { + dataWaitingToSend.poll() + inflightPackets.offer(packet) + } else { + metaWaitingToSend = null + } + + rescheduleTimeout() + + sendNextPacket() + } + + fun rescheduleTimeout(force: Boolean = false) { + timeoutJob?.cancel() + if (inflightPackets.isNotEmpty() || force) { + timeoutJob = scope.launch { + delay(PACKET_ACK_TIMEOUT_MILLIS) + onTimeout() + } + } + } + + @RequiresPermission("android.permission.BLUETOOTH_CONNECT") + private suspend fun sendPacket(packet: GATTPacket): Boolean { + val data = packet.toByteArray() + require(data.size > stateManager.mtuSize) {"Packet too large to send: ${data.size} > ${stateManager.mtuSize}"} + return serviceConnection.writeData(data) + } + + override fun close() { + timeoutJob?.cancel() + } +} \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt new file mode 100644 index 00000000..2d037a0f --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGPebblePacketAssembler.kt @@ -0,0 +1,56 @@ +package io.rebble.cobble.bluetooth.ble + +import io.rebble.libpebblecommon.protocolhelpers.PebblePacket +import io.rebble.libpebblecommon.structmapper.SUShort +import io.rebble.libpebblecommon.structmapper.StructMapper +import io.rebble.libpebblecommon.util.DataBuffer +import kotlinx.coroutines.flow.flow +import java.nio.ByteBuffer +import kotlin.math.min + +class PPoGPebblePacketAssembler { + private var data: ByteBuffer? = null + + /** + * Emits one or more [PebblePacket]s if the data is complete. + */ + fun assemble(dataToAdd: ByteArray) = flow { + val dataToAddBuf = ByteBuffer.wrap(dataToAdd) + while (dataToAddBuf.hasRemaining()) { + if (data == null) { + if (dataToAddBuf.remaining() < 4) { + throw PPoGPebblePacketAssemblyException("Not enough data for header") + } + beginAssembly(dataToAddBuf.slice()) + dataToAddBuf.position(dataToAddBuf.position() + 4) + } + + val remaining = data!!.remaining() + val toRead = min(remaining, dataToAddBuf.remaining()) + data!!.put(dataToAddBuf.array(), dataToAddBuf.position(), toRead) + dataToAddBuf.position(dataToAddBuf.position() + toRead) + + if (data!!.remaining() == 0) { + data!!.flip() + val packet = PebblePacket.deserialize(data!!.array().toUByteArray()) + emit(packet) + clear() + } + } + } + + private fun beginAssembly(headerSlice: ByteBuffer) { + val meta = StructMapper() + val length = SUShort(meta) + val ep = SUShort(meta) + meta.fromBytes(DataBuffer(headerSlice.array().asUByteArray())) + val packetLength = length.get() + data = ByteBuffer.allocate(packetLength.toInt()) + } + + fun clear() { + data = null + } +} + +class PPoGPebblePacketAssemblyException(message: String) : Exception(message) \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt index d726c3a9..b6cf0edc 100644 --- a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGService.kt @@ -1,12 +1,18 @@ package io.rebble.cobble.bluetooth.ble +import android.bluetooth.BluetoothDevice import android.bluetooth.BluetoothGatt import android.bluetooth.BluetoothGattCharacteristic +import android.bluetooth.BluetoothGattServer import android.bluetooth.BluetoothGattService +import android.bluetooth.BluetoothStatusCodes +import android.os.Build +import androidx.annotation.RequiresPermission import io.rebble.cobble.bluetooth.ble.util.GattCharacteristicBuilder import io.rebble.cobble.bluetooth.ble.util.GattDescriptorBuilder import io.rebble.cobble.bluetooth.ble.util.GattServiceBuilder import io.rebble.libpebblecommon.ble.LEConstants +import io.rebble.libpebblecommon.protocolhelpers.PebblePacket import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow @@ -16,7 +22,7 @@ import kotlinx.coroutines.launch import timber.log.Timber import java.util.UUID -class PPoGService(private val scope: CoroutineScope) : GattService { +class PPoGService(private val scope: CoroutineScope, private val onPebblePacket: (PebblePacket, BluetoothDevice) -> Unit) : GattService { private val dataCharacteristic = GattCharacteristicBuilder() .withUuid(UUID.fromString(LEConstants.UUIDs.PPOGATT_DEVICE_CHARACTERISTIC_SERVER)) .withProperties(BluetoothGattCharacteristic.PROPERTY_NOTIFY, BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE) @@ -43,6 +49,7 @@ class PPoGService(private val scope: CoroutineScope) : GattService { .build() private val ppogConnections = mutableMapOf() + private var gattServer: BluetoothGattServer? = null /** * Filter flow for events related to a specific device @@ -59,16 +66,29 @@ class PPoGService(private val scope: CoroutineScope) : GattService { private suspend fun runService(eventFlow: Flow) { eventFlow.collect { when (it) { + is ServerInitializedEvent -> { + gattServer = it.server + } is ConnectionStateEvent -> { + if (gattServer == null) { + Timber.w("Server not initialized yet") + return@collect + } Timber.d("Connection state changed: ${it.newState} for device ${it.device.address}") if (it.newState == BluetoothGatt.STATE_CONNECTED) { check(ppogConnections[it.device.address] == null) { "Connection already exists for device ${it.device.address}" } if (ppogConnections.isEmpty()) { - val connection = PPoGServiceConnection(this, it.device) - connection.start(eventFlow - .filterIsInstance() - .filter(filterFlowForDevice(it.device.address)) - ) + val connection = PPoGServiceConnection( + scope, + this, + it.device, + eventFlow + .filterIsInstance() + .filter(filterFlowForDevice(it.device.address)) + ) { packet -> + onPebblePacket(packet, it.device) + } + connection.start() ppogConnections[it.device.address] = connection } else { //TODO: Handle multiple connections @@ -83,11 +103,23 @@ class PPoGService(private val scope: CoroutineScope) : GattService { } } + @RequiresPermission("android.permission.BLUETOOTH_CONNECT") + fun sendData(device: BluetoothDevice, data: ByteArray): Boolean { + gattServer?.let { + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) { + return it.notifyCharacteristicChanged(device, dataCharacteristic, false, data) == BluetoothStatusCodes.SUCCESS + } else { + dataCharacteristic.value = data + return it.notifyCharacteristicChanged(device, dataCharacteristic, false) + } + } ?: Timber.w("Tried to send data before server was initialized") + return false + } + override fun register(eventFlow: Flow): BluetoothGattService { scope.launch { runService(eventFlow) } return bluetoothGattService } - } \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt new file mode 100644 index 00000000..6bdd80f2 --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGServiceConnection.kt @@ -0,0 +1,77 @@ +package io.rebble.cobble.bluetooth.ble + +import android.bluetooth.BluetoothDevice +import android.bluetooth.BluetoothGatt +import androidx.annotation.RequiresPermission +import io.rebble.libpebblecommon.ble.LEConstants +import io.rebble.libpebblecommon.protocolhelpers.PebblePacket +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.first +import timber.log.Timber +import java.io.Closeable + +class PPoGServiceConnection(private val scope: CoroutineScope, private val ppogService: PPoGService, val device: BluetoothDevice, private val deviceEventFlow: Flow, val onPebblePacket: suspend (PebblePacket) -> Unit): Closeable { + private var job: Job? = null + private val ppogSession = PPoGSession(scope, this, 23) + suspend fun runConnection() { + deviceEventFlow.collect { + when (it) { + is CharacteristicReadEvent -> { + if (it.characteristic.uuid.toString() == LEConstants.UUIDs.META_CHARACTERISTIC_SERVER) { + it.respond(makeMetaResponse()) + } else { + Timber.w("Unknown characteristic read request: ${it.characteristic.uuid}") + it.respond(CharacteristicResponse.Failure) + } + } + is CharacteristicWriteEvent -> { + if (it.characteristic.uuid.toString() == LEConstants.UUIDs.PPOGATT_DEVICE_CHARACTERISTIC_SERVER) { + try { + ppogSession.handleData(it.value) + it.respond(BluetoothGatt.GATT_SUCCESS) + } catch (e: Exception) { + it.respond(BluetoothGatt.GATT_FAILURE) + throw e + } + } else { + Timber.w("Unknown characteristic write request: ${it.characteristic.uuid}") + it.respond(BluetoothGatt.GATT_FAILURE) + } + } + is MtuChangedEvent -> { + ppogSession.mtu = it.mtu + } + } + } + } + + private fun makeMetaResponse(): CharacteristicResponse { + return CharacteristicResponse(BluetoothGatt.GATT_SUCCESS, 0, LEConstants.SERVER_META_RESPONSE) + } + + suspend fun start() { + job = scope.launch { + runConnection() + } + } + + @RequiresPermission("android.permission.BLUETOOTH_CONNECT") + suspend fun writeData(data: ByteArray): Boolean { + val result = CompletableDeferred() + val job = scope.launch { + val evt = deviceEventFlow.filterIsInstance().first() + result.complete(evt.status == BluetoothGatt.GATT_SUCCESS) + } + if (!ppogService.sendData(device, data)) { + job.cancel() + return false + } + return result.await() + } + override fun close() { + job?.cancel() + } +} \ No newline at end of file diff --git a/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt new file mode 100644 index 00000000..fa1425e1 --- /dev/null +++ b/android/pebble_bt_transport/src/main/java/io/rebble/cobble/bluetooth/ble/PPoGSession.kt @@ -0,0 +1,270 @@ +package io.rebble.cobble.bluetooth.ble + +import io.rebble.libpebblecommon.ble.GATTPacket +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.actor +import timber.log.Timber +import java.io.Closeable +import kotlin.math.min + +class PPoGSession(private val scope: CoroutineScope, private val serviceConnection: PPoGServiceConnection, var mtu: Int): Closeable { + class PPoGSessionException(message: String) : Exception(message) + + private val pendingPackets = mutableMapOf() + private var ppogVersion: GATTPacket.PPoGConnectionVersion = GATTPacket.PPoGConnectionVersion.ZERO + + private var rxWindow = 0 + private var packetsSinceLastAck = 0 + private var sequenceInCursor = 0 + private var sequenceOutCursor = 0 + private var lastAck: GATTPacket? = null + private var delayedAckJob: Job? = null + private var delayedNACKJob: Job? = null + private var resetAckJob: Job? = null + private var failedResetAttempts = 0 + private val pebblePacketAssembler = PPoGPebblePacketAssembler() + + private val jobActor = scope.actor Unit> { + for (job in channel) { + job() + } + } + + inner class StateManager { + var state: State = State.Closed + var mtuSize: Int get() = mtu + set(value) {} + } + + private val stateManager = StateManager() + private var packetWriter = makePacketWriter() + + companion object { + private const val MAX_SEQUENCE = 32 + private const val COALESCED_ACK_DELAY_MS = 200L + private const val OUT_OF_ORDER_MAX_DELAY_MS = 50L + private const val MAX_FAILED_RESETS = 3 + private const val MAX_SUPPORTED_WINDOW_SIZE = 25 + private const val MAX_SUPPORTED_WINDOW_SIZE_V0 = 4 + } + + enum class State(val allowedRxTypes: List, val allowedTxTypes: List) { + Closed(listOf(GATTPacket.PacketType.RESET), listOf(GATTPacket.PacketType.RESET_ACK)), + AwaitingResetAck(listOf(GATTPacket.PacketType.RESET_ACK), listOf(GATTPacket.PacketType.RESET)), + AwaitingResetAckRequested(listOf(GATTPacket.PacketType.RESET_ACK), listOf(GATTPacket.PacketType.RESET)), + Open(listOf(GATTPacket.PacketType.RESET, GATTPacket.PacketType.ACK, GATTPacket.PacketType.DATA), listOf(GATTPacket.PacketType.ACK, GATTPacket.PacketType.DATA)), + } + + private fun makePacketWriter(): PPoGPacketWriter { + return PPoGPacketWriter(scope, stateManager, serviceConnection) { onTimeout() } + } + + suspend fun handleData(value: ByteArray) { + val ppogPacket = GATTPacket(value) + when (ppogPacket.type) { + GATTPacket.PacketType.RESET -> onResetRequest(ppogPacket) + GATTPacket.PacketType.RESET_ACK -> onResetAck(ppogPacket) + GATTPacket.PacketType.ACK -> onAck(ppogPacket) + GATTPacket.PacketType.DATA -> { + pendingPackets[ppogPacket.sequence] = ppogPacket + processDataQueue() + } + } + } + + private suspend fun onResetRequest(packet: GATTPacket) { + require(packet.type == GATTPacket.PacketType.RESET) + if (packet.sequence != 0) { + throw PPoGSessionException("Reset packet must have sequence 0") + } + val nwVersion = packet.getPPoGConnectionVersion() + Timber.d("Reset requested, new PPoGATT version: ${nwVersion}") + ppogVersion = nwVersion + stateManager.state = State.AwaitingResetAck + packetWriter.rescheduleTimeout(true) + resetState() + val resetAckPacket = makeResetAck(sequenceOutCursor, MAX_SUPPORTED_WINDOW_SIZE, MAX_SUPPORTED_WINDOW_SIZE, ppogVersion) + sendResetAck(resetAckPacket) + } + + private fun makeResetAck(sequence: Int, rxWindow: Int, txWindow: Int, ppogVersion: GATTPacket.PPoGConnectionVersion): GATTPacket { + return GATTPacket(GATTPacket.PacketType.RESET_ACK, sequence, if (ppogVersion.supportsWindowNegotiation) { + byteArrayOf(rxWindow.toByte(), txWindow.toByte()) + } else { + null + }) + } + + private suspend fun sendResetAck(packet: GATTPacket) { + val job = scope.launch(start = CoroutineStart.LAZY) { + packetWriter.sendOrQueuePacket(packet) + } + resetAckJob = job + jobActor.send { + job.start() + try { + job.join() + } catch (e: CancellationException) { + Timber.v("Reset ACK job cancelled") + } + } + } + + private suspend fun onResetAck(packet: GATTPacket) { + require(packet.type == GATTPacket.PacketType.RESET_ACK) + if (packet.sequence != 0) { + throw PPoGSessionException("Reset ACK packet must have sequence 0") + } + if (stateManager.state == State.AwaitingResetAckRequested) { + packetWriter.sendOrQueuePacket(makeResetAck(0, MAX_SUPPORTED_WINDOW_SIZE, MAX_SUPPORTED_WINDOW_SIZE, ppogVersion)) + } + packetWriter.cancelTimeout() + lastAck = null + failedResetAttempts = 0 + + if (ppogVersion.supportsWindowNegotiation && !packet.hasWindowSizes()) { + Timber.i("FW claimed PPoGATT V1+ but did not send window sizes, reverting to V0") + ppogVersion = GATTPacket.PPoGConnectionVersion.ZERO + } + Timber.d("Link established, PPoGATT version: ${ppogVersion}") + if (!ppogVersion.supportsWindowNegotiation) { + rxWindow = MAX_SUPPORTED_WINDOW_SIZE_V0 + } else { + rxWindow = min(packet.getMaxRXWindow().toInt(), MAX_SUPPORTED_WINDOW_SIZE) + packetWriter.txWindow = packet.getMaxTXWindow().toInt() + } + stateManager.state = State.Open + PPoGLinkStateManager.updateState(serviceConnection.device.address, PPoGLinkState.SessionOpen) + } + + private suspend fun onAck(packet: GATTPacket) { + require(packet.type == GATTPacket.PacketType.ACK) + packetWriter.onAck(packet) + } + + private fun incrementSequence(sequence: Int): Int { + return (sequence + 1) % MAX_SEQUENCE + } + + private suspend fun ack(sequence: Int) { + lastAck = GATTPacket(GATTPacket.PacketType.ACK, sequence) + if (!ppogVersion.supportsCoalescedAcking) { + sendAckCancelling() + return + } + if (++packetsSinceLastAck >= (rxWindow / 2)) { + sendAckCancelling() + return + } + // We want to coalesce acks + scheduleDelayedAck() + } + + private suspend fun scheduleDelayedAck() { + delayedAckJob?.cancel() + val job = scope.launch(start = CoroutineStart.LAZY) { + delay(COALESCED_ACK_DELAY_MS) + sendAck() + } + delayedAckJob = job + jobActor.send { + job.start() + try { + job.join() + } catch (e: CancellationException) { + Timber.v("Delayed ACK job cancelled") + } + } + } + + /** + * Send an ACK cancelling the delayed ACK job if present + */ + private suspend fun sendAckCancelling() { + delayedAckJob?.cancel() + sendAck() + } + + /** + * Send the last ACK packet + */ + private suspend fun sendAck() { + // Send ack + lastAck?.let { + packetsSinceLastAck = 0 + packetWriter.sendOrQueuePacket(it) + } + } + + /** + * Process received packet(s) in the queue + */ + private suspend fun processDataQueue() { + delayedNACKJob?.cancel() + while (sequenceInCursor in pendingPackets) { + val packet = pendingPackets.remove(sequenceInCursor)!! + ack(packet.sequence) + pebblePacketAssembler.assemble(packet.data).collect { + serviceConnection.onPebblePacket(it) + } + sequenceInCursor = incrementSequence(sequenceInCursor) + } + if (pendingPackets.isNotEmpty()) { + // We have out of order packets, schedule a resend of last ACK + scheduleDelayedNACK() + } + } + + private suspend fun scheduleDelayedNACK() { + delayedNACKJob?.cancel() + val job = scope.launch(start = CoroutineStart.LAZY) { + delay(OUT_OF_ORDER_MAX_DELAY_MS) + if (pendingPackets.isNotEmpty()) { + pendingPackets.clear() + sendAck() + } + } + delayedNACKJob = job + jobActor.send { + job.start() + try { + job.join() + } catch (e: CancellationException) { + Timber.v("Delayed NACK job cancelled") + } + } + } + + private fun resetState() { + sequenceInCursor = 0 + sequenceOutCursor = 0 + packetWriter.close() + packetWriter = makePacketWriter() + delayedNACKJob?.cancel() + delayedAckJob?.cancel() + } + + private suspend fun requestReset() { + stateManager.state = State.AwaitingResetAckRequested + resetState() + packetWriter.rescheduleTimeout(true) + packetWriter.sendOrQueuePacket(GATTPacket(GATTPacket.PacketType.RESET, 0, byteArrayOf(ppogVersion.value))) + } + + private fun onTimeout() { + scope.launch { + if (stateManager.state in listOf(State.AwaitingResetAck, State.AwaitingResetAckRequested)) { + Timber.w("Timeout in state ${stateManager.state}, resetting") + if (++failedResetAttempts > MAX_FAILED_RESETS) { + throw PPoGSessionException("Failed to reset connection after $MAX_FAILED_RESETS attempts") + } + requestReset() + } + //TODO: handle data timeout + } + } + + override fun close() { + resetState() + } +} \ No newline at end of file