diff --git a/ground/src/main/java/com/google/android/ground/persistence/sync/LocalMutationSyncWorker.kt b/ground/src/main/java/com/google/android/ground/persistence/sync/LocalMutationSyncWorker.kt index fb93025d71..1e9d2b8a5e 100644 --- a/ground/src/main/java/com/google/android/ground/persistence/sync/LocalMutationSyncWorker.kt +++ b/ground/src/main/java/com/google/android/ground/persistence/sync/LocalMutationSyncWorker.kt @@ -20,15 +20,18 @@ import androidx.hilt.work.HiltWorker import androidx.work.CoroutineWorker import androidx.work.Data import androidx.work.ListenableWorker.Result.retry -import androidx.work.ListenableWorker.Result.success import androidx.work.WorkerParameters import com.google.android.ground.model.User +import com.google.android.ground.model.mutation.LocationOfInterestMutation import com.google.android.ground.model.mutation.Mutation +import com.google.android.ground.model.mutation.Mutation.SyncStatus.FAILED +import com.google.android.ground.model.mutation.Mutation.Type.CREATE import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus import com.google.android.ground.persistence.local.stores.LocalUserStore import com.google.android.ground.persistence.remote.RemoteDataStore import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData import com.google.android.ground.repository.MutationRepository +import com.google.android.ground.system.auth.AuthenticationManager import dagger.assisted.Assisted import dagger.assisted.AssistedInject import kotlinx.coroutines.Dispatchers @@ -49,62 +52,72 @@ constructor( private val mutationRepository: MutationRepository, private val localUserStore: LocalUserStore, private val remoteDataStore: RemoteDataStore, - private val mediaUploadWorkManager: MediaUploadWorkManager + private val mediaUploadWorkManager: MediaUploadWorkManager, + private val authenticationManager: AuthenticationManager ) : CoroutineWorker(context, params) { private val locationOfInterestId: String = params.inputData.getString(LOCATION_OF_INTEREST_ID_PARAM_KEY)!! - override suspend fun doWork(): Result = withContext(Dispatchers.IO) { doWorkInternal() } - - private suspend fun doWorkInternal(): Result = - try { - val mutations = getPendingOrEligibleFailedMutations() - Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId") - val result = processMutations(mutations) - mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId) - if (result) success() else retry() - } catch (t: Throwable) { - Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId") - retry() + override suspend fun doWork(): Result = + withContext(Dispatchers.IO) { + try { + // TODO: Move into new UploadUserDataUseCase class. + val currentUser = authenticationManager.getAuthenticatedUser() + // TODO: Check new mutation states to know whether to retry. + processMutations(getPendingOrEligibleFailedMutations(currentUser), currentUser) + } catch (t: Throwable) { + Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId") + retry() + } } /** * Attempts to fetch all mutations from the [MutationRepository] that are in `PENDING` state or in - * `FAILED` state but eligible for retry. + * `FAILED` state but eligible for retry. Ignores mutations not owned by the currently signed in + * user. */ - private suspend fun getPendingOrEligibleFailedMutations(): List { + // TODO: Move into mutation repository. + private suspend fun getPendingOrEligibleFailedMutations(user: User): List { + // TODO: Sort by mutation timestamp. val pendingMutations = mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.PENDING) val failedMutationsEligibleForRetry = mutationRepository .getMutations(locationOfInterestId, MutationEntitySyncStatus.FAILED) .filter { it.retryCount < MAX_RETRY_COUNT } - return pendingMutations + failedMutationsEligibleForRetry - } - /** - * Groups mutations by user id, loads each user, applies mutations, and removes processed - * mutations. - * - * @return `true` if all mutations are applied successfully, else `false` - */ - private suspend fun processMutations(allMutations: List): Boolean { - val mutationsByUserId = allMutations.groupBy { it.userId } - val userIds = mutationsByUserId.keys - var noErrors = true - for (userId in userIds) { - val mutations = mutationsByUserId[userId] - val user = getUser(userId) - if (mutations == null || user == null) { - continue - } - val result = processMutations(mutations, user) - if (!result) { - noErrors = false - } + val mutations = pendingMutations + failedMutationsEligibleForRetry + val currentUsersMutations = mutations.filter { it.userId == user.id } + if (mutations.size != currentUsersMutations.size) { + Timber.w( + "${mutations.size - currentUsersMutations.size} mutations found for " + + "another user. These should have been removed at sign-out" + ) } - return noErrors + return currentUsersMutations + } + + private suspend fun processMutations(mutations: List, user: User): Result { + Timber.v("Syncing ${mutations.size} changes for LOI $locationOfInterestId") + + // Split up create LOI mutations and other mutations. + val (createLoiMutations, otherMutations) = + mutations.filterIsInstance().partition { it.type == CREATE } + + // All data is associated with an LOIs, so create those first if found. Stop and retry if + // failed. + if (createLoiMutations.lastOrNull()?.let { processMutation(it, user)?.syncStatus == FAILED ) return retry() + + // Then process all other LOI and submission mutations. + val updatedMutations = otherMutations.map { processMutation(it, user) } + + // Queue media worker if any submission mutations call for it. + if (updatedMutations.any { it.syncStatus == Mutation.SyncStatus.MEDIA_UPLOAD_PENDING }) + mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId) + + // If any mutations failed, queue retry. + return if (updatedMutations.any { it.syncStatus == FAILED }) retry() else Result.success() } /** @@ -112,29 +125,21 @@ constructor( * * @return `true` if the mutations were successfully synced with [RemoteDataStore]. */ - private suspend fun processMutations(mutations: List, user: User): Boolean { - check(mutations.isNotEmpty()) { "List of mutations is empty" } - - return try { - mutationRepository.markAsInProgress(mutations) - remoteDataStore.applyMutations(mutations, user) - mutationRepository.finalizePendingMutationsForMediaUpload(mutations) - true - } catch (t: Throwable) { - mutationRepository.markAsFailed(mutations, t) - false - } - } - - private suspend fun getUser(userId: String): User? { - val user = localUserStore.getUserOrNull(userId) - if (user == null) { - Timber.e("User account removed before mutation processed") + private suspend fun processMutation(mutation: Mutation, user: User): Boolean { + val mutations = listOf(mutation) + return try { + mutationRepository.markAsInProgress(mutations) + remoteDataStore.applyMutations(mutations, user) + mutationRepository.finalizePendingMutationsForMediaUpload(mutations) + true + } catch (t: Throwable) { + mutationRepository.markAsFailed(mutations, t) + false + } } - return user - } companion object { + // TODO: Move to Config class. private const val MAX_RETRY_COUNT = 10 internal const val LOCATION_OF_INTEREST_ID_PARAM_KEY = "locationOfInterestId" diff --git a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt index f77c88fcb3..f814c288ee 100644 --- a/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt +++ b/ground/src/main/java/com/google/android/ground/repository/MutationRepository.kt @@ -70,15 +70,15 @@ constructor( * specified id. */ suspend fun getMutations( - loidId: String, + loiId: String, vararg entitySyncStatus: MutationEntitySyncStatus ): List { val loiMutations = localLocationOfInterestStore - .findByLocationOfInterestId(loidId, *entitySyncStatus) + .findByLocationOfInterestId(loiId, *entitySyncStatus) .map(LocationOfInterestMutationEntity::toModelObject) val submissionMutations = - localSubmissionStore.findByLocationOfInterestId(loidId, *entitySyncStatus).map { + localSubmissionStore.findByLocationOfInterestId(loiId, *entitySyncStatus).map { it.toSubmissionMutation() } return loiMutations + submissionMutations