Skip to content

Commit

Permalink
WIP - clarify behavior of LocalMutationSyncWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
gino-m committed Feb 8, 2024
1 parent 6fc3672 commit e2cbeba
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import androidx.hilt.work.HiltWorker
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ListenableWorker.Result.retry
import androidx.work.ListenableWorker.Result.success
import androidx.work.WorkerParameters
import com.google.android.ground.model.User
import com.google.android.ground.model.mutation.LocationOfInterestMutation
import com.google.android.ground.model.mutation.Mutation
import com.google.android.ground.model.mutation.Mutation.SyncStatus.FAILED
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.persistence.local.stores.LocalUserStore
import com.google.android.ground.persistence.remote.RemoteDataStore
import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData
import com.google.android.ground.repository.MutationRepository
import com.google.android.ground.system.auth.AuthenticationManager
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.Dispatchers
Expand All @@ -49,7 +51,8 @@ constructor(
private val mutationRepository: MutationRepository,
private val localUserStore: LocalUserStore,
private val remoteDataStore: RemoteDataStore,
private val mediaUploadWorkManager: MediaUploadWorkManager
private val mediaUploadWorkManager: MediaUploadWorkManager,
private val authenticationManager: AuthenticationManager
) : CoroutineWorker(context, params) {

private val locationOfInterestId: String =
Expand All @@ -59,80 +62,89 @@ constructor(

private suspend fun doWorkInternal(): Result =
try {
val mutations = getPendingOrEligibleFailedMutations()
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")
val result = processMutations(mutations)
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
if (result) success() else retry()
val currentUser = authenticationManager.getAuthenticatedUser()
// TODO: Sort by mutation timestamp.
processMutations(getPendingOrEligibleFailedMutations(currentUser), currentUser)
} catch (t: Throwable) {
Timber.e(t, "Failed to sync changes for LOI $locationOfInterestId")
retry()
}

private fun processCreateLoiMutations(
createLoiMutations: List<LocationOfInterestMutation>
): LocationOfInterestMutation? {
if (createLoiMutations.size > 1) {
Timber.w("Ignoring duplicate create-LOI mutations found for LOI $locationOfInterestId")
}
return createLoiMutations.lastOrNull()?.let(this::applyMutation)
}

private fun applyMutation(loiMutation: LocationOfInterestMutation): LocationOfInterestMutation {}

/**
* Attempts to fetch all mutations from the [MutationRepository] that are in `PENDING` state or in
* `FAILED` state but eligible for retry.
* `FAILED` state but eligible for retry. Ignores mutations not owned by the currently signed in
* user.
*/
private suspend fun getPendingOrEligibleFailedMutations(): List<Mutation> {
private suspend fun getPendingOrEligibleFailedMutations(user: User): List<Mutation> {
val pendingMutations =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.PENDING)
val failedMutationsEligibleForRetry =
mutationRepository
.getMutations(locationOfInterestId, MutationEntitySyncStatus.FAILED)
.filter { it.retryCount < MAX_RETRY_COUNT }
return pendingMutations + failedMutationsEligibleForRetry
}

/**
* Groups mutations by user id, loads each user, applies mutations, and removes processed
* mutations.
*
* @return `true` if all mutations are applied successfully, else `false`
*/
private suspend fun processMutations(allMutations: List<Mutation>): Boolean {
val mutationsByUserId = allMutations.groupBy { it.userId }
val userIds = mutationsByUserId.keys
var noErrors = true
for (userId in userIds) {
val mutations = mutationsByUserId[userId]
val user = getUser(userId)
if (mutations == null || user == null) {
continue
}
val result = processMutations(mutations, user)
if (!result) {
noErrors = false
}
val mutations = pendingMutations + failedMutationsEligibleForRetry
val currentUsersMutations = mutations.filter { it.userId == user.id }
if (mutations.size != currentUsersMutations.size) {
Timber.w(
"${mutations.size - currentUsersMutations.size} mutations found for " +
"another user. These should have been removed at sign-out"
)
}
return noErrors
return currentUsersMutations
}

private suspend fun processMutations(mutations: List<Mutation>, user: User): Result {
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")

// Apply create LOI mutation first.
val (createLoiMutations, otherMutations) = mutations.filterIsInstance<LocationOfInterestMutation>().partition { it.type == Mutation.Type.CREATE }

// All data is associated with an LOI, so stop and retry if create LOI failed.
if (processCreateLoiMutations(createLoiMutations)?.syncStatus == FAILED)
return retry()

// Then process all other LOI and submission mutations.
val updatedMutations = otherMutations.map(this::processMutation)

// Queue media worker if any submission mutations call for it.
if (updatedMutations.any { it.syncStatus == Mutation.SyncStatus.MEDIA_UPLOAD_PENDING })
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)

// If any mutations failed, queue retry.
return if (updatedMutations.any { it.syncStatus == FAILED }) retry() else Result.success()
}

// TODO:
/**
* Applies mutations to remote data store. Once successful, removes them from the local db.
*
* @return `true` if the mutations were successfully synced with [RemoteDataStore].
*/
private suspend fun processMutations(mutations: List<Mutation>, user: User): Boolean {
check(mutations.isNotEmpty()) { "List of mutations is empty" }

return try {
mutationRepository.markAsInProgress(mutations)
remoteDataStore.applyMutations(mutations, user)
mutationRepository.finalizePendingMutationsForMediaUpload(mutations)
true
} catch (t: Throwable) {
mutationRepository.markAsFailed(mutations, t)
false
}
}

private suspend fun getUser(userId: String): User? {
val user = localUserStore.getUserOrNull(userId)
if (user == null) {
Timber.e("User account removed before mutation processed")
}
return user
}
// private suspend fun processMutations(mutations: List<Mutation>, user: User): Boolean {
// check(mutations.isNotEmpty()) { "List of mutations is empty" }
//
// return try {
// mutationRepository.markAsInProgress(mutations)
// remoteDataStore.applyMutations(mutations, user)
// mutationRepository.finalizePendingMutationsForMediaUpload(mutations)
// true
// } catch (t: Throwable) {
// mutationRepository.markAsFailed(mutations, t)
// false
// }
// }

companion object {
private const val MAX_RETRY_COUNT = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ constructor(
* specified id.
*/
suspend fun getMutations(
loidId: String,
loiId: String,
vararg entitySyncStatus: MutationEntitySyncStatus
): List<Mutation> {
val loiMutations =
localLocationOfInterestStore
.findByLocationOfInterestId(loidId, *entitySyncStatus)
.findByLocationOfInterestId(loiId, *entitySyncStatus)
.map(LocationOfInterestMutationEntity::toModelObject)
val submissionMutations =
localSubmissionStore.findByLocationOfInterestId(loidId, *entitySyncStatus).map {
localSubmissionStore.findByLocationOfInterestId(loiId, *entitySyncStatus).map {
it.toSubmissionMutation()
}
return loiMutations + submissionMutations
Expand Down

0 comments on commit e2cbeba

Please sign in to comment.