Skip to content

Commit

Permalink
Refactor Activity streams to use coroutine (#2072)
Browse files Browse the repository at this point in the history
* Use sharedFlow for handling activity streams

* Use SharedFlow for handling activity results

* Use SharedFlow for requesting permission results

* Remove unused RxCompletable.kt
  • Loading branch information
shobhitagarwal1612 authored Nov 22, 2023
1 parent 8956649 commit 753b06d
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import androidx.lifecycle.lifecycleScope
import androidx.navigation.fragment.NavHostFragment
import com.google.android.ground.databinding.MainActBinding
import com.google.android.ground.repository.UserRepository
import com.google.android.ground.rx.RxAutoDispose.autoDisposable
import com.google.android.ground.system.ActivityStreams
import com.google.android.ground.system.SettingsManager
import com.google.android.ground.ui.common.*
Expand Down Expand Up @@ -57,9 +56,10 @@ class MainActivity : Hilt_MainActivity() {
super.onCreate(savedInstanceState)

// Set up event streams first. Navigator must be listening when auth is first initialized.
activityStreams.activityRequests.`as`(autoDisposable(this)).subscribe {
callback: Consumer<Activity> ->
callback.accept(this)
lifecycleScope.launch {
activityStreams.activityRequests.collect { callback: Consumer<Activity> ->
callback.accept(this@MainActivity)
}
}

lifecycleScope.launch { navigator.getNavigateRequests().collect { onNavigate(it) } }
Expand Down
32 changes: 0 additions & 32 deletions ground/src/main/java/com/google/android/ground/rx/RxCompletable.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,48 @@ package com.google.android.ground.system

import android.app.Activity
import android.content.Intent
import com.google.android.ground.rx.annotations.Hot
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.Subject
import com.google.android.ground.coroutines.ApplicationScope
import java8.util.function.Consumer
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch

/** Bridge between the [Activity] and various `Manager` classes. */
@Singleton
class ActivityStreams @Inject constructor() {
class ActivityStreams @Inject constructor(@ApplicationScope private val scope: CoroutineScope) {

/** Emits [Consumer]s to be executed in the context of the [Activity]. */
val activityRequests: @Hot Subject<Consumer<Activity>> = PublishSubject.create()
private val _activityRequestsFlow: MutableSharedFlow<Consumer<Activity>> = MutableSharedFlow()
val activityRequests: SharedFlow<Consumer<Activity>> = _activityRequestsFlow.asSharedFlow()

/** Emits [Activity.onActivityResult] events. */
private val activityResults: @Hot Subject<ActivityResult> = PublishSubject.create()
private val _activityResults: MutableSharedFlow<ActivityResult> = MutableSharedFlow()

/** Emits [Activity.onRequestPermissionsResult] events. */
private val requestPermissionsResults: @Hot Subject<RequestPermissionsResult> =
PublishSubject.create()
private val _requestPermissionsResults: MutableSharedFlow<RequestPermissionsResult> =
MutableSharedFlow()

/**
* Queues the specified [Consumer] for execution. An instance of the current [ ] is provided to
* the `Consumer` when called.
*/
fun withActivity(callback: Consumer<Activity>) = activityRequests.onNext(callback)
fun withActivity(callback: Consumer<Activity>) {
scope.launch { _activityRequestsFlow.emit(callback) }
}

/**
* Callback used to communicate [Activity.onActivityResult] events with various `Manager` classes.
*/
fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) =
activityResults.onNext(ActivityResult(requestCode, resultCode, data))
fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
scope.launch { _activityResults.emit(ActivityResult(requestCode, resultCode, data)) }
}

/**
* Callback used to communicate [Activity.onRequestPermissionsResult] events with various
Expand All @@ -59,28 +68,31 @@ class ActivityStreams @Inject constructor() {
requestCode: Int,
permissions: Array<String>,
grantResults: IntArray
) =
requestPermissionsResults.onNext(
RequestPermissionsResult(requestCode, permissions, grantResults)
)
) {
scope.launch {
_requestPermissionsResults.emit(
RequestPermissionsResult(requestCode, permissions, grantResults)
)
}
}

/** Emits [Activity.onActivityResult] events where `requestCode` matches the specified value. */
fun getActivityResults(requestCode: Int): @Hot Observable<ActivityResult> =
activityResults.filter { it.requestCode == requestCode }
fun getActivityResults(requestCode: Int): Flow<ActivityResult> =
_activityResults.filter { it.requestCode == requestCode }

/**
* Emits the next [Activity.onActivityResult] event where `requestCode` matches the specified
* value.
*/
fun getNextActivityResult(requestCode: Int): @Hot Observable<ActivityResult> =
getActivityResults(requestCode).take(1) // TODO(#723): Define and handle timeouts.
suspend fun getNextActivityResult(requestCode: Int): ActivityResult =
getActivityResults(requestCode).first() // TODO(#723): Define and handle timeouts.

/**
* Emits the next [Activity.onRequestPermissionsResult] event where `requestCode` matches the
* specified value.
*/
fun getNextRequestPermissionsResult(requestCode: Int): Observable<RequestPermissionsResult> =
requestPermissionsResults
suspend fun getNextRequestPermissionsResult(requestCode: Int): RequestPermissionsResult =
_requestPermissionsResults
.filter { it.requestCode == requestCode }
.take(1) // TODO(#723): Define and handle timeouts.
.first() // TODO(#723): Define and handle timeouts.
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package com.google.android.ground.system
import android.content.Context
import com.google.android.gms.common.ConnectionResult
import com.google.android.gms.common.GoogleApiAvailability
import com.google.android.ground.rx.RxCompletable
import dagger.hilt.android.qualifiers.ApplicationContext
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.rx2.await

private val INSTALL_API_REQUEST_CODE = GoogleApiAvailability::class.java.hashCode() and 0xffff

Expand Down Expand Up @@ -56,16 +54,12 @@ constructor(
}
}

private suspend fun getNextResult(requestCode: Int) =
activityStreams
.getNextActivityResult(requestCode)
.flatMapCompletable {
RxCompletable.completeOrError(
{ it.isOk() },
Exception::class.java // TODO: Throw appropriate Exception.
)
}
.await()
private suspend fun getNextResult(requestCode: Int) {
val result = activityStreams.getNextActivityResult(requestCode)
if (!result.isOk()) {
error("Activity result failed: requestCode = $requestCode, result = $result")
}
}

class GooglePlayServicesMissingException : Error("Google play services not available")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import android.content.Context
import android.content.pm.PackageManager
import androidx.core.app.ActivityCompat
import androidx.core.content.ContextCompat.checkSelfPermission
import com.google.android.ground.rx.RxCompletable.completeOrError
import dagger.hilt.android.qualifiers.ApplicationContext
import io.reactivex.Completable
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.rx2.await
import timber.log.Timber

/** Provides access to obtain and check the app's permissions. */
Expand Down Expand Up @@ -72,12 +70,10 @@ constructor(
* with error [PermissionDeniedException] if the requested permission was denied.
*/
private suspend fun getPermissionsResult(permission: String) {
activityStreams
.getNextRequestPermissionsResult(PERMISSIONS_REQUEST_CODE)
.flatMapCompletable { r: RequestPermissionsResult ->
completeOrError({ r.isGranted(permission) }, PermissionDeniedException::class.java)
}
.await()
val result = activityStreams.getNextRequestPermissionsResult(PERMISSIONS_REQUEST_CODE)
if (!result.isGranted(permission)) {
throw PermissionDeniedException()
}
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package com.google.android.ground.system
import com.google.android.gms.common.api.ResolvableApiException
import com.google.android.gms.location.LocationRequest
import com.google.android.gms.location.LocationSettingsRequest
import com.google.android.ground.rx.RxCompletable.completeOrError
import com.google.android.ground.system.rx.RxSettingsClient
import javax.inject.Inject
import javax.inject.Singleton
import kotlinx.coroutines.rx2.await
import timber.log.Timber

val LOCATION_SETTINGS_REQUEST_CODE = SettingsManager::class.java.hashCode() and 0xffff
Expand Down Expand Up @@ -70,13 +68,12 @@ constructor(
activityStreams.withActivity { resolvableException.startResolutionForResult(it, requestCode) }
}

private suspend fun getNextResult(requestCode: Int) =
activityStreams
.getNextActivityResult(requestCode)
.flatMapCompletable {
completeOrError({ it.isOk() }, SettingsChangeRequestCanceled::class.java)
}
.await()
private suspend fun getNextResult(requestCode: Int) {
val result = activityStreams.getNextActivityResult(requestCode)
if (!result.isOk()) {
throw SettingsChangeRequestCanceled()
}
}
}

class SettingsChangeRequestCanceled : Exception()
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import com.google.android.ground.model.User
import com.google.android.ground.rx.annotations.Hot
import com.google.android.ground.system.ActivityResult
import com.google.android.ground.system.ActivityStreams
import com.google.firebase.auth.*
import com.google.firebase.auth.AuthCredential
import com.google.firebase.auth.AuthResult
import com.google.firebase.auth.FirebaseAuth
import com.google.firebase.auth.FirebaseUser
import com.google.firebase.auth.GoogleAuthProvider
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.Subject
import javax.inject.Inject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx2.asFlow
import timber.log.Timber

private val signInRequestCode = AuthenticationManager::class.java.hashCode() and 0xffff
Expand All @@ -59,9 +62,7 @@ constructor(
.build()

externalScope.launch {
activityStreams.getActivityResults(signInRequestCode).asFlow().collect {
onActivityResult(it)
}
activityStreams.getActivityResults(signInRequestCode).collect { onActivityResult(it) }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.google.android.ground.BaseHiltTest
import com.google.android.ground.system.PermissionsManager.Companion.PERMISSIONS_REQUEST_CODE
import dagger.hilt.android.testing.BindValue
import dagger.hilt.android.testing.HiltAndroidTest
import io.reactivex.Observable
import javax.inject.Inject
import kotlin.test.assertFailsWith
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -69,16 +68,14 @@ class PermissionsManagerTest : BaseHiltTest() {
}
}

private fun setupPermissionResult(granted: Boolean) {
private fun setupPermissionResult(granted: Boolean) = runWithTestDispatcher {
whenever(activityStreamsMock.getNextRequestPermissionsResult(PERMISSIONS_REQUEST_CODE))
.thenReturn(
Observable.just(
RequestPermissionsResult(
PERMISSIONS_REQUEST_CODE,
arrayOf(testPermission),
intArrayOf(
if (granted) PackageManager.PERMISSION_GRANTED else PackageManager.PERMISSION_DENIED
)
RequestPermissionsResult(
PERMISSIONS_REQUEST_CODE,
arrayOf(testPermission),
intArrayOf(
if (granted) PackageManager.PERMISSION_GRANTED else PackageManager.PERMISSION_DENIED
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import com.google.android.ground.BaseHiltTest
import com.google.android.ground.system.rx.RxSettingsClient
import dagger.hilt.android.testing.BindValue
import dagger.hilt.android.testing.HiltAndroidTest
import io.reactivex.Observable
import javax.inject.Inject
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertThrows
import org.junit.Test
Expand All @@ -37,7 +35,6 @@ import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.robolectric.RobolectricTestRunner

@OptIn(ExperimentalCoroutinesApi::class)
@HiltAndroidTest
@RunWith(RobolectricTestRunner::class)
class SettingsManagerTest : BaseHiltTest() {
Expand All @@ -61,9 +58,7 @@ class SettingsManagerTest : BaseHiltTest() {
@Test
fun `enableLocationSettings() attempts to resolve error if resolvable`() = runWithTestDispatcher {
whenever(activityStreamsMock.getNextActivityResult(LOCATION_SETTINGS_REQUEST_CODE))
.thenReturn(
Observable.just(ActivityResult(LOCATION_SETTINGS_REQUEST_CODE, Activity.RESULT_OK, null))
)
.thenReturn(ActivityResult(LOCATION_SETTINGS_REQUEST_CODE, Activity.RESULT_OK, null))

whenever(settingsClientMock.checkLocationSettings(any())).thenAnswer {
throw ResolvableApiException(Status.RESULT_INTERNAL_ERROR)
Expand Down

0 comments on commit 753b06d

Please sign in to comment.