Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unstick stuck uploads #2844

Merged
merged 7 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class RoomLocationOfInterestStore @Inject internal constructor() : LocalLocation
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject() }
}

override fun getAllMutationsFlow(): Flow<List<LocationOfInterestMutation>> =
locationOfInterestMutationDao.getAllMutationsFlow().map { mutations ->
mutations.map { it.toModelObject() }
}

override suspend fun findByLocationOfInterestId(
id: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import kotlinx.collections.immutable.toPersistentList
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import timber.log.Timber

/** Manages access to [Submission] objects persisted in local storage. */
Expand All @@ -60,6 +62,7 @@
@Inject lateinit var submissionDao: SubmissionDao
@Inject lateinit var submissionMutationDao: SubmissionMutationDao
@Inject lateinit var userStore: RoomUserStore
@Inject lateinit var surveyStore: RoomSurveyStore

/**
* Attempts to retrieve the [Submission] associated with the given ID and [LocationOfInterest].
Expand Down Expand Up @@ -223,7 +226,18 @@
.map { mutations ->
mutations.filter { it.surveyId == survey.id }.map { it.toModelObject(survey) }
}
.catch { Timber.e("ignoring invalid submission mutation:\n\t${it.message}") }
.catch { Timber.e("Ignoring invalid submission mutation", it) }

override fun getAllMutationsFlow(): Flow<List<SubmissionMutation>> =
submissionMutationDao
.getAllMutationsFlow()
.map { it.mapNotNull { mutation -> convertMutation(mutation) } }
.catch { Timber.e("ignoring invalid submission mutation", it) }

private suspend fun convertMutation(mutation: SubmissionMutationEntity): SubmissionMutation? {
val survey = surveyStore.getSurveyById(mutation.surveyId)

Check warning on line 238 in ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt

View check run for this annotation

Codecov / codecov/patch

ground/src/main/java/com/google/android/ground/persistence/local/room/stores/RoomSubmissionStore.kt#L237-L238

Added lines #L237 - L238 were not covered by tests
return survey?.let { mutation.toModelObject(survey) }
}

override suspend fun findByLocationOfInterestId(
loidId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ interface LocalLocationOfInterestStore :
*/
fun getAllSurveyMutations(survey: Survey): Flow<List<LocationOfInterestMutation>>

fun getAllMutationsFlow(): Flow<List<LocationOfInterestMutation>>

suspend fun findByLocationOfInterestId(
id: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ interface LocalSubmissionStore : LocalMutationStore<SubmissionMutation, Submissi
*/
fun getAllSurveyMutationsFlow(survey: Survey): Flow<List<SubmissionMutation>>

fun getAllMutationsFlow(): Flow<List<SubmissionMutation>>

suspend fun findByLocationOfInterestId(
loidId: String,
vararg states: MutationEntitySyncStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import androidx.work.WorkerParameters
import com.google.android.ground.FirebaseCrashLogger
import com.google.android.ground.model.User
import com.google.android.ground.model.mutation.Mutation
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.FAILED
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.IN_PROGRESS
import com.google.android.ground.persistence.local.room.fields.MutationEntitySyncStatus.PENDING
import com.google.android.ground.persistence.local.stores.LocalUserStore
import com.google.android.ground.persistence.remote.RemoteDataStore
import com.google.android.ground.persistence.sync.LocalMutationSyncWorker.Companion.createInputData
Expand Down Expand Up @@ -60,7 +62,7 @@ constructor(

private suspend fun doWorkInternal(): Result =
try {
val mutations = getPendingOrEligibleFailedMutations()
val mutations = getIncompleteMutations()
Timber.d("Syncing ${mutations.size} changes for LOI $locationOfInterestId")
val result = processMutations(mutations)
mediaUploadWorkManager.enqueueSyncWorker(locationOfInterestId)
Expand All @@ -71,16 +73,12 @@ constructor(
}

/**
* Attempts to fetch all mutations from the [MutationRepository] that are in `PENDING` state or in
* `FAILED` state but eligible for retry.
* Attempts to fetch all mutations from the [MutationRepository] that are `PENDING`, `FAILED`, or
* `IN_PROGRESS` state. The latter should never occur since only on worker should be scheduled per
* LOI at a given time.
*/
private suspend fun getPendingOrEligibleFailedMutations(): List<Mutation> {
val pendingMutations =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.PENDING)
val failedMutationsEligibleForRetry =
mutationRepository.getMutations(locationOfInterestId, MutationEntitySyncStatus.FAILED)
return pendingMutations + failedMutationsEligibleForRetry
}
private suspend fun getIncompleteMutations(): List<Mutation> =
mutationRepository.getMutations(locationOfInterestId, PENDING, FAILED, IN_PROGRESS)

/**
* Groups mutations by user id, loads each user, applies mutations, and removes processed
Expand Down Expand Up @@ -123,10 +121,10 @@ constructor(
// Mark all mutations as having failed since the remote datastore only commits when all
// mutations have succeeded.
Timber.d(t, "Local mutation sync failed")
mutationRepository.markAsFailed(mutations, t)
val crashlytics = FirebaseCrashLogger()
crashlytics.setSelectedSurveyId(mutations.first().surveyId)
crashlytics.logException(t)
mutationRepository.markAsFailed(mutations, t)
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ constructor(
mutationRepository.getSubmissionMutations(
loiId,
MutationEntitySyncStatus.MEDIA_UPLOAD_PENDING,
MutationEntitySyncStatus.MEDIA_UPLOAD_IN_PROGRESS,
MutationEntitySyncStatus.MEDIA_UPLOAD_AWAITING_RETRY,
)
val results = uploadMedia(mutations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,21 @@ constructor(
* subscribe and a new list on each subsequent change.
*/
fun getSurveyMutationsFlow(survey: Survey): Flow<List<Mutation>> {
// TODO: Show mutations for all surveys, not just current one.
// TODO(https://github.com/google/ground-android/issues/2838): Show mutations for all surveys,
// not just current one.
val locationOfInterestMutations = localLocationOfInterestStore.getAllSurveyMutations(survey)
val submissionMutations = localSubmissionStore.getAllSurveyMutationsFlow(survey)

return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations)
}

fun getAllMutationsFlow(): Flow<List<Mutation>> {
val locationOfInterestMutations = localLocationOfInterestStore.getAllMutationsFlow()
val submissionMutations = localSubmissionStore.getAllMutationsFlow()

return locationOfInterestMutations.combine(submissionMutations, this::combineAndSortMutations)
}

/**
* Returns all local submission mutations associated with the the given LOI ID that have one of
* the provided sync statues.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.viewModelScope
import com.google.android.ground.model.mutation.Mutation.SyncStatus.COMPLETED
import com.google.android.ground.model.submission.DraftSubmission
import com.google.android.ground.persistence.local.LocalValueStore
import com.google.android.ground.persistence.sync.MediaUploadWorkManager
import com.google.android.ground.persistence.sync.MutationSyncWorkManager
import com.google.android.ground.repository.MutationRepository
import com.google.android.ground.repository.OfflineAreaRepository
import com.google.android.ground.repository.SubmissionRepository
import com.google.android.ground.repository.SurveyRepository
Expand All @@ -43,6 +47,9 @@
private val navigator: Navigator,
private val offlineAreaRepository: OfflineAreaRepository,
private val submissionRepository: SubmissionRepository,
private val mutationRepository: MutationRepository,
private val mutationSyncWorkManager: MutationSyncWorkManager,
private val mediaUploadWorkManager: MediaUploadWorkManager,
val surveyRepository: SurveyRepository,
val userRepository: UserRepository,
) : AbstractViewModel() {
Expand All @@ -53,6 +60,26 @@
// TODO(#1730): Allow tile source configuration from a non-survey accessible source.
val showOfflineAreaMenuItem: LiveData<Boolean> = MutableLiveData(true)

init {
viewModelScope.launch { kickLocalMutationSyncWorkers() }
}

/**
* Enqueue data and photo upload workers for all pending mutations when home screen is first
* opened as a workaround the get stuck mutations (i.e., PENDING or FAILED mutations with no
* scheduled workers) going again. Workaround for
* https://github.com/google/ground-android/issues/2751.
*/
private suspend fun kickLocalMutationSyncWorkers() {
val mutations = mutationRepository.getAllMutationsFlow().first()
val incompleteLoiIds =
mutations.filter { it.syncStatus != COMPLETED }.map { it.locationOfInterestId }.toSet()
incompleteLoiIds.forEach { loiId ->
mutationSyncWorkManager.enqueueSyncWorker(loiId)
mediaUploadWorkManager.enqueueSyncWorker(loiId)
}

Check warning on line 80 in ground/src/main/java/com/google/android/ground/ui/home/HomeScreenViewModel.kt

View check run for this annotation

Codecov / codecov/patch

ground/src/main/java/com/google/android/ground/ui/home/HomeScreenViewModel.kt#L78-L80

Added lines #L78 - L80 were not covered by tests
}

/** Attempts to return draft submission for the currently active survey. */
suspend fun getDraftSubmission(): DraftSubmission? {
val draftId = localValueStore.draftSubmissionId
Expand Down