Skip to content

Commit

Permalink
fix full reconnection attempts blocking forever
Browse files Browse the repository at this point in the history
  • Loading branch information
crc-32 committed Jul 4, 2024
1 parent eb0b29b commit df37df4
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 63 deletions.
2 changes: 1 addition & 1 deletion android/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def androidxTestVersion = "1.5.0"

dependencies {
coreLibraryDesugaring 'com.android.tools:desugar_jdk_libs:2.0.4'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutinesVersion"
implementation libs.kotlinx.coroutines.android
implementation libs.kotlinx.serialization.json
implementation libs.libpebblecommon
implementation "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,73 +69,80 @@ class ConnectionLooper @Inject constructor(
@RequiresPermission(android.Manifest.permission.BLUETOOTH_CONNECT)
fun connectToWatch(macAddress: String) {
coroutineScope.launch {
try {
lastConnectedWatch = macAddress

currentConnection?.cancelAndJoin()
currentConnection = coroutineContext[Job]
lastConnectedWatch = macAddress

launchRestartOnBluetoothOff(macAddress)

var retryTime = HALF_OF_INITAL_RETRY_TIME
var retries = 0
val reconnectionSocketServer = ReconnectionSocketServer(BluetoothAdapter.getDefaultAdapter()!!)
reconnectionSocketServer.start().onEach {
Timber.d("Reconnection socket server received connection from $it")
signalWatchPresence(macAddress)
}.launchIn(this)
while (isActive) {
if (BluetoothAdapter.getDefaultAdapter()?.isEnabled != true) {
Timber.d("Bluetooth is off. Waiting until it is on Cancel connection attempt.")
try {
withTimeout(2000) {
currentConnection?.cancelAndJoin()
}
} catch (_: TimeoutCancellationException) {
Timber.w("Failed to cancel connection in time")
}
currentConnection = launch {
try {
launchRestartOnBluetoothOff(macAddress)

var retryTime = HALF_OF_INITAL_RETRY_TIME
var retries = 0
val reconnectionSocketServer = ReconnectionSocketServer(BluetoothAdapter.getDefaultAdapter()!!)
reconnectionSocketServer.start().onEach {
Timber.d("Reconnection socket server received connection from $it")
signalWatchPresence(macAddress)
}.launchIn(this + CoroutineName("ReconnectionSocketServer"))
while (isActive) {
if (BluetoothAdapter.getDefaultAdapter()?.isEnabled != true) {
Timber.d("Bluetooth is off. Waiting until it is on Cancel connection attempt.")

_connectionState.value = ConnectionState.WaitingForTransport(
BluetoothAdapter.getDefaultAdapter()?.getRemoteDevice(macAddress)?.let { BluetoothPebbleDevice(it, it.address) }
)

getBluetoothStatus(context).first { bluetoothOn -> bluetoothOn }
}

_connectionState.value = ConnectionState.WaitingForTransport(
BluetoothAdapter.getDefaultAdapter()?.getRemoteDevice(macAddress)?.let { BluetoothPebbleDevice(it, it.address) }
)
try {
blueCommon.startSingleWatchConnection(macAddress).collect {
if (it is SingleConnectionStatus.Connected && connectionState.value !is ConnectionState.Connected && connectionState.value !is ConnectionState.RecoveryMode) {
// initial connection, wait on negotiation
_connectionState.value = ConnectionState.Negotiating(it.watch)
} else {
Timber.d("Not waiting for negotiation")
_connectionState.value = it.toConnectionStatus()
}
if (it is SingleConnectionStatus.Connected) {
retryTime = HALF_OF_INITAL_RETRY_TIME
retries = 0
}
}
} catch (_: CancellationException) {
// Do nothing. Cancellation is OK
} catch (e: Exception) {
Timber.e(e, "Watch connection error")
}

getBluetoothStatus(context).first { bluetoothOn -> bluetoothOn }
}
if (isActive) {
val lastWatch = connectionState.value.watchOrNull

try {
blueCommon.startSingleWatchConnection(macAddress).collect {
if (it is SingleConnectionStatus.Connected && connectionState.value !is ConnectionState.Connected && connectionState.value !is ConnectionState.RecoveryMode) {
// initial connection, wait on negotiation
_connectionState.value = ConnectionState.Negotiating(it.watch)
} else {
Timber.d("Not waiting for negotiation")
_connectionState.value = it.toConnectionStatus()
retryTime = min(retryTime + HALF_OF_INITAL_RETRY_TIME, MAX_RETRY_TIME)
retries++
Timber.d("Watch connection failed, waiting and reconnecting after $retryTime ms (retry: $retries)")
_connectionState.value = ConnectionState.WaitingForReconnect(lastWatch)
delayJob = launch(CoroutineName("DelayJob")) {
delay(retryTime)
}
if (it is SingleConnectionStatus.Connected) {
try {
delayJob?.join()
} catch (_: CancellationException) {
Timber.i("Reconnect delay interrupted")
retryTime = HALF_OF_INITAL_RETRY_TIME
retries = 0
}
}
} catch (_: CancellationException) {
// Do nothing. Cancellation is OK
} catch (e: Exception) {
Timber.e(e, "Watch connection error")
}


val lastWatch = connectionState.value.watchOrNull

retryTime = min(retryTime + HALF_OF_INITAL_RETRY_TIME, MAX_RETRY_TIME)
retries++
Timber.d("Watch connection failed, waiting and reconnecting after $retryTime ms (retry: $retries)")
_connectionState.value = ConnectionState.WaitingForReconnect(lastWatch)
delayJob = launch {
delay(retryTime)
}
try {
delayJob?.join()
} catch (_: CancellationException) {
Timber.i("Reconnect delay interrupted")
retryTime = HALF_OF_INITAL_RETRY_TIME
retries = 0
}
} finally {
_connectionState.value = ConnectionState.Disconnected
lastConnectedWatch = null
}
} finally {
_connectionState.value = ConnectionState.Disconnected
lastConnectedWatch = null
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions android/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[versions]
android-minSdk = "29"
android-targetSdk = "34"
coroutinesVersion = "1.8.1"
daggerVersion = "2.51.1"
gradle = "8.5.0"
koinVersion = "3.2.0"
Expand Down Expand Up @@ -38,6 +39,9 @@ androidx-room-compiler = { module = "androidx.room:room-compiler", version.ref =
androidx-datastore = { module = "androidx.datastore:datastore", version.ref = "datastore" }
androidx-datastore-preferences = { module = "androidx.datastore:datastore-preferences", version.ref = "datastore" }
koin-core = { module = "io.insert-koin:koin-core", version.ref = "koinVersion" }
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutinesVersion" }
kotlinx-coroutines-debug = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-debug", version.ref = "coroutinesVersion" }
kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "coroutinesVersion" }
kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version.ref = "kotlinxDatetime" }
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinxSerializationJson" }
kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinxSerializationJson" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.rebble.libpebblecommon.protocolhelpers.ProtocolEndpoint
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import kotlinx.coroutines.runInterruptible
import timber.log.Timber
import java.io.IOException
import java.io.InputStream
Expand Down Expand Up @@ -62,7 +62,7 @@ class ProtocolIO(
}
}

suspend fun write(bytes: ByteArray) = withContext(Dispatchers.IO) {
suspend fun write(bytes: ByteArray) = runInterruptible(Dispatchers.IO) {
//Timber.d("Sending packet of EP ${PebblePacket(bytes.toUByteArray()).endpoint}")
outputStream.write(bytes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BlueSerialDriver(
}
}

val sendLoop = launch {
val sendLoop = launch(CoroutineName("SendLoop")) {
protocolHandler.startPacketSendingLoop(::sendPacket)
}

Expand All @@ -57,7 +57,6 @@ class BlueSerialDriver(

private suspend fun sendPacket(bytes: UByteArray): Boolean {
val protocolIO = protocolIO ?: return false
@Suppress("BlockingMethodInNonBlockingContext")
protocolIO.write(bytes.toByteArray())
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package io.rebble.cobble.bluetooth.classic
import android.bluetooth.BluetoothAdapter
import androidx.annotation.RequiresPermission
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.isActive
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runInterruptible
import timber.log.Timber
import java.io.IOException
import java.util.UUID
import kotlin.coroutines.CoroutineContext

Expand All @@ -22,8 +26,15 @@ class ReconnectionSocketServer(private val adapter: BluetoothAdapter, private va
serverSocket.use {
Timber.d("Starting reconnection socket server")
while (true) {
val socket = runInterruptible {
it.accept()
//XXX: This is a blocking call, but runInterruptible doesn't seem to work here
val socket = try {
serverSocket.accept(2000)
} catch (e: IOException) {
if (currentCoroutineContext().isActive) {
continue
} else {
break
}
} ?: break
Timber.d("Accepted connection from ${socket.remoteDevice.address}")
emit(socket.remoteDevice.address)
Expand Down

0 comments on commit df37df4

Please sign in to comment.