Skip to content

Commit

Permalink
PPoG neatening, patches found while working on other things
Browse files Browse the repository at this point in the history
  • Loading branch information
crc-32 committed May 29, 2024
1 parent f763462 commit 73265b9
Show file tree
Hide file tree
Showing 14 changed files with 305 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class DeviceTransport @Inject constructor(

val driver = getTargetTransport(bluetoothDevice)
this@DeviceTransport.driver = driver

return driver.startSingleWatchConnection(bluetoothDevice)
}

Expand All @@ -59,7 +58,7 @@ class DeviceTransport @Inject constructor(
incomingPacketsListener.receivedPackets
)
}
btDevice?.type == BluetoothDevice.DEVICE_TYPE_LE -> { // LE only device
btDevice?.type == BluetoothDevice.DEVICE_TYPE_LE || btDevice?.type == BluetoothDevice.DEVICE_TYPE_DUAL -> { // LE device
BlueLEDriver(
context = context,
protocolHandler = protocolHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class ServiceLifecycleControl @Inject constructor(
connectionLooper.connectionState.collect {
Timber.d("Watch connection status %s", it)

val shouldServiceBeRunning = it !is ConnectionState.Disconnected
//val shouldServiceBeRunning = it !is ConnectionState.Disconnected
val shouldServiceBeRunning = true

if (shouldServiceBeRunning != serviceRunning) {
if (shouldServiceBeRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.rebble.cobble.service

import android.app.PendingIntent
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothManager
import android.content.Intent
import android.os.Build
import androidx.annotation.DrawableRes
Expand All @@ -11,11 +12,17 @@ import androidx.lifecycle.lifecycleScope
import io.rebble.cobble.*
import io.rebble.cobble.bluetooth.ConnectionLooper
import io.rebble.cobble.bluetooth.ConnectionState
import io.rebble.cobble.bluetooth.ble.DummyService
import io.rebble.cobble.bluetooth.ble.GattServerImpl
import io.rebble.cobble.bluetooth.ble.GattServerManager
import io.rebble.cobble.bluetooth.ble.PPoGService
import io.rebble.cobble.handlers.CobbleHandler
import io.rebble.libpebblecommon.ProtocolHandler
import io.rebble.libpebblecommon.services.notification.NotificationService
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import timber.log.Timber
import javax.inject.Provider

Expand Down Expand Up @@ -66,6 +73,11 @@ class WatchService : LifecycleService() {
return START_STICKY
}

override fun onDestroy() {
GattServerManager.close()
super.onDestroy()
}

private fun startNotificationLoop() {
coroutineScope.launch {
Timber.d("Notification Loop start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ import io.rebble.cobble.bluetooth.SingleConnectionStatus
import io.rebble.cobble.bluetooth.workarounds.UnboundWatchBeforeConnecting
import io.rebble.cobble.bluetooth.workarounds.WorkaroundDescriptor
import io.rebble.libpebblecommon.ProtocolHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withTimeout
import timber.log.Timber
import java.io.IOException
import kotlin.coroutines.CoroutineContext
Expand All @@ -38,12 +35,12 @@ class BlueLEDriver(
require(!device.emulated)
require(device.bluetoothDevice != null)
return flow {
GattServerManager.initIfNeeded(context, scope)
val gatt = device.bluetoothDevice.connectGatt(context, workaroundResolver(UnboundWatchBeforeConnecting))
?: throw IOException("Failed to connect to device")
emit(SingleConnectionStatus.Connecting(device))
val connector = PebbleLEConnector(gatt, context, scope)
var success = false
check(PPoGLinkStateManager.getState(device.address).value == PPoGLinkState.Closed) { "Device is already connected" }
connector.connect().collect {
when (it) {
PebbleLEConnector.ConnectorState.CONNECTING -> Timber.d("PebbleLEConnector is connecting")
Expand All @@ -56,10 +53,41 @@ class BlueLEDriver(
}
}
check(success) { "Failed to connect to watch" }
withTimeout(10000) {
PPoGLinkStateManager.getState(device.address).first { it == PPoGLinkState.SessionOpen }
GattServerManager.getGattServer()?.getServer()?.connect(device.bluetoothDevice, true)
try {
withTimeout(10000) {
val result = PPoGLinkStateManager.getState(device.address).first { it != PPoGLinkState.ReadyForSession }
if (result == PPoGLinkState.SessionOpen) {
Timber.d("Session established")
emit(SingleConnectionStatus.Connected(device))
} else {
throw IOException("Failed to establish session")
}
}
} catch (e: TimeoutCancellationException) {
throw IOException("Failed to establish session, timeout")
}

val sendLoop = scope.launch {
protocolHandler.startPacketSendingLoop {
Timber.v("Sending packet")
GattServerManager.ppogService!!.emitPacket(device.bluetoothDevice, it.asByteArray())
Timber.v("Sent packet")
return@startPacketSendingLoop true
}
}
GattServerManager.ppogService?.rxFlowFor(device.bluetoothDevice)!!.collect {
when (it) {
is PPoGService.PPoGConnectionEvent.PacketReceived -> {
protocolHandler.receivePacket(it.packet.asUByteArray())
}
is PPoGService.PPoGConnectionEvent.LinkError -> {
Timber.e(it.error, "Link error")
throw it.error
}
}
}
emit(SingleConnectionStatus.Connected(device))
sendLoop.cancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattServer
import kotlinx.coroutines.flow.Flow
import java.io.Closeable

interface GattServer {
interface GattServer: Closeable {
fun getServer(): BluetoothGattServer?
fun getFlow(): Flow<ServerEvent>
fun isOpened(): Boolean
suspend fun notifyCharacteristicChanged(device: BluetoothDevice, characteristic: BluetoothGattCharacteristic, confirm: Boolean, value: ByteArray)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import android.bluetooth.*
import android.content.Context
import android.os.Build
import androidx.annotation.RequiresPermission
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.awaitClose
Expand Down Expand Up @@ -40,13 +37,16 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onConnectionStateChange")
return
}
trySend(ConnectionStateEvent(device, status, newState))
val newStateDecoded = GattConnectionState.fromInt(newState)
Timber.v("onConnectionStateChange: $device, $status, $newStateDecoded")
trySend(ConnectionStateEvent(device, status, newStateDecoded))
}
override fun onCharacteristicReadRequest(device: BluetoothDevice, requestId: Int, offset: Int, characteristic: BluetoothGattCharacteristic) {
if (!listeningEnabled) {
Timber.w("Event received while listening disabled: onCharacteristicReadRequest")
return
}
Timber.v("onCharacteristicReadRequest: $device, $requestId, $offset, ${characteristic.uuid}")
trySend(CharacteristicReadEvent(device, requestId, offset, characteristic) { data ->
try {
openServer?.sendResponse(device, requestId, data.status, data.offset, data.value)
Expand All @@ -61,6 +61,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onCharacteristicWriteRequest")
return
}
Timber.v("onCharacteristicWriteRequest: $device, $requestId, ${characteristic.uuid}, $preparedWrite, $responseNeeded, $offset, $value")
trySend(CharacteristicWriteEvent(device, requestId, characteristic, preparedWrite, responseNeeded, offset, value) { status ->
try {
openServer?.sendResponse(device, requestId, status, offset, null)
Expand All @@ -75,6 +76,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onDescriptorReadRequest")
return
}
Timber.v("onDescriptorReadRequest: $device, $requestId, $offset, ${descriptor?.characteristic?.uuid}->${descriptor?.uuid}")
trySend(DescriptorReadEvent(device!!, requestId, offset, descriptor!!) { data ->
try {
openServer?.sendResponse(device, requestId, data.status, data.offset, data.value)
Expand All @@ -89,6 +91,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onDescriptorWriteRequest")
return
}
Timber.v("onDescriptorWriteRequest: $device, $requestId, ${descriptor?.characteristic?.uuid}->${descriptor?.uuid}, $preparedWrite, $responseNeeded, $offset, $value")
trySend(DescriptorWriteEvent(device!!, requestId, descriptor!!, offset, value ?: byteArrayOf()) { status ->
try {
openServer?.sendResponse(device, requestId, status, offset, null)
Expand All @@ -103,6 +106,7 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onNotificationSent")
return
}
Timber.v("onNotificationSent: $device, $status")
trySend(NotificationSentEvent(device!!, status))
}

Expand All @@ -111,14 +115,17 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
Timber.w("Event received while listening disabled: onMtuChanged")
return
}
Timber.v("onMtuChanged: $device, $mtu")
trySend(MtuChangedEvent(device!!, mtu))
}

override fun onServiceAdded(status: Int, service: BluetoothGattService?) {
Timber.v("onServiceAdded: $status, ${service?.uuid}")
serviceAddedChannel.trySend(ServiceAddedEvent(status, service))
}
}
openServer = bluetoothManager.openGattServer(context, callbacks)
openServer.clearServices()
services.forEach {
check(serviceAddedChannel.isEmpty) { "Service added event not consumed" }
val service = it.register(serverFlow)
Expand All @@ -132,7 +139,10 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
server = openServer
send(ServerInitializedEvent(this@GattServerImpl))
listeningEnabled = true
awaitClose { openServer.close() }
awaitClose {
openServer.close()
server = null
}
}

private val serverActor = scope.actor<ServerAction> {
Expand Down Expand Up @@ -169,4 +179,14 @@ class GattServerImpl(private val bluetoothManager: BluetoothManager, private val
override fun getFlow(): Flow<ServerEvent> {
return serverFlow
}

override fun isOpened(): Boolean {
return server != null
}

override fun close() {
scope.cancel("GattServerImpl closed")
server?.close()
serverActor.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.rebble.cobble.bluetooth.ble

import android.bluetooth.BluetoothManager
import android.content.Context
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import timber.log.Timber

object GattServerManager {
private var gattServer: GattServer? = null
private var gattServerJob: Job? = null
private var _ppogService: PPoGService? = null
val ppogService: PPoGService?
get() = _ppogService

fun getGattServer(): GattServer? {
return gattServer
}

fun initIfNeeded(context: Context, scope: CoroutineScope): GattServer {
if (gattServer?.isOpened() != true || gattServerJob?.isActive != true) {
gattServer?.close()
_ppogService = PPoGService(scope)
gattServer = GattServerImpl(
context.getSystemService(BluetoothManager::class.java)!!,
context,
listOf(ppogService!!, DummyService())
)
}
gattServerJob = gattServer!!.getFlow().onEach {
Timber.v("Server state: $it")
}.launchIn(scope)
return gattServer!!
}

fun close() {
gattServer?.close()
gattServerJob?.cancel()
gattServer = null
gattServerJob = null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,19 @@ class ServiceAddedEvent(val status: Int, val service: BluetoothGattService?) : S
class ServerInitializedEvent(val server: GattServer) : ServerEvent

open class ServiceEvent(val device: BluetoothDevice) : ServerEvent
class ConnectionStateEvent(device: BluetoothDevice, val status: Int, val newState: Int) : ServiceEvent(device)
class ConnectionStateEvent(device: BluetoothDevice, val status: Int, val newState: GattConnectionState) : ServiceEvent(device)
enum class GattConnectionState(val value: Int) {
Disconnected(BluetoothGatt.STATE_DISCONNECTED),
Connecting(BluetoothGatt.STATE_CONNECTING),
Connected(BluetoothGatt.STATE_CONNECTED),
Disconnecting(BluetoothGatt.STATE_DISCONNECTING);

companion object {
fun fromInt(value: Int): GattConnectionState {
return entries.firstOrNull { it.value == value } ?: throw IllegalArgumentException("Unknown connection state: $value")
}
}
}
class CharacteristicReadEvent(device: BluetoothDevice, val requestId: Int, val offset: Int, val characteristic: BluetoothGattCharacteristic, val respond: (CharacteristicResponse) -> Unit) : ServiceEvent(device)
class CharacteristicWriteEvent(device: BluetoothDevice, val requestId: Int, val characteristic: BluetoothGattCharacteristic, val preparedWrite: Boolean, val responseNeeded: Boolean, val offset: Int, val value: ByteArray, val respond: (Int) -> Unit) : ServiceEvent(device)
class CharacteristicResponse(val status: Int, val offset: Int, val value: ByteArray) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import kotlin.jvm.Throws

class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManager: PPoGSession.StateManager, private val onTimeout: () -> Unit): Closeable {
private var metaWaitingToSend: GATTPacket? = null
private val dataWaitingToSend: LinkedList<GATTPacket> = LinkedList()
private val inflightPackets: LinkedList<GATTPacket> = LinkedList()
val dataWaitingToSend: LinkedList<GATTPacket> = LinkedList()
val inflightPackets: LinkedList<GATTPacket> = LinkedList()
var txWindow = 1
private var timeoutJob: Job? = null
private val _packetWriteFlow = MutableSharedFlow<GATTPacket>()
Expand Down Expand Up @@ -55,22 +55,23 @@ class PPoGPacketWriter(private val scope: CoroutineScope, private val stateManag
break
}
}
if (!inflightPackets.contains(packet)) {
if (inflightPackets.find { it.sequence == packet.sequence } == null) {
Timber.w("Received ACK for packet not in flight")
return
}
var ackedPacket: GATTPacket? = null

// remove packets until the acked packet
while (ackedPacket != packet) {
while (ackedPacket?.sequence != packet.sequence) {
ackedPacket = inflightPackets.poll()
check(ackedPacket != null) { "Polled inflightPackets to empty" }
}
sendNextPacket()
rescheduleTimeout()
}

@Throws(SecurityException::class)
private suspend fun sendNextPacket() {
suspend fun sendNextPacket() {
if (metaWaitingToSend == null && dataWaitingToSend.isEmpty()) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PPoGPebblePacketAssembler {

if (!data!!.hasRemaining()) {
data!!.flip()
val packet = PebblePacket.deserialize(data!!.array().asUByteArray())
val packet = data!!.array().clone()
emit(packet)
clear()
}
Expand Down
Loading

0 comments on commit 73265b9

Please sign in to comment.