Skip to content

Commit

Permalink
WIP - clarify behavior of LocalMutationSyncWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
gino-m committed Feb 8, 2024
1 parent 6fc3672 commit 1481d0f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ import androidx.hilt.work.HiltWorker
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ListenableWorker.Result.retry
import androidx.work.ListenableWorker.Result.success
import androidx.work.WorkerParameters
import com.google.android.ground.model.User
import com.google.android.ground.model.mutation.LocationOfInterestMutation
import com.google.android.ground.model.mutation.Mutation
import com.google.android.ground.model.mutation.Mutation.SyncStatus.FAILED
import com.google.android.ground.model.mutation.Mutation.Type.CREATE
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.persistence.local.stores.LocalUserStore
import com.google.android.ground.persistence.remote.RemoteDataStore
import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData
import com.google.android.ground.repository.MutationRepository
import com.google.android.ground.system.auth.AuthenticationManager
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.Dispatchers
Expand All @@ -49,92 +52,94 @@ constructor(
private val mutationRepository: MutationRepository,
private val localUserStore: LocalUserStore,
private val remoteDataStore: RemoteDataStore,
private val mediaUploadWorkManager: MediaUploadWorkManager
private val mediaUploadWorkManager: MediaUploadWorkManager,
private val authenticationManager: AuthenticationManager
) : CoroutineWorker(context, params) {

private val locationOfInterestId: String =
params.inputData.getString(LOCATION_OF_INTEREST_ID_PARAM_KEY)!!

override suspend fun doWork(): Result = withContext(Dispatchers.IO) { doWorkInternal() }

private suspend fun doWorkInternal(): Result =
try {
val mutations = getPendingOrEligibleFailedMutations()
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")
val result = processMutations(mutations)
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
if (result) success() else retry()
} catch (t: Throwable) {
Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId")
retry()
override suspend fun doWork(): Result =
withContext(Dispatchers.IO) {
try {
// TODO: Move into new UploadUserDataUseCase class.
val currentUser = authenticationManager.getAuthenticatedUser()
// TODO: Check new mutation states to know whether to retry.
processMutations(getPendingOrEligibleFailedMutations(currentUser), currentUser)
} catch (t: Throwable) {
Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId")
retry()
}
}

/**
* Attempts to fetch all mutations from the [MutationRepository] that are in `PENDING` state or in
* `FAILED` state but eligible for retry.
* `FAILED` state but eligible for retry. Ignores mutations not owned by the currently signed in
* user.
*/
private suspend fun getPendingOrEligibleFailedMutations(): List<Mutation> {
// TODO: Move into mutation repository.
private suspend fun getPendingOrEligibleFailedMutations(user: User): List<Mutation> {
// TODO: Sort by mutation timestamp.
val pendingMutations =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.PENDING)
val failedMutationsEligibleForRetry =
mutationRepository
.getMutations(locationOfInterestId, MutationEntitySyncStatus.FAILED)
.filter { it.retryCount < MAX_RETRY_COUNT }
return pendingMutations + failedMutationsEligibleForRetry
}

/**
* Groups mutations by user id, loads each user, applies mutations, and removes processed
* mutations.
*
* @return `true` if all mutations are applied successfully, else `false`
*/
private suspend fun processMutations(allMutations: List<Mutation>): Boolean {
val mutationsByUserId = allMutations.groupBy { it.userId }
val userIds = mutationsByUserId.keys
var noErrors = true
for (userId in userIds) {
val mutations = mutationsByUserId[userId]
val user = getUser(userId)
if (mutations == null || user == null) {
continue
}
val result = processMutations(mutations, user)
if (!result) {
noErrors = false
}
val mutations = pendingMutations + failedMutationsEligibleForRetry
val currentUsersMutations = mutations.filter { it.userId == user.id }
if (mutations.size != currentUsersMutations.size) {
Timber.w(
"${mutations.size - currentUsersMutations.size} mutations found for " +
"another user. These should have been removed at sign-out"
)
}
return noErrors
return currentUsersMutations
}

private suspend fun processMutations(mutations: List<Mutation>, user: User): Result {
Timber.v("Syncing ${mutations.size} changes for LOI $locationOfInterestId")

// Split up create LOI mutations and other mutations.
val (createLoiMutations, otherMutations) =
mutations.filterIsInstance<LocationOfInterestMutation>().partition { it.type == CREATE }

// All data is associated with an LOIs, so create those first if found. Stop and retry if
// failed.
if (createLoiMutations.lastOrNull()?.let { processMutation(it, user)?.syncStatus == FAILED ) return retry()

// Then process all other LOI and submission mutations.
val updatedMutations = otherMutations.map { processMutation(it, user) }

// Queue media worker if any submission mutations call for it.
if (updatedMutations.any { it.syncStatus == Mutation.SyncStatus.MEDIA_UPLOAD_PENDING })
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)

// If any mutations failed, queue retry.
return if (updatedMutations.any { it.syncStatus == FAILED }) retry() else Result.success()
}

/**
* Applies mutations to remote data store. Once successful, removes them from the local db.
*
* @return `true` if the mutations were successfully synced with [RemoteDataStore].
*/
private suspend fun processMutations(mutations: List<Mutation>, user: User): Boolean {
check(mutations.isNotEmpty()) { "List of mutations is empty" }

return try {
mutationRepository.markAsInProgress(mutations)
remoteDataStore.applyMutations(mutations, user)
mutationRepository.finalizePendingMutationsForMediaUpload(mutations)
true
} catch (t: Throwable) {
mutationRepository.markAsFailed(mutations, t)
false
}
}

private suspend fun getUser(userId: String): User? {
val user = localUserStore.getUserOrNull(userId)
if (user == null) {
Timber.e("User account removed before mutation processed")
private suspend fun processMutation(mutation: Mutation, user: User): Boolean {
val mutations = listOf(mutation)
return try {
mutationRepository.markAsInProgress(mutations)
remoteDataStore.applyMutations(mutations, user)
mutationRepository.finalizePendingMutationsForMediaUpload(mutations)
true
} catch (t: Throwable) {
mutationRepository.markAsFailed(mutations, t)
false
}
}
return user
}

companion object {
// TODO: Move to Config class.
private const val MAX_RETRY_COUNT = 10

internal const val LOCATION_OF_INTEREST_ID_PARAM_KEY = "locationOfInterestId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ constructor(
* specified id.
*/
suspend fun getMutations(
loidId: String,
loiId: String,
vararg entitySyncStatus: MutationEntitySyncStatus
): List<Mutation> {
val loiMutations =
localLocationOfInterestStore
.findByLocationOfInterestId(loidId, *entitySyncStatus)
.findByLocationOfInterestId(loiId, *entitySyncStatus)
.map(LocationOfInterestMutationEntity::toModelObject)
val submissionMutations =
localSubmissionStore.findByLocationOfInterestId(loidId, *entitySyncStatus).map {
localSubmissionStore.findByLocationOfInterestId(loiId, *entitySyncStatus).map {
it.toSubmissionMutation()
}
return loiMutations + submissionMutations
Expand Down

0 comments on commit 1481d0f

Please sign in to comment.