Skip to content

Commit

Permalink
Merge pull request #64 from ably-labs/feature/room-lifecycle-manager
Browse files Browse the repository at this point in the history
[ECO-4953][Integration] Implement RoomLifecycleManager + Async Rooms.get and Rooms.release
  • Loading branch information
sacOO7 authored Nov 26, 2024
2 parents 45c7156 + 3a2525a commit 03e04bb
Show file tree
Hide file tree
Showing 41 changed files with 4,194 additions and 340 deletions.
93 changes: 93 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.ably.chat

import java.util.concurrent.PriorityBlockingQueue
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.launch

/**
* AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive.
* All operations are atomic and run with given priority.
* Accepts scope as a constructor parameter to run operations under the given scope.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
*/
class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {

private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

private class Job<T : Any>(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> T,
val deferredResult: CompletableDeferred<T>,
val queuedPriority: Int,
) : Comparable<Job<*>> {
override fun compareTo(other: Job<*>) = when {
this.priority == other.priority -> this.queuedPriority.compareTo(other.queuedPriority)
else -> this.priority.compareTo(other.priority)
}
}

// Handles jobs of any type
private val jobs: PriorityBlockingQueue<Job<*>> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method
private var isRunning = false // Only accessed from sequentialScope
private var queueCounter = 0 // Only accessed from synchronized method

val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning

val pendingJobCount: Int
get() = jobs.size

/**
* Defines priority for the operation execution and
* executes given coroutineBlock mutually exclusive under given scope.
*/
@Synchronized
fun <T : Any> async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<T>()
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
sequentialScope.launch {
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
safeExecute(it)
}
}
isRunning = false
}
}
return deferredResult
}

private suspend fun <T : Any> safeExecute(job: Job<T>) {
try {
// Appends coroutineContext to cancel current/pending jobs when AtomicCoroutineScope is cancelled
scope.launch(coroutineContext) {
try {
val result = job.coroutineBlock(this)
job.deferredResult.complete(result)
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}.join()
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}

/**
* Cancels ongoing and pending operations with given error.
* See [Coroutine cancellation](https://kt.academy/article/cc-cancellation#cancellation-in-a-coroutine-scope) for more information.
*/
@Synchronized
fun cancel(message: String?, cause: Throwable? = null) {
queueCounter = 0
sequentialScope.coroutineContext.cancelChildren(CancellationException(message, cause))
}
}
12 changes: 6 additions & 6 deletions chat-android/src/main/java/com/ably/chat/ChatApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal class ChatApi(
metadata = params.metadata ?: mapOf(),
headers = params.headers ?: mapOf(),
)
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Send message endpoint returned empty value", HttpStatusCodes.InternalServerError))
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Send message endpoint returned empty value", HttpStatusCode.InternalServerError))
}

private fun validateSendMessageParams(params: SendMessageParams) {
Expand All @@ -90,8 +90,8 @@ internal class ChatApi(
throw AblyException.fromErrorInfo(
ErrorInfo(
"Metadata contains reserved 'ably-chat' key",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
HttpStatusCode.BadRequest,
ErrorCode.InvalidRequestBody.code,
),
)
}
Expand All @@ -101,8 +101,8 @@ internal class ChatApi(
throw AblyException.fromErrorInfo(
ErrorInfo(
"Headers contains reserved key with reserved 'ably-chat' prefix",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
HttpStatusCode.BadRequest,
ErrorCode.InvalidRequestBody.code,
),
)
}
Expand All @@ -117,7 +117,7 @@ internal class ChatApi(
connections = it.requireInt("connections"),
presenceMembers = it.requireInt("presenceMembers"),
)
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Occupancy endpoint returned empty value", HttpStatusCodes.InternalServerError))
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Occupancy endpoint returned empty value", HttpStatusCode.InternalServerError))
}

private suspend fun makeAuthorizedRequest(
Expand Down
96 changes: 96 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Connection.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,73 @@
package com.ably.chat

import io.ably.lib.types.ErrorInfo

/**
* Default timeout for transient states before we attempt handle them as a state change.
*/
const val TRANSIENT_TIMEOUT = 5000

/**
* The different states that the connection can be in through its lifecycle.
*/
enum class ConnectionStatus(val stateName: String) {
/**
* A temporary state for when the library is first initialized.
*/
Initialized("initialized"),

/**
* The library is currently connecting to Ably.
*/
Connecting("connecting"),

/**
* The library is currently connected to Ably.
*/
Connected("connected"),

/**
* The library is currently disconnected from Ably, but will attempt to reconnect.
*/
Disconnected("disconnected"),

/**
* The library is in an extended state of disconnection, but will attempt to reconnect.
*/
Suspended("suspended"),

/**
* The library is currently disconnected from Ably and will not attempt to reconnect.
*/
Failed("failed"),
}

/**
* Represents a change in the status of the connection.
*/
data class ConnectionStatusChange(
/**
* The new status of the connection.
*/
val current: ConnectionStatus,

/**
* The previous status of the connection.
*/
val previous: ConnectionStatus,

/**
* An error that provides a reason why the connection has
* entered the new status, if applicable.
*/
val error: ErrorInfo?,

/**
* The time in milliseconds that the client will wait before attempting to reconnect.
*/
val retryIn: Long?,
)

/**
* Represents a connection to Ably.
*/
Expand All @@ -8,4 +76,32 @@ interface Connection {
* The current status of the connection.
*/
val status: ConnectionStatus

/**
* The current error, if any, that caused the connection to enter the current status.
*/
val error: ErrorInfo?

/**
* Registers a listener that will be called whenever the connection status changes.
* @param listener The function to call when the status changes.
* @returns An object that can be used to unregister the listener.
*/
fun onStatusChange(listener: Listener): Subscription

/**
* An interface for listening to changes for the connection status
*/
fun interface Listener {
/**
* A function that can be called when the connection status changes.
* @param change The change in status.
*/
fun connectionStatusChanged(change: ConnectionStatusChange)
}

/**
* Removes all listeners that were added by the `onStatusChange` method.
*/
fun offAllStatusChange()
}
101 changes: 0 additions & 101 deletions chat-android/src/main/java/com/ably/chat/ConnectionStatus.kt

This file was deleted.

Loading

0 comments on commit 03e04bb

Please sign in to comment.