Skip to content

Commit

Permalink
PPoGATT connection, session handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
crc-32 committed May 21, 2024
1 parent d5b0b29 commit c197521
Show file tree
Hide file tree
Showing 8 changed files with 617 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import io.rebble.cobble.bluetooth.workarounds.WorkaroundDescriptor
import io.rebble.libpebblecommon.ProtocolHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withTimeout
import timber.log.Timber
import java.io.IOException

Expand All @@ -27,7 +27,6 @@ class BlueLEDriver(
private val context: Context,
private val protocolHandler: ProtocolHandler,
private val scope: CoroutineScope,
private val ppogServer: PPoGService,
private val workaroundResolver: (WorkaroundDescriptor) -> Boolean
): BlueIO {
@OptIn(FlowPreview::class)
Expand All @@ -40,13 +39,23 @@ class BlueLEDriver(
?: throw IOException("Failed to connect to device")
emit(SingleConnectionStatus.Connecting(device))
val connector = PebbleLEConnector(gatt, context, scope)
var success = false
connector.connect().collect {
when (it) {
PebbleLEConnector.ConnectorState.CONNECTING -> Timber.d("PebbleLEConnector is connecting")
PebbleLEConnector.ConnectorState.PAIRING -> Timber.d("PebbleLEConnector is pairing")
PebbleLEConnector.ConnectorState.CONNECTED -> Timber.d("PebbleLEConnector connected watch, waiting for watch")
PebbleLEConnector.ConnectorState.CONNECTED -> {
Timber.d("PebbleLEConnector connected watch, waiting for watch")
PPoGLinkStateManager.updateState(device.address, PPoGLinkState.ReadyForSession)
success = true
}
}
}
check(success) { "Failed to connect to watch" }
withTimeout(10000) {
PPoGLinkStateManager.getState(device.address).first { it == PPoGLinkState.SessionOpen }
}
emit(SingleConnectionStatus.Connected(device))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package io.rebble.cobble.bluetooth.ble

import android.annotation.SuppressLint
import android.bluetooth.*
import android.content.Context
import androidx.annotation.RequiresPermission
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.shareIn
import timber.log.Timber
import java.util.UUID

class GattServer(private val bluetoothManager: BluetoothManager, private val context: Context, private val services: List<BluetoothGattService>) {
class GattServer(private val bluetoothManager: BluetoothManager, private val context: Context, private val services: List<GattService>) {
private val scope = CoroutineScope(Dispatchers.Default)
class GattServerException(message: String) : Exception(message)

@SuppressLint("MissingPermission")
val serverFlow: SharedFlow<ServerEvent> = openServer().shareIn(scope, SharingStarted.Lazily, replay = 1)
@OptIn(ExperimentalCoroutinesApi::class)
@RequiresPermission("android.permission.BLUETOOTH_CONNECT")
fun openServer() = callbackFlow {
Expand Down Expand Up @@ -105,7 +115,8 @@ class GattServer(private val bluetoothManager: BluetoothManager, private val con
openServer = bluetoothManager.openGattServer(context, callbacks)
services.forEach {
check(serviceAddedChannel.isEmpty) { "Service added event not consumed" }
if (!openServer.addService(it)) {
val service = it.register(serverFlow)
if (!openServer.addService(service)) {
throw GattServerException("Failed to request add service")
}
if (serviceAddedChannel.receive().status != BluetoothGatt.GATT_SUCCESS) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.rebble.cobble.bluetooth.ble

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow

object PPoGLinkStateManager {
private val states = mutableMapOf<String, Channel<PPoGLinkState>>()

fun getState(deviceAddress: String): Flow<PPoGLinkState> {
return states.getOrPut(deviceAddress) {
Channel(Channel.BUFFERED)
}.consumeAsFlow()
}

fun removeState(deviceAddress: String) {
states.remove(deviceAddress)
}

fun updateState(deviceAddress: String, state: PPoGLinkState) {
states.getOrPut(deviceAddress) {
Channel(Channel.BUFFERED)
}.trySend(state)
}
}

enum class PPoGLinkState {
Closed,
ReadyForSession,
SessionOpen
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.rebble.cobble.bluetooth.ble

import androidx.annotation.RequiresPermission
import io.rebble.libpebblecommon.ble.GATTPacket
import kotlinx.coroutines.*
import timber.log.Timber
import java.io.Closeable
import java.util.LinkedList
import kotlin.jvm.Throws

class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManager: PPoGSession.StateManager, private val serviceConnection: PPoGServiceConnection, private val onTimeout: () -> Unit): Closeable {
private var metaWaitingToSend: GATTPacket? = null
private val dataWaitingToSend: LinkedList<GATTPacket> = LinkedList()
private val inflightPackets: LinkedList<GATTPacket> = LinkedList()
var txWindow = 1
private var timeoutJob: Job? = null

companion object {
private const val PACKET_ACK_TIMEOUT_MILLIS = 10_000L
}

suspend fun sendOrQueuePacket(packet: GATTPacket) {
if (packet.type == GATTPacket.PacketType.DATA) {
dataWaitingToSend.add(packet)
} else {
metaWaitingToSend = packet
}
sendNextPacket()
}

fun cancelTimeout() {
timeoutJob?.cancel()
}

suspend fun onAck(packet: GATTPacket) {
require(packet.type == GATTPacket.PacketType.ACK)
for (waitingPacket in dataWaitingToSend.iterator()) {
if (waitingPacket.sequence == packet.sequence) {
dataWaitingToSend.remove(waitingPacket)
break
}
}
if (!inflightPackets.contains(packet)) {
Timber.w("Received ACK for packet not in flight")
return
}
var ackedPacket: GATTPacket? = null

// remove packets until the acked packet
while (ackedPacket != packet) {
ackedPacket = inflightPackets.poll()
}
sendNextPacket()
rescheduleTimeout()
}

@Throws(SecurityException::class)
private suspend fun sendNextPacket() {
if (metaWaitingToSend == null && dataWaitingToSend.isEmpty()) {
return
}

val packet = if (metaWaitingToSend != null) {
metaWaitingToSend
} else {
if (inflightPackets.size > txWindow) {
return
} else {
dataWaitingToSend.peek()
}
}

if (packet == null) {
return
}

if (packet.type !in stateManager.state.allowedTxTypes) {
Timber.e("Attempted to send packet of type ${packet.type} in state ${stateManager.state}")
return
}

if (!sendPacket(packet)) {
return
}

if (packet.type == GATTPacket.PacketType.DATA) {
dataWaitingToSend.poll()
inflightPackets.offer(packet)
} else {
metaWaitingToSend = null
}

rescheduleTimeout()

sendNextPacket()
}

fun rescheduleTimeout(force: Boolean = false) {
timeoutJob?.cancel()
if (inflightPackets.isNotEmpty() || force) {
timeoutJob = scope.launch {
delay(PACKET_ACK_TIMEOUT_MILLIS)
onTimeout()
}
}
}

@RequiresPermission("android.permission.BLUETOOTH_CONNECT")
private suspend fun sendPacket(packet: GATTPacket): Boolean {
val data = packet.toByteArray()
require(data.size > stateManager.mtuSize) {"Packet too large to send: ${data.size} > ${stateManager.mtuSize}"}
return serviceConnection.writeData(data)
}

override fun close() {
timeoutJob?.cancel()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.rebble.cobble.bluetooth.ble

import io.rebble.libpebblecommon.protocolhelpers.PebblePacket
import io.rebble.libpebblecommon.structmapper.SUShort
import io.rebble.libpebblecommon.structmapper.StructMapper
import io.rebble.libpebblecommon.util.DataBuffer
import kotlinx.coroutines.flow.flow
import java.nio.ByteBuffer
import kotlin.math.min

class PPoGPebblePacketAssembler {
private var data: ByteBuffer? = null

/**
* Emits one or more [PebblePacket]s if the data is complete.
*/
fun assemble(dataToAdd: ByteArray) = flow {
val dataToAddBuf = ByteBuffer.wrap(dataToAdd)
while (dataToAddBuf.hasRemaining()) {
if (data == null) {
if (dataToAddBuf.remaining() < 4) {
throw PPoGPebblePacketAssemblyException("Not enough data for header")
}
beginAssembly(dataToAddBuf.slice())
dataToAddBuf.position(dataToAddBuf.position() + 4)
}

val remaining = data!!.remaining()
val toRead = min(remaining, dataToAddBuf.remaining())
data!!.put(dataToAddBuf.array(), dataToAddBuf.position(), toRead)
dataToAddBuf.position(dataToAddBuf.position() + toRead)

if (data!!.remaining() == 0) {
data!!.flip()
val packet = PebblePacket.deserialize(data!!.array().toUByteArray())
emit(packet)
clear()
}
}
}

private fun beginAssembly(headerSlice: ByteBuffer) {
val meta = StructMapper()
val length = SUShort(meta)
val ep = SUShort(meta)
meta.fromBytes(DataBuffer(headerSlice.array().asUByteArray()))
val packetLength = length.get()
data = ByteBuffer.allocate(packetLength.toInt())
}

fun clear() {
data = null
}
}

class PPoGPebblePacketAssemblyException(message: String) : Exception(message)
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package io.rebble.cobble.bluetooth.ble

import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattServer
import android.bluetooth.BluetoothGattService
import android.bluetooth.BluetoothStatusCodes
import android.os.Build
import androidx.annotation.RequiresPermission
import io.rebble.cobble.bluetooth.ble.util.GattCharacteristicBuilder
import io.rebble.cobble.bluetooth.ble.util.GattDescriptorBuilder
import io.rebble.cobble.bluetooth.ble.util.GattServiceBuilder
import io.rebble.libpebblecommon.ble.LEConstants
import io.rebble.libpebblecommon.protocolhelpers.PebblePacket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
Expand All @@ -16,7 +22,7 @@ import kotlinx.coroutines.launch
import timber.log.Timber
import java.util.UUID

class PPoGService(private val scope: CoroutineScope) : GattService {
class PPoGService(private val scope: CoroutineScope, private val onPebblePacket: (PebblePacket, BluetoothDevice) -> Unit) : GattService {
private val dataCharacteristic = GattCharacteristicBuilder()
.withUuid(UUID.fromString(LEConstants.UUIDs.PPOGATT_DEVICE_CHARACTERISTIC_SERVER))
.withProperties(BluetoothGattCharacteristic.PROPERTY_NOTIFY, BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE)
Expand All @@ -43,6 +49,7 @@ class PPoGService(private val scope: CoroutineScope) : GattService {
.build()

private val ppogConnections = mutableMapOf<String, PPoGServiceConnection>()
private var gattServer: BluetoothGattServer? = null

/**
* Filter flow for events related to a specific device
Expand All @@ -59,16 +66,29 @@ class PPoGService(private val scope: CoroutineScope) : GattService {
private suspend fun runService(eventFlow: Flow<ServerEvent>) {
eventFlow.collect {
when (it) {
is ServerInitializedEvent -> {
gattServer = it.server
}
is ConnectionStateEvent -> {
if (gattServer == null) {
Timber.w("Server not initialized yet")
return@collect
}
Timber.d("Connection state changed: ${it.newState} for device ${it.device.address}")
if (it.newState == BluetoothGatt.STATE_CONNECTED) {
check(ppogConnections[it.device.address] == null) { "Connection already exists for device ${it.device.address}" }
if (ppogConnections.isEmpty()) {
val connection = PPoGServiceConnection(this, it.device)
connection.start(eventFlow
.filterIsInstance<ServiceEvent>()
.filter(filterFlowForDevice(it.device.address))
)
val connection = PPoGServiceConnection(
scope,
this,
it.device,
eventFlow
.filterIsInstance<ServiceEvent>()
.filter(filterFlowForDevice(it.device.address))
) { packet ->
onPebblePacket(packet, it.device)
}
connection.start()
ppogConnections[it.device.address] = connection
} else {
//TODO: Handle multiple connections
Expand All @@ -83,11 +103,23 @@ class PPoGService(private val scope: CoroutineScope) : GattService {
}
}

@RequiresPermission("android.permission.BLUETOOTH_CONNECT")
fun sendData(device: BluetoothDevice, data: ByteArray): Boolean {
gattServer?.let {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
return it.notifyCharacteristicChanged(device, dataCharacteristic, false, data) == BluetoothStatusCodes.SUCCESS
} else {
dataCharacteristic.value = data
return it.notifyCharacteristicChanged(device, dataCharacteristic, false)
}
} ?: Timber.w("Tried to send data before server was initialized")
return false
}

override fun register(eventFlow: Flow<ServerEvent>): BluetoothGattService {
scope.launch {
runService(eventFlow)
}
return bluetoothGattService
}

}
Loading

0 comments on commit c197521

Please sign in to comment.