Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4953][Integration] Implement RoomLifecycleManager + Async Rooms.get and Rooms.release #64

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.ably.chat

import java.util.concurrent.PriorityBlockingQueue
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.launch

/**
* AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive.
* All operations are atomic and run with given priority.
* Accepts scope as a constructor parameter to run operations under the given scope.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
*/
class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {

private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

private class Job<T : Any>(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> T,
val deferredResult: CompletableDeferred<T>,
val queuedPriority: Int,
) : Comparable<Job<*>> {
override fun compareTo(other: Job<*>) = when {
this.priority == other.priority -> this.queuedPriority.compareTo(other.queuedPriority)
else -> this.priority.compareTo(other.priority)
}
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}

// Handles jobs of any type
private val jobs: PriorityBlockingQueue<Job<*>> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method
private var isRunning = false // Only accessed from sequentialScope
private var queueCounter = 0 // Only accessed from synchronized method

val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning

val pendingJobCount: Int
get() = jobs.size

/**
* Defines priority for the operation execution and
* executes given coroutineBlock mutually exclusive under given scope.
*/
@Synchronized
fun <T : Any> async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<T>()
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
sequentialScope.launch {
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
safeExecute(it)
}
}
isRunning = false
}
}
return deferredResult
}
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved

private suspend fun <T : Any> safeExecute(job: Job<T>) {
try {
// Appends coroutineContext to cancel current/pending jobs when AtomicCoroutineScope is cancelled
scope.launch(coroutineContext) {
try {
val result = job.coroutineBlock(this)
job.deferredResult.complete(result)
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}.join()
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}

/**
* Cancels ongoing and pending operations with given error.
* See [Coroutine cancellation](https://kt.academy/article/cc-cancellation#cancellation-in-a-coroutine-scope) for more information.
*/
@Synchronized
fun cancel(message: String?, cause: Throwable? = null) {
queueCounter = 0
sequentialScope.coroutineContext.cancelChildren(CancellationException(message, cause))
}
}
12 changes: 6 additions & 6 deletions chat-android/src/main/java/com/ably/chat/ChatApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal class ChatApi(
metadata = params.metadata ?: mapOf(),
headers = params.headers ?: mapOf(),
)
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Send message endpoint returned empty value", HttpStatusCodes.InternalServerError))
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Send message endpoint returned empty value", HttpStatusCode.InternalServerError))
}

private fun validateSendMessageParams(params: SendMessageParams) {
Expand All @@ -90,8 +90,8 @@ internal class ChatApi(
throw AblyException.fromErrorInfo(
ErrorInfo(
"Metadata contains reserved 'ably-chat' key",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
HttpStatusCode.BadRequest,
ErrorCode.InvalidRequestBody.code,
),
)
}
Expand All @@ -101,8 +101,8 @@ internal class ChatApi(
throw AblyException.fromErrorInfo(
ErrorInfo(
"Headers contains reserved key with reserved 'ably-chat' prefix",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
HttpStatusCode.BadRequest,
ErrorCode.InvalidRequestBody.code,
),
)
}
Expand All @@ -117,7 +117,7 @@ internal class ChatApi(
connections = it.requireInt("connections"),
presenceMembers = it.requireInt("presenceMembers"),
)
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Occupancy endpoint returned empty value", HttpStatusCodes.InternalServerError))
} ?: throw AblyException.fromErrorInfo(ErrorInfo("Occupancy endpoint returned empty value", HttpStatusCode.InternalServerError))
}

private suspend fun makeAuthorizedRequest(
Expand Down
96 changes: 96 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Connection.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,73 @@
package com.ably.chat

import io.ably.lib.types.ErrorInfo

/**
* Default timeout for transient states before we attempt handle them as a state change.
*/
const val TRANSIENT_TIMEOUT = 5000

/**
* The different states that the connection can be in through its lifecycle.
*/
enum class ConnectionStatus(val stateName: String) {
/**
* A temporary state for when the library is first initialized.
*/
Initialized("initialized"),

/**
* The library is currently connecting to Ably.
*/
Connecting("connecting"),

/**
* The library is currently connected to Ably.
*/
Connected("connected"),

/**
* The library is currently disconnected from Ably, but will attempt to reconnect.
*/
Disconnected("disconnected"),

/**
* The library is in an extended state of disconnection, but will attempt to reconnect.
*/
Suspended("suspended"),

/**
* The library is currently disconnected from Ably and will not attempt to reconnect.
*/
Failed("failed"),
}

/**
* Represents a change in the status of the connection.
*/
data class ConnectionStatusChange(
/**
* The new status of the connection.
*/
val current: ConnectionStatus,

/**
* The previous status of the connection.
*/
val previous: ConnectionStatus,

/**
* An error that provides a reason why the connection has
* entered the new status, if applicable.
*/
val error: ErrorInfo?,

/**
* The time in milliseconds that the client will wait before attempting to reconnect.
*/
val retryIn: Long?,
)

/**
* Represents a connection to Ably.
*/
Expand All @@ -8,4 +76,32 @@ interface Connection {
* The current status of the connection.
*/
val status: ConnectionStatus

/**
* The current error, if any, that caused the connection to enter the current status.
*/
val error: ErrorInfo?

/**
* Registers a listener that will be called whenever the connection status changes.
* @param listener The function to call when the status changes.
* @returns An object that can be used to unregister the listener.
*/
fun onStatusChange(listener: Listener): Subscription
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved

/**
* An interface for listening to changes for the connection status
*/
fun interface Listener {
/**
* A function that can be called when the connection status changes.
* @param change The change in status.
*/
fun connectionStatusChanged(change: ConnectionStatusChange)
}

/**
* Removes all listeners that were added by the `onStatusChange` method.
*/
fun offAllStatusChange()
}
101 changes: 0 additions & 101 deletions chat-android/src/main/java/com/ably/chat/ConnectionStatus.kt

This file was deleted.

Loading