Skip to content

Commit

Permalink
Unstick stuck uploads (#2844)
Browse files Browse the repository at this point in the history
* Set mutation failure state before logging Crashlytics

* Add bug link to TODO

* Also process "in progress" mutations if found

* Force sync of stuck mutations

* Only sync incomplete mutations

* Retry stuck media uploads
  • Loading branch information
gino-m authored Nov 17, 2024
1 parent 8c7af6f commit c47777c
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject() }
}

override fun getAllMutationsFlow(): Flow<List<LocationOfInterestMutation>> =
locationOfInterestMutationDao.getAllMutationsFlow().map { mutations ->
mutations.map { it.toModelObject() }
}

override suspend fun findByLocationOfInterestId(
id: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ import javax.inject.Singleton
import kotlinx.collections.immutable.toPersistentList
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import timber.log.Timber

/** Manages access to [Submission] objects persisted in local storage. */
Expand All @@ -60,6 +62,7 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore
@Inject lateinit var submissionDao: SubmissionDao
@Inject lateinit var submissionMutationDao: SubmissionMutationDao
@Inject lateinit var userStore: RoomUserStore
@Inject lateinit var surveyStore: RoomSurveyStore

/**
* Attempts to retrieve the [Submission] associated with the given ID and [LocationOfInterest].
Expand Down Expand Up @@ -223,7 +226,18 @@ class RoomSubmissionStore @Inject internal constructor() : LocalSubmissionStore
.map { mutations ->
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) }
}
.catch { Timber.e("ignoring invalid submission mutation:\n\t${it.message}") }
.catch { Timber.e("Ignoring invalid submission mutation", it) }

override fun getAllMutationsFlow(): Flow<List<SubmissionMutation>> =
submissionMutationDao
.getAllMutationsFlow()
.map { it.mapNotNull { mutation -> convertMutation(mutation) } }
.catch { Timber.e("ignoring invalid submission mutation", it) }

private suspend fun convertMutation(mutation: SubmissionMutationEntity): SubmissionMutation? {
val survey = surveyStore.getSurveyById(mutation.surveyId)
return survey?.let { mutation.toModelObject(survey) }
}

override suspend fun findByLocationOfInterestId(
loidId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ interface LocalLocationOfInterestStore :
*/
fun getAllSurveyMutations(survey: Survey): Flow<List<LocationOfInterestMutation>>

fun getAllMutationsFlow(): Flow<List<LocationOfInterestMutation>>

suspend fun findByLocationOfInterestId(
id: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ interface LocalSubmissionStore : LocalMutationStore<SubmissionMutation, Submissi
*/
fun getAllSurveyMutationsFlow(survey: Survey): Flow<List<SubmissionMutation>>

fun getAllMutationsFlow(): Flow<List<SubmissionMutation>>

suspend fun findByLocationOfInterestId(
loidId: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import androidx.work.WorkerParameters
import com.google.android.ground.FirebaseCrashLogger
import com.google.android.ground.model.User
import com.google.android.ground.model.mutation.Mutation
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.FAILED
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.IN_PROGRESS
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.PENDING
import com.google.android.ground.persistence.local.stores.LocalUserStore
import com.google.android.ground.persistence.remote.RemoteDataStore
import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData
Expand Down Expand Up @@ -60,7 +62,7 @@ constructor(

private suspend fun doWorkInternal(): Result =
try {
val mutations = getPendingOrEligibleFailedMutations()
val mutations = getIncompleteMutations()
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")
val result = processMutations(mutations)
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
Expand All @@ -71,16 +73,12 @@ constructor(
}

/**
* Attempts to fetch all mutations from the [MutationRepository] that are in `PENDING` state or in
* `FAILED` state but eligible for retry.
* Attempts to fetch all mutations from the [MutationRepository] that are `PENDING`, `FAILED`, or
* `IN_PROGRESS` state. The latter should never occur since only on worker should be scheduled per
* LOI at a given time.
*/
private suspend fun getPendingOrEligibleFailedMutations(): List<Mutation> {
val pendingMutations =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.PENDING)
val failedMutationsEligibleForRetry =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.FAILED)
return pendingMutations + failedMutationsEligibleForRetry
}
private suspend fun getIncompleteMutations(): List<Mutation> =
mutationRepository.getMutations(locationOfInterestId, PENDING, FAILED, IN_PROGRESS)

/**
* Groups mutations by user id, loads each user, applies mutations, and removes processed
Expand Down Expand Up @@ -123,10 +121,10 @@ constructor(
// Mark all mutations as having failed since the remote datastore only commits when all
// mutations have succeeded.
Timber.d(t, "Local mutation sync failed")
mutationRepository.markAsFailed(mutations, t)
val crashlytics = FirebaseCrashLogger()
crashlytics.setSelectedSurveyId(mutations.first().surveyId)
crashlytics.logException(t)
mutationRepository.markAsFailed(mutations, t)
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ constructor(
mutationRepository.getSubmissionMutations(
loiId,
MutationEntitySyncStatus.MEDIA_UPLOAD_PENDING,
MutationEntitySyncStatus.MEDIA_UPLOAD_IN_PROGRESS,
MutationEntitySyncStatus.MEDIA_UPLOAD_AWAITING_RETRY,
)
val results = uploadMedia(mutations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,21 @@ constructor(
* subscribe and a new list on each subsequent change.
*/
fun getSurveyMutationsFlow(survey: Survey): Flow<List<Mutation>> {
// TODO: Show mutations for all surveys, not just current one.
// TODO(https://github.com/google/ground-android/issues/2838): Show mutations for all surveys,
// not just current one.
val locationOfInterestMutations = localLocationOfInterestStore.getAllSurveyMutations(survey)
val submissionMutations = localSubmissionStore.getAllSurveyMutationsFlow(survey)

return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations)
}

fun getAllMutationsFlow(): Flow<List<Mutation>> {
val locationOfInterestMutations = localLocationOfInterestStore.getAllMutationsFlow()
val submissionMutations = localSubmissionStore.getAllMutationsFlow()

return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations)
}

/**
* Returns all local submission mutations associated with the the given LOI ID that have one of
* the provided sync statues.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ package com.google.android.ground.ui.home
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.viewModelScope
import com.google.android.ground.model.mutation.Mutation.SyncStatus.COMPLETED
import com.google.android.ground.model.submission.DraftSubmission
import com.google.android.ground.persistence.local.LocalValueStore
import com.google.android.ground.persistence.sync.MediaUploadWorkManager
import com.google.android.ground.persistence.sync.MutationSyncWorkManager
import com.google.android.ground.repository.MutationRepository
import com.google.android.ground.repository.OfflineAreaRepository
import com.google.android.ground.repository.SubmissionRepository
import com.google.android.ground.repository.SurveyRepository
Expand All @@ -43,6 +47,9 @@ internal constructor(
private val navigator: Navigator,
private val offlineAreaRepository: OfflineAreaRepository,
private val submissionRepository: SubmissionRepository,
private val mutationRepository: MutationRepository,
private val mutationSyncWorkManager: MutationSyncWorkManager,
private val mediaUploadWorkManager: MediaUploadWorkManager,
val surveyRepository: SurveyRepository,
val userRepository: UserRepository,
) : AbstractViewModel() {
Expand All @@ -53,6 +60,26 @@ internal constructor(
// TODO(#1730): Allow tile source configuration from a non-survey accessible source.
val showOfflineAreaMenuItem: LiveData<Boolean> = MutableLiveData(true)

init {
viewModelScope.launch { kickLocalMutationSyncWorkers() }
}

/**
* Enqueue data and photo upload workers for all pending mutations when home screen is first
* opened as a workaround the get stuck mutations (i.e., PENDING or FAILED mutations with no
* scheduled workers) going again. Workaround for
* https://github.com/google/ground-android/issues/2751.
*/
private suspend fun kickLocalMutationSyncWorkers() {
val mutations = mutationRepository.getAllMutationsFlow().first()
val incompleteLoiIds =
mutations.filter { it.syncStatus != COMPLETED }.map { it.locationOfInterestId }.toSet()
incompleteLoiIds.forEach { loiId ->
mutationSyncWorkManager.enqueueSyncWorker(loiId)
mediaUploadWorkManager.enqueueSyncWorker(loiId)
}
}

/** Attempts to return draft submission for the currently active survey. */
suspend fun getDraftSubmission(): DraftSubmission? {
val draftId = localValueStore.draftSubmissionId
Expand Down

0 comments on commit c47777c

Please sign in to comment.