From 796e4d6e0f188475bf4d32c6ac59a042a536beb4 Mon Sep 17 00:00:00 2001 From: Emily Dixon Date: Fri, 21 Jul 2023 15:21:29 -0700 Subject: [PATCH] Releases/v0.4.1 (#71) --- .../vod/demo/backend/ImaginaryBackend.kt | 2 +- .../upload/PlainViewListExampleActivity.kt | 18 +- .../vod/demo/upload/UploadListAdapter.kt | 10 +- .../demo/upload/screen/CreateUploadScreen.kt | 16 +- .../demo/upload/screen/UploadListScreen.kt | 2 +- .../upload/viewmodel/CreateUploadViewModel.kt | 1 + library/build.gradle | 1 + .../java/com/mux/video/upload/MuxUploadSdk.kt | 4 +- .../com/mux/video/upload/api/MuxUpload.kt | 152 +++-- .../mux/video/upload/api/MuxUploadManager.kt | 42 +- .../com/mux/video/upload/api/UploadStatus.kt | 76 +++ .../upload/internal/TranscoderContext.kt | 622 ++++++++++++++++++ .../mux/video/upload/internal/UploadInfo.kt | 29 +- .../video/upload/internal/UploadJobFactory.kt | 109 ++- .../video/upload/internal/UploadMetrics.kt | 38 +- .../upload/internal/UploadPersistence.kt | 13 +- .../com/mux/video/upload/internal/Util.kt | 108 +++ .../upload/internal/UploadPersistenceTests.kt | 12 +- 18 files changed, 1101 insertions(+), 154 deletions(-) create mode 100644 library/src/main/java/com/mux/video/upload/api/UploadStatus.kt create mode 100644 library/src/main/java/com/mux/video/upload/internal/TranscoderContext.kt diff --git a/app/src/main/java/com/mux/video/vod/demo/backend/ImaginaryBackend.kt b/app/src/main/java/com/mux/video/vod/demo/backend/ImaginaryBackend.kt index 84274130..c8570760 100644 --- a/app/src/main/java/com/mux/video/vod/demo/backend/ImaginaryBackend.kt +++ b/app/src/main/java/com/mux/video/vod/demo/backend/ImaginaryBackend.kt @@ -56,7 +56,7 @@ object ImaginaryBackend { // note: You shouldn't do basic auth with hard-coded keys in a real app private fun basicCredential(): String = Credentials.basic(ACCESS_TOKEN_ID, ACCESS_TOKEN_SECRET) - private const val ACCESS_TOKEN_ID = "YOUR ACCESS TOKEN ID HERE" + private const val ACCESS_TOKEN_ID = "YOUR TOKEN ID HERE" private const val ACCESS_TOKEN_SECRET = "YOUR TOKEN SECRET HERE" } diff --git a/app/src/main/java/com/mux/video/vod/demo/upload/PlainViewListExampleActivity.kt b/app/src/main/java/com/mux/video/vod/demo/upload/PlainViewListExampleActivity.kt index 459a3bd1..5219c794 100644 --- a/app/src/main/java/com/mux/video/vod/demo/upload/PlainViewListExampleActivity.kt +++ b/app/src/main/java/com/mux/video/vod/demo/upload/PlainViewListExampleActivity.kt @@ -27,6 +27,7 @@ class PlainViewListExampleActivity : AppCompatActivity() { { grantedPermissions -> if (!grantedPermissions.containsKey(android.Manifest.permission.READ_EXTERNAL_STORAGE) || grantedPermissions.containsKey(android.Manifest.permission.READ_MEDIA_VIDEO) + || grantedPermissions.containsKey(android.Manifest.permission.WRITE_EXTERNAL_STORAGE) ) { maybeRequestPermissionsApi33() } @@ -84,10 +85,18 @@ class PlainViewListExampleActivity : AppCompatActivity() { this, android.Manifest.permission.READ_EXTERNAL_STORAGE ) == PackageManager.PERMISSION_GRANTED + val hasWriteStorage = + ActivityCompat.checkSelfPermission( + this, + android.Manifest.permission.WRITE_EXTERNAL_STORAGE + ) == PackageManager.PERMISSION_GRANTED if (!hasReadStorage) { requestPermissions.launch(arrayOf(android.Manifest.permission.READ_EXTERNAL_STORAGE)) } + if (!hasWriteStorage) { + requestPermissions.launch(arrayOf(android.Manifest.permission.WRITE_EXTERNAL_STORAGE)) + } } @TargetApi(Build.VERSION_CODES.TIRAMISU) @@ -101,12 +110,17 @@ class PlainViewListExampleActivity : AppCompatActivity() { this, android.Manifest.permission.READ_MEDIA_VIDEO ) == PackageManager.PERMISSION_GRANTED + val hasWriteStorage = ActivityCompat.checkSelfPermission( + this, + android.Manifest.permission.WRITE_EXTERNAL_STORAGE + ) == PackageManager.PERMISSION_GRANTED - if (!hasReadVideo || !hasReadStorage) { + if (!hasReadVideo || !hasReadStorage || ! hasWriteStorage) { requestPermissions.launch( arrayOf( android.Manifest.permission.READ_EXTERNAL_STORAGE, - android.Manifest.permission.READ_MEDIA_VIDEO + android.Manifest.permission.READ_MEDIA_VIDEO, + android.Manifest.permission.WRITE_EXTERNAL_STORAGE ) ) } diff --git a/app/src/main/java/com/mux/video/vod/demo/upload/UploadListAdapter.kt b/app/src/main/java/com/mux/video/vod/demo/upload/UploadListAdapter.kt index 373e0f7d..f5e6bf7c 100644 --- a/app/src/main/java/com/mux/video/vod/demo/upload/UploadListAdapter.kt +++ b/app/src/main/java/com/mux/video/vod/demo/upload/UploadListAdapter.kt @@ -23,15 +23,15 @@ class UploadListAdapter( override fun onBindViewHolder(holder: MediaStoreVideoViewHolder, position: Int) { val listItem = items[position] - val elapsedTime = listItem.currentState.updatedTime - listItem.currentState.startTime - val bytesPerMs = listItem.currentState.bytesUploaded / elapsedTime.toDouble() - val stateMsg = if (listItem.currentState.bytesUploaded >= listItem.currentState.totalBytes) { + val elapsedTime = listItem.currentProgress.updatedTime - listItem.currentProgress.startTime + val bytesPerMs = listItem.currentProgress.bytesUploaded / elapsedTime.toDouble() + val stateMsg = if (listItem.currentProgress.bytesUploaded >= listItem.currentProgress.totalBytes) { "done" } else { "not done" } val progressPercent = - (listItem.currentState.bytesUploaded / listItem.currentState.totalBytes.toDouble()) * 100.0 + (listItem.currentProgress.bytesUploaded / listItem.currentProgress.totalBytes.toDouble()) * 100.0 val df = DecimalFormat("#.00") val formattedRate = df.format(bytesPerMs) @@ -40,7 +40,7 @@ class UploadListAdapter( holder.viewBinding.mediastoreVideoProgress.max = 100 holder.viewBinding.mediastoreVideoFilename.text = listItem.videoFile.absolutePath holder.viewBinding.mediastoreVideoDate.text = - "${listItem.currentState.bytesUploaded} bytes in ${elapsedTime / 1000F} s elapsed " + "${listItem.currentProgress.bytesUploaded} bytes in ${elapsedTime / 1000F} s elapsed " holder.viewBinding.mediastoreVideoFilesize.text = "${formattedRate} KBytes/s" if (listItem.isRunning) { diff --git a/app/src/main/java/com/mux/video/vod/demo/upload/screen/CreateUploadScreen.kt b/app/src/main/java/com/mux/video/vod/demo/upload/screen/CreateUploadScreen.kt index 63ee9556..c3709ac1 100644 --- a/app/src/main/java/com/mux/video/vod/demo/upload/screen/CreateUploadScreen.kt +++ b/app/src/main/java/com/mux/video/vod/demo/upload/screen/CreateUploadScreen.kt @@ -86,12 +86,16 @@ private fun RequestPermissionsEffect(context: Context) { launcher.launch( arrayOf( Manifest.permission.READ_EXTERNAL_STORAGE, - Manifest.permission.READ_MEDIA_VIDEO + Manifest.permission.READ_MEDIA_VIDEO, + Manifest.permission.WRITE_EXTERNAL_STORAGE ) ) } else { launcher.launch( - arrayOf(Manifest.permission.READ_EXTERNAL_STORAGE) + arrayOf( + Manifest.permission.READ_EXTERNAL_STORAGE, + Manifest.permission.WRITE_EXTERNAL_STORAGE + ) ) } } // MainScope().launch { @@ -293,10 +297,18 @@ private fun hasPermissions(context: Context): Boolean { context, Manifest.permission.READ_MEDIA_VIDEO ) == PackageManager.PERMISSION_GRANTED + ContextCompat.checkSelfPermission( + context, + Manifest.permission.WRITE_EXTERNAL_STORAGE + ) == PackageManager.PERMISSION_GRANTED } else { ContextCompat.checkSelfPermission( context, Manifest.permission.READ_EXTERNAL_STORAGE ) == PackageManager.PERMISSION_GRANTED + ContextCompat.checkSelfPermission( + context, + Manifest.permission.WRITE_EXTERNAL_STORAGE + ) == PackageManager.PERMISSION_GRANTED } } diff --git a/app/src/main/java/com/mux/video/vod/demo/upload/screen/UploadListScreen.kt b/app/src/main/java/com/mux/video/vod/demo/upload/screen/UploadListScreen.kt index e1340a46..39d5f7b5 100644 --- a/app/src/main/java/com/mux/video/vod/demo/upload/screen/UploadListScreen.kt +++ b/app/src/main/java/com/mux/video/vod/demo/upload/screen/UploadListScreen.kt @@ -182,7 +182,7 @@ private fun ListItemContent(upload: MuxUpload) { ErrorOverlay(modifier = Modifier.fillMaxSize()) } else if (upload.isRunning) { ProgressOverlay( - upload.currentState, + upload.currentProgress, modifier = Modifier .align(Alignment.BottomStart) .fillMaxWidth() diff --git a/app/src/main/java/com/mux/video/vod/demo/upload/viewmodel/CreateUploadViewModel.kt b/app/src/main/java/com/mux/video/vod/demo/upload/viewmodel/CreateUploadViewModel.kt index 0c451112..a0588743 100644 --- a/app/src/main/java/com/mux/video/vod/demo/upload/viewmodel/CreateUploadViewModel.kt +++ b/app/src/main/java/com/mux/video/vod/demo/upload/viewmodel/CreateUploadViewModel.kt @@ -194,4 +194,5 @@ class CreateUploadViewModel(private val app: Application) : AndroidViewModel(app val thumbnail: Bitmap? = null, val uploadUri: Uri? = null ) + } diff --git a/library/build.gradle b/library/build.gradle index d9590f84..e1332656 100644 --- a/library/build.gradle +++ b/library/build.gradle @@ -56,6 +56,7 @@ muxDistribution { dependencies { implementation "com.squareup.okhttp3:logging-interceptor:4.11.0" implementation "com.squareup.okhttp3:okhttp:4.11.0" + implementation "io.github.crow-misia.libyuv:libyuv-android:0.27.0" implementation 'androidx.core:core-ktx:1.10.1' implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.2" diff --git a/library/src/main/java/com/mux/video/upload/MuxUploadSdk.kt b/library/src/main/java/com/mux/video/upload/MuxUploadSdk.kt index ede20dd2..8e0c1408 100644 --- a/library/src/main/java/com/mux/video/upload/MuxUploadSdk.kt +++ b/library/src/main/java/com/mux/video/upload/MuxUploadSdk.kt @@ -66,11 +66,11 @@ object MuxUploadSdk { @Suppress("unused") @JvmOverloads fun initialize(appContext: Context, resumeStoppedUploads: Boolean = true) { + MuxUploadManager.appContext = appContext; initializeUploadPersistence(appContext) UploadMetrics.initialize(appContext) - if (resumeStoppedUploads) { - val upl = MuxUploadManager.resumeAllCachedJobs() + MuxUploadManager.resumeAllCachedJobs() } } diff --git a/library/src/main/java/com/mux/video/upload/api/MuxUpload.kt b/library/src/main/java/com/mux/video/upload/api/MuxUpload.kt index 4a466ae5..5e6ef6e6 100644 --- a/library/src/main/java/com/mux/video/upload/api/MuxUpload.kt +++ b/library/src/main/java/com/mux/video/upload/api/MuxUpload.kt @@ -31,23 +31,33 @@ import java.io.File * @see MuxUploadManager */ class MuxUpload private constructor( - private var uploadInfo: UploadInfo, private val autoManage: Boolean = true + private var uploadInfo: UploadInfo, + private val autoManage: Boolean = true, + initialStatus: UploadStatus = UploadStatus.Ready ) { /** * File containing the video to be uploaded */ - val videoFile: File get() = uploadInfo.file + val videoFile: File get() = uploadInfo.inputFile /** * The current state of the upload. To be notified of state updates, you can use * [setProgressListener] and [setResultListener] */ - val currentState: Progress - get() = lastKnownState ?: uploadInfo.progressFlow?.replayCache?.firstOrNull() ?: Progress( + val currentProgress: Progress + get() = lastKnownProgress ?: uploadInfo.statusFlow?.value?.getProgress() ?: Progress( totalBytes = videoFile.length() ) + /** + * The current status of this upload. + * + * To be notified of status updates (including upload progress), use [setStatusListener] + */ + @Suppress("MemberVisibilityCanBePrivate") + val uploadStatus: UploadStatus + /** * True when the upload is running, false if it's paused, failed, or canceled */ @@ -56,7 +66,7 @@ class MuxUpload private constructor( /** * If the upload has failed, gets the error associated with the failure */ - val error get() = _error ?: uploadInfo.errorFlow?.replayCache?.firstOrNull() + val error get() = _error ?: uploadInfo.statusFlow?.value?.getError() private var _error: Exception? = null /** @@ -67,15 +77,17 @@ class MuxUpload private constructor( private var resultListener: UploadEventListener>? = null private var progressListener: UploadEventListener? = null + private var statusListener: UploadEventListener? = null private var observerJob: Job? = null - private var lastKnownState: Progress? = null + private var currentStatus: UploadStatus = UploadStatus.Ready + private val lastKnownProgress: Progress? get() = currentStatus.getProgress() private val callbackScope: CoroutineScope = MainScope() private val logger get() = MuxUploadSdk.logger init { // Catch Events if an upload was already in progress - maybeObserveUpload(uploadInfo) + observeUpload(uploadInfo) } /** @@ -112,8 +124,8 @@ class MuxUpload private constructor( /*uploadInfo =*/ MuxUploadSdk.uploadJobFactory().createUploadJob(uploadInfo, coroutineScope) } - logger.i("MuxUpload", "started upload: ${uploadInfo.file}") - maybeObserveUpload(uploadInfo) + logger.i("MuxUpload", "started upload: ${uploadInfo.inputFile}") + observeUpload(uploadInfo) } /** @@ -128,17 +140,14 @@ class MuxUpload private constructor( @Throws @Suppress("unused") @JvmSynthetic - suspend fun awaitSuccess(): Result { - val result = uploadInfo.successFlow?.replayCache?.firstOrNull() - return if (result != null) { - Result.success(result) // If we succeeded already, don't start again + suspend fun awaitSuccess(): Result { + val status = uploadStatus // base our logic on a stable snapshot of the status + return if (status is UploadStatus.UploadSuccess) { + Result.success(status) // If we succeeded already, don't start again } else { coroutineScope { startInner(coroutineScope = this) - uploadInfo.uploadJob?.let { job -> - val result = job.await() - result - } ?: Result.failure(Exception("Upload failed to start")) + uploadInfo.uploadJob?.await() ?: Result.failure(Exception("Upload failed to start")) } } } @@ -158,12 +167,10 @@ class MuxUpload private constructor( uploadInfo.uploadJob?.cancel() /*uploadInfo =*/ uploadInfo.update( uploadJob = null, - successFlow = null, - errorFlow = null, - progressFlow = null, + statusFlow = null, ) } - lastKnownState?.let { state -> progressListener?.onEvent(state) } + lastKnownProgress?.let { state -> progressListener?.onEvent(state) } } /** @@ -177,79 +184,99 @@ class MuxUpload private constructor( } else { uploadInfo.uploadJob?.cancel("user requested cancel") } - lastKnownState = null observerJob?.cancel("user requested cancel") } /** * Sets a listener for progress updates on this upload + * + * @see setStatusListener */ @MainThread fun setProgressListener(listener: UploadEventListener?) { progressListener = listener - lastKnownState?.let { listener?.onEvent(it) } + lastKnownProgress?.let { listener?.onEvent(it) } } /** * Sets a listener for success or failure updates on this upload + * + * @see setStatusListener */ @MainThread fun setResultListener(listener: UploadEventListener>) { resultListener = listener - lastKnownState?.let { + lastKnownProgress?.let { if (it.bytesUploaded >= it.totalBytes) { listener.onEvent(Result.success(it)) } } } + /** + * Set a listener for the overall status of this upload. + * + * @see UploadStatus + */ + @MainThread + fun setStatusListener(listener: UploadEventListener?) { + statusListener = listener + listener?.onEvent(currentStatus) + } + /** * Clears all listeners set on this object */ + @Suppress("unused") @MainThread fun clearListeners() { resultListener = null progressListener = null + statusListener = null } private fun newObserveProgressJob(upload: UploadInfo): Job { - // This job has up to three children, one for each of the state flows on UploadInfo + // Job that collects and notifies state updates on the main thread (suspending on main is safe) return callbackScope.launch { - upload.errorFlow?.let { flow -> + upload.statusFlow?.let { flow -> launch { - flow.collect { error -> - if (error !is CancellationException) { // Canceled uploads shouldn't generate events - _error = error - resultListener?.onEvent(Result.failure(error)) + flow.collect { status -> + // Update the status of our upload + currentStatus = status + statusListener?.onEvent(status) + + // Notify the old listeners + when (status) { + is UploadStatus.Uploading -> { progressListener?.onEvent(status.uploadProgress) } + is UploadStatus.UploadPaused -> { progressListener?.onEvent(status.uploadProgress) } + is UploadStatus.UploadSuccess -> { + progressListener?.onEvent(status.uploadProgress) + resultListener?.onEvent(Result.success(status.uploadProgress)) + } + is UploadStatus.UploadFailed -> { + progressListener?.onEvent(status.uploadProgress) // Make sure we're most up-to-date + if (status.exception !is CancellationException) { + _error = status.exception + resultListener?.onEvent(Result.failure(status.exception)) + } + } + else -> { } // no relevant info } } } } - upload.successFlow?.let { flow -> - launch { - flow.collect { state -> - lastKnownState = state - _successful = true - resultListener?.onEvent(Result.success(state)) - } - } - } - upload.progressFlow?.let { flow -> - launch { - flow.collect { state -> - lastKnownState = state - progressListener?.onEvent(state) - } - } - } } } - private fun maybeObserveUpload(uploadInfo: UploadInfo) { + private fun observeUpload(uploadInfo: UploadInfo) { observerJob?.cancel("switching observers") observerJob = newObserveProgressJob(uploadInfo) } + init { + uploadStatus = initialStatus + } + /** * The current progress of an upload, in terms of time elapsed and data transmitted */ @@ -293,31 +320,39 @@ class MuxUpload private constructor( * @param videoFile a File that represents the video file you want to upload */ @Suppress("unused") - constructor(uploadUri: String, videoFile: File) : this(Uri.parse(uploadUri), videoFile) + constructor(uploadUri: String, videoFile: File) + : this(Uri.parse(uploadUri), videoFile) private var manageTask: Boolean = true private var uploadInfo: UploadInfo = UploadInfo( // Default values remoteUri = uploadUri, - file = videoFile, + inputFile = videoFile, chunkSize = 8 * 1024 * 1024, // GCP recommends at least 8M chunk size retriesPerChunk = 3, + standardizationRequested = true, optOut = false, uploadJob = null, - successFlow = null, - progressFlow = null, - errorFlow = null + statusFlow = null, ) - /** * Allow Mux to manage and remember the state of this upload */ @Suppress("unused") fun manageUploadTask(autoManage: Boolean): Builder { - manageTask = autoManage; + manageTask = autoManage return this } + /** + * If requested, the Upload SDK will try to standardize the input file in order to optimize it + * for use with Mux Video + */ + @Suppress("unused") + fun standardizationRequested(enabled: Boolean) { + uploadInfo.update(standardizationRequested = enabled) + } + /** * The Upload SDK will upload your file in smaller chunks, which can be more reliable in adverse * network conditions. @@ -361,7 +396,12 @@ class MuxUpload private constructor( } internal companion object { + /** + * Internal constructor-like method for creating instances of this class from the + * [MuxUploadManager] + */ @JvmSynthetic - internal fun create(uploadInfo: UploadInfo) = MuxUpload(uploadInfo) + internal fun create(uploadInfo: UploadInfo, initialStatus: UploadStatus = UploadStatus.Ready) + = MuxUpload(uploadInfo = uploadInfo, initialStatus = initialStatus) } } diff --git a/library/src/main/java/com/mux/video/upload/api/MuxUploadManager.kt b/library/src/main/java/com/mux/video/upload/api/MuxUploadManager.kt index 6c6cfd26..fc1c42c0 100644 --- a/library/src/main/java/com/mux/video/upload/api/MuxUploadManager.kt +++ b/library/src/main/java/com/mux/video/upload/api/MuxUploadManager.kt @@ -1,9 +1,14 @@ package com.mux.video.upload.api +import android.content.Context import androidx.annotation.MainThread import com.mux.video.upload.MuxUploadSdk +import com.mux.video.upload.api.MuxUploadManager.allUploadJobs +import com.mux.video.upload.api.MuxUploadManager.findUploadByFile +import com.mux.video.upload.api.MuxUploadManager.resumeAllCachedJobs import com.mux.video.upload.internal.* import kotlinx.coroutines.* +import kotlinx.coroutines.flow.filter import java.io.File /** @@ -20,6 +25,7 @@ import java.io.File */ object MuxUploadManager { + public var appContext: Context? = null; private val mainScope = MainScope() private val uploadsByFilename: MutableMap = mutableMapOf() @@ -94,15 +100,13 @@ object MuxUploadManager { internal fun pauseJob(upload: UploadInfo): UploadInfo { assertMainThread() // Paused jobs stay in the manager and remain persisted - uploadsByFilename[upload.file.absolutePath]?.let { + uploadsByFilename[upload.inputFile.absolutePath]?.let { cancelJobInner(it) val pausedUpload = upload.update( uploadJob = null, - progressFlow = null, - errorFlow = null, - successFlow = null, + statusFlow = null, ) - uploadsByFilename[pausedUpload.file.absolutePath] = pausedUpload + uploadsByFilename[pausedUpload.inputFile.absolutePath] = pausedUpload return pausedUpload } notifyListListeners() @@ -113,10 +117,10 @@ object MuxUploadManager { @MainThread internal fun cancelJob(upload: UploadInfo) { assertMainThread() - uploadsByFilename[upload.file.absolutePath]?.let { - observerJobsByFilename.remove(upload.file.absolutePath)?.cancel() + uploadsByFilename[upload.inputFile.absolutePath]?.let { + observerJobsByFilename.remove(upload.inputFile.absolutePath)?.cancel() cancelJobInner(it) - uploadsByFilename -= it.file.absolutePath + uploadsByFilename -= it.inputFile.absolutePath forgetUploadState(upload) } notifyListListeners() @@ -126,8 +130,8 @@ object MuxUploadManager { @MainThread internal fun jobFinished(upload: UploadInfo, forgetJob: Boolean = true) { assertMainThread() - observerJobsByFilename.remove(upload.file.absolutePath)?.cancel() - uploadsByFilename -= upload.file.absolutePath + observerJobsByFilename.remove(upload.inputFile.absolutePath)?.cancel() + uploadsByFilename -= upload.inputFile.absolutePath if (forgetJob) { forgetUploadState(upload) } @@ -146,7 +150,7 @@ object MuxUploadManager { } private fun insertOrUpdateUpload(upload: UploadInfo, restart: Boolean): UploadInfo { - val filename = upload.file.absolutePath + val filename = upload.inputFile.absolutePath var newUpload = uploadsByFilename[filename] // Use the old job if possible (unless requested otherwise) if (newUpload?.uploadJob == null) { @@ -160,16 +164,22 @@ object MuxUploadManager { newUpload = startUploadJob(upload) } } - uploadsByFilename += upload.file.absolutePath to newUpload - observerJobsByFilename[upload.file.absolutePath]?.cancel() - observerJobsByFilename += upload.file.absolutePath to newObserveProgressJob(newUpload) + uploadsByFilename += upload.inputFile.absolutePath to newUpload + observerJobsByFilename[upload.inputFile.absolutePath]?.cancel() + observerJobsByFilename += upload.inputFile.absolutePath to newObserveProgressJob(newUpload) return newUpload } private fun newObserveProgressJob(upload: UploadInfo): Job { - // This job has up to three children, one for each of the state flows on UploadInfo + // Clear finished uploads from cache and storage return mainScope.launch { - upload.successFlow?.let { flow -> launch { flow.collect { jobFinished(upload) } } } + upload.statusFlow?.let { statusFlow -> + launch { + statusFlow + .filter { it is UploadStatus.UploadSuccess } + .collect { jobFinished(upload) } + } + } } } diff --git a/library/src/main/java/com/mux/video/upload/api/UploadStatus.kt b/library/src/main/java/com/mux/video/upload/api/UploadStatus.kt new file mode 100644 index 00000000..365350ae --- /dev/null +++ b/library/src/main/java/com/mux/video/upload/api/UploadStatus.kt @@ -0,0 +1,76 @@ +package com.mux.video.upload.api + +/** + * The current state of the upload. Uploads are first examined, potentially processed, then uploaded + * to Mux Video. + */ +sealed class UploadStatus { + + // Java Compatibility + + /** + * The progress, in bytes, of the upload. If the file isn't uploading yet, this will be null. + * + * This is provided for java callers. Kotlin callers can use [UploadStatus] as a sealed class as + * normal + */ + open fun getProgress(): MuxUpload.Progress? = null + + /** + * If this upload failed, returns the error that caused the failure + * + * This is provided for java callers. Kotlin callers can use [UploadStatus] as a sealed class as + * normal + */ + open fun getError(): Exception? = null + + // Subclasses + + /** + * This upload hos not been started. It is ready to start by calling [MuxUpload.start] + */ + object Ready: UploadStatus() + + /** + * This upload has been started via [MuxUpload.start] but has not yet started processing anything + */ + object Started: UploadStatus() + + /** + * This upload is being prepared. If standardization is required, it is done during this step + * + * @see MuxUpload.Builder.standardizationRequested + */ + object Preparing: UploadStatus() + + /** + * The upload is currently being sent to Mux Video. The progress is available + */ + class Uploading(val uploadProgress: MuxUpload.Progress): UploadStatus() { + override fun getProgress(): MuxUpload.Progress = uploadProgress + } + + /** + * The upload is currently paused. Part of the video file may have already been uploaded to Mux + * Video, but no data is currently being sent + */ + class UploadPaused(val uploadProgress: MuxUpload.Progress): UploadStatus() { + override fun getProgress(): MuxUpload.Progress = uploadProgress + } + + /** + * The upload has failed. Part of the file may have already been uploaded, and this upload can be + * resumed from this state via [MuxUpload.start] + */ + class UploadFailed(val exception: Exception, val uploadProgress: MuxUpload.Progress): UploadStatus() { + override fun getError(): Exception = exception + override fun getProgress(): MuxUpload.Progress = uploadProgress + } + + /** + * The upload succeeded. The file has been uploaded to Mux Video and will be processed shortly + */ + class UploadSuccess(val uploadProgress: MuxUpload.Progress): UploadStatus() { + override fun getProgress(): MuxUpload.Progress = uploadProgress + } +} diff --git a/library/src/main/java/com/mux/video/upload/internal/TranscoderContext.kt b/library/src/main/java/com/mux/video/upload/internal/TranscoderContext.kt new file mode 100644 index 00000000..fcd9093b --- /dev/null +++ b/library/src/main/java/com/mux/video/upload/internal/TranscoderContext.kt @@ -0,0 +1,622 @@ +package com.mux.video.upload.internal + +import android.content.Context +import android.media.* +import android.media.MediaCodec.BufferInfo +import android.media.MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar +import android.os.Build +import androidx.annotation.RequiresApi +import com.mux.video.upload.MuxUploadSdk +import io.github.crow_misia.libyuv.FilterMode +import io.github.crow_misia.libyuv.Nv12Buffer +import java.io.File +import java.nio.ByteBuffer +import java.util.* +import kotlin.experimental.and + + +@RequiresApi(Build.VERSION_CODES.LOLLIPOP) +internal class TranscoderContext private constructor( + private var uploadInfo: UploadInfo, + private val appContext: Context +) { + private val logger get() = MuxUploadSdk.logger + + val MAX_ALLOWED_BITRATE = 8000000 + val MAX_ALLOWED_FRAMERATE = 120; + val MAX_ALLOWED_WIDTH = 1920 + val MAX_ALLOWED_HEIGTH = 1080 + val OPTIMAL_FRAMERATE = 30 + val I_FRAME_INTERVAL = 5 // in seconds + + private val extractor: MediaExtractor = MediaExtractor() + private var muxer: MediaMuxer? = null + private var videoTrackIndex = -10 + private var audioTrackIndex = -10 + private var outputVideoTrackIndex = -1 + private var outputAudioTrackIndex = -1 + // Used to configure decoders + private var inputAudioFormat: MediaFormat? = null + private var inputVideoFormat: MediaFormat? = null + + // Used to configure encoders + private var outputAudioFormat: MediaFormat? = null + private var outputVideoFormat: MediaFormat? = null + + // This is what decoder actually provide as an output, bit different then what we used to configure it + private var videoDecoderOutputFormat: MediaFormat? = null + private var decodedFrameWidth: Int = -1; + private var decodedFrameHeight: Int = -1; + private var targetedWidth = -1 + private var targetedHeight = -1 + private var targetedFramerate = -1 + private var targetedBitrate = -1 + private var scaledSizeYuv: Nv12Buffer? = null + val audioFrames = ArrayList() + + // Input parameters + private var inputWidth = -1 + private var inputHeighth = -1 + private var inputBitrate = -1 + private var inputFramerate = -1 + + // Wait indefinetly for negative value, exit imidetly on 0, or timeout after a given us+ + private var dequeueTimeout:Long = 0; + private var eofReached: Boolean = false; + private var transcodeAudio = false; + private var muxerConfigured = false; + private var numberOfDecodedFrames = 0; + private var numberOfEncodedFrames = 0; + private var numberOfInputFrames = -1; + + private var videoDecoder:MediaCodec? = null + private var audioDecoder:MediaCodec? = null + private var videoEncoder:MediaCodec? = null + private var audioEncoder:MediaCodec? = null + var fileTranscoded = false + private var configured = false + + companion object { + const val LOG_TAG = "TranscoderContext" + + @JvmSynthetic + internal fun create(uploadInfo: UploadInfo, appContext: Context): TranscoderContext { + return TranscoderContext(uploadInfo, appContext) + } + } + + private fun getHWCapableEncoders(mimeType: String): ArrayList { + val list = MediaCodecList(MediaCodecList.REGULAR_CODECS); + var result:ArrayList = ArrayList(); + for(codecInfo in list.codecInfos) { + logger.v("CodecInfo", codecInfo.name) + if(codecInfo.name.contains(mimeType) && codecInfo.isEncoder && codecInfo.isHardwareAcceleratedCompat) { + result.add(codecInfo); + } + } + return result; + } + + private fun configure() { + val cacheDir = File(appContext.cacheDir, "mux-upload") + cacheDir.mkdirs() + val destFile = File(cacheDir, UUID.randomUUID().toString() + ".mp4") + destFile.createNewFile() + + muxer = MediaMuxer(destFile.absolutePath, MediaMuxer.OutputFormat.MUXER_OUTPUT_MPEG_4) + uploadInfo = uploadInfo.update(standardizedFile = destFile) + + try { + extractor.setDataSource(uploadInfo.inputFile.absolutePath) + configureDecoders() + configured = true + } catch (e:Exception) { + logger.e(LOG_TAG, "Failed to initialize.", e) + } + } + + private fun checkIfTranscodingIsNeeded(): Boolean { + var shouldStandardize = false + for (i in 0 until extractor.trackCount) { + val format = extractor.getTrackFormat(i) + val mime = format.getString(MediaFormat.KEY_MIME) + var inputDuration:Long = -1; + if (mime?.lowercase()?.contains("video") == true) { + inputWidth = format.getInteger(MediaFormat.KEY_WIDTH) + inputHeighth = format.getInteger(MediaFormat.KEY_HEIGHT) + // Check if resolution is greater then 720p + if ((inputWidth > MAX_ALLOWED_WIDTH && inputHeighth > MAX_ALLOWED_HEIGTH) + || (inputHeighth > MAX_ALLOWED_WIDTH && inputWidth > MAX_ALLOWED_HEIGTH)) { + logger.v(LOG_TAG, "Should standardize because the size is incorrect") + shouldStandardize = true + if(inputWidth > inputHeighth) { + targetedWidth = MAX_ALLOWED_WIDTH + targetedHeight = targetedWidth * (inputHeighth / inputWidth) + } else { + targetedHeight = MAX_ALLOWED_WIDTH + targetedWidth = targetedHeight * (inputWidth / inputHeighth) + } + } else { + targetedWidth = inputWidth + targetedHeight = inputHeighth + } + scaledSizeYuv = Nv12Buffer.allocate(targetedWidth, targetedHeight) + + // Check if compersion is h264 + if (!mime.equals(MediaFormat.MIMETYPE_VIDEO_AVC)) { + logger.v(LOG_TAG, "Should standardize because the input is not h.264") + shouldStandardize = true + } + inputBitrate = format.getIntegerCompat(MediaFormat.KEY_BIT_RATE, -1) + inputDuration = format.getLongCompat(MediaFormat.KEY_DURATION, -1) + if (inputBitrate == -1 && inputDuration != -1L) { + inputBitrate = ((uploadInfo.inputFile.length() * 8) / (inputDuration / 1000000)).toInt() + } + if (inputBitrate > MAX_ALLOWED_BITRATE) { + logger.v(LOG_TAG, "Should standardize because the input bitrate is too high") + shouldStandardize = true + targetedBitrate = MAX_ALLOWED_BITRATE + } + inputFramerate = format.getIntegerCompat(MediaFormat.KEY_FRAME_RATE, -1) + if (inputFramerate > MAX_ALLOWED_FRAMERATE) { + logger.v(LOG_TAG, "Should standardize because the input frame rate is too high") + shouldStandardize = true + targetedFramerate = OPTIMAL_FRAMERATE + } else { + targetedFramerate = inputFramerate + } + videoTrackIndex = i; + inputVideoFormat = format; + extractor.selectTrack(i) + } + if (mime?.lowercase()?.contains("audio") == true) { + // TODO check if audio need to be standardized + audioTrackIndex = i; + inputAudioFormat = format; + extractor.selectTrack(i) + } + } + + return shouldStandardize + } + + private fun configureDecoders() { + // Init decoders and encoders + numberOfInputFrames = inputVideoFormat!!.getIntegerCompat("frame-count", -1) + videoDecoder = + MediaCodec.createDecoderByType(inputVideoFormat!!.getString(MediaFormat.KEY_MIME)!!) + videoDecoder!!.configure(inputVideoFormat, null, null, 0) + videoDecoder!!.start() + if (transcodeAudio) { + audioDecoder = + MediaCodec.createDecoderByType(inputAudioFormat!!.getString(MediaFormat.KEY_MIME)!!) + audioDecoder!!.configure(inputAudioFormat, null, null, 0) + audioDecoder!!.start() + } + } + + private fun configureEncoders() { + // We will need this when we apply the image resize + decodedFrameWidth = videoDecoderOutputFormat!!.getInteger(MediaFormat.KEY_WIDTH) + decodedFrameHeight = videoDecoderOutputFormat!!.getInteger(MediaFormat.KEY_HEIGHT) + + outputVideoFormat = MediaFormat.createVideoFormat(MediaFormat.MIMETYPE_VIDEO_AVC, targetedWidth, targetedHeight) + // This is NV12 actually + outputVideoFormat!!.setInteger(MediaFormat.KEY_COLOR_FORMAT, + COLOR_FormatYUV420SemiPlanar) + + outputVideoFormat!!.setInteger(MediaFormat.KEY_ROTATION, inputVideoFormat!!.getInteger(MediaFormat.KEY_ROTATION)) + outputVideoFormat!!.setInteger( + MediaFormat.KEY_FRAME_RATE, + targetedFramerate + ) + outputVideoFormat!!.setInteger("slice-height", targetedHeight + targetedHeight/2); + outputVideoFormat!!.setInteger("stride", targetedWidth); + outputVideoFormat!!.setInteger(MediaFormat.KEY_I_FRAME_INTERVAL, I_FRAME_INTERVAL); + outputVideoFormat!!.setInteger(MediaFormat.KEY_BITRATE_MODE, MediaCodecInfo.EncoderCapabilities.BITRATE_MODE_VBR) + outputVideoFormat!!.setInteger(MediaFormat.KEY_BIT_RATE, targetedBitrate) + // configure output audio format, if input format is already AAC, then just do copy + transcodeAudio = !inputAudioFormat!!.getString(MediaFormat.KEY_MIME)?.contains(MediaFormat.MIMETYPE_AUDIO_AAC)!! + if (transcodeAudio) { + outputAudioFormat = MediaFormat.createAudioFormat(MediaFormat.MIMETYPE_AUDIO_AAC, 48000, 2) + outputAudioFormat!!.setString(MediaFormat.KEY_AAC_PROFILE, "2") + outputAudioFormat!!.setString(MediaFormat.KEY_PROFILE, "2") + } else { + outputAudioFormat = inputAudioFormat + } + + val encoders = getHWCapableEncoders("avc") + for (encoder in encoders) { + try { + val codecCap = encoder.getCapabilitiesForType("video/avc") + for (profile in codecCap.profileLevels ) { + if (profile.profile == MediaCodecInfo.CodecProfileLevel.AVCProfileHigh ) { + outputVideoFormat!!.setInteger(MediaFormat.KEY_PROFILE, profile.profile); + outputVideoFormat!!.setInteger(MediaFormat.KEY_LEVEL, profile.level) + break + } + } + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.P) { + outputVideoFormat!!.setInteger(MediaFormat.KEY_QUALITY, codecCap.encoderCapabilities.qualityRange.upper) + } + videoEncoder = MediaCodec.createByCodecName(encoder.name) + // Check if B-frame encoding is supported + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { + if (codecCap.isFeatureSupported("FEATURE_B_FRAME")) { + // Enable B-frames by setting the appropriate parameter + outputVideoFormat!!.setInteger(MediaFormat.KEY_MAX_B_FRAMES, 2) + outputVideoFormat!!.setInteger(MediaFormat.KEY_OUTPUT_REORDER_DEPTH, 2) + } + } + videoEncoder!!.configure(outputVideoFormat,null, null, MediaCodec.CONFIGURE_FLAG_ENCODE) + break; + } catch (err:java.lang.Exception) { + logger.w(LOG_TAG, "Couldn't evaluate encoder ${encoder.name}. Skipping it", err) + } + } + videoEncoder!!.start() + if (transcodeAudio) { + audioEncoder = + MediaCodec.createEncoderByType(outputAudioFormat!!.getString(MediaFormat.KEY_MIME)!!) + audioEncoder!!.configure(outputAudioFormat, null, null, 0) + audioEncoder!!.start() + } + } + + private fun releaseCodecs() { + logger.v(LOG_TAG, "releaseCodecs(): called") + videoDecoder?.safeDispose() + audioDecoder?.safeDispose() + videoEncoder?.safeDispose() + audioEncoder?.safeDispose() + extractor.safeDispose() + muxer?.safeDispose() + } + + private fun configureMuxer() { + outputVideoTrackIndex = muxer!!.addTrack(videoEncoder!!.outputFormat) + muxer!!.setOrientationHint(inputVideoFormat!!.getInteger(MediaFormat.KEY_ROTATION)) + if (inputAudioFormat != null) { + outputAudioTrackIndex = muxer!!.addTrack(outputAudioFormat!!) + } + muxer!!.start() + } + + @JvmSynthetic + internal fun process(): UploadInfo { + logger.v(LOG_TAG, "process() starting") + if (!checkIfTranscodingIsNeeded()) { + logger.i(LOG_TAG, "Standardization was not required. Skipping") + return uploadInfo + } + + logger.i(LOG_TAG, "Standardizing input") + configure() + if (!configured) { + logger.e( + LOG_TAG, + "Skipped: Components could not be configured. Check the logs for errors" + ) + return uploadInfo; + } + + val started = System.currentTimeMillis() + try { + extractor.selectTrack(videoTrackIndex) + while (!eofReached) { + if (extractor.sampleTrackIndex == audioTrackIndex) { + muxAudioFrame() + } else { + muxVideoFrame() + } + } + // Get queued frames from encoder/decoder when we reach EOF + videoDecoder!!.flush() + videoEncoder!!.flush() + muxVideoFrame() + } catch (err:Exception) { + logger.e(LOG_TAG, "Failed to standardize input file ${uploadInfo.inputFile}", err) + } finally { + releaseCodecs() + fileTranscoded = true + } + val duration = System.currentTimeMillis() - started + logger.i(LOG_TAG, "Transcoding duration time: $duration") + logger.i(LOG_TAG, "Original file size: ${uploadInfo.inputFile.length()}") + logger.i(LOG_TAG, "Transcoded file size: ${uploadInfo.standardizedFile?.length()}") + + return uploadInfo + } + + private fun muxVideoFrame() { + val frames = getVideoFrames() + for (frame in frames) { +// if (frame.isBFrame()) { +// Log.i("Muxer", "We got B frame"); +// } + if (frame.isKeyFrame()) { + logger.i( + "Muxer", "Muxed video sample, size: " + frame.info.size + + ", pts: " + frame.info.presentationTimeUs + ) + } + muxer!!.writeSampleData(outputVideoTrackIndex, frame.buff, frame.info) + } + } + + private fun muxAudioFrame() { + val audioFrame = getNextAudioFrame() + // This is an audio frame, for now just copy, in the future, transcode maybe + if (outputAudioTrackIndex == -1) { + // Muxer not initialized yet, store these and mux later +// Log.i( +// "Muxer", "Not ready, save audio frame for later muxing, pts: " +// + audioFrame!!.info.presentationTimeUs +// ) + audioFrames.add(audioFrame!!) + } else { + // if we have some accumulated audio samples write them first + for (audioFrame in audioFrames) { +// Log.i( +// "Muxer", "Muxing accumulated audio frame, pts: " +// + audioFrame.info.presentationTimeUs +// ) + muxAudioFrame(audioFrame) + } + audioFrames.clear() + muxAudioFrame(audioFrame!!) + } + } + + private fun muxAudioFrame(frame:AVFrame) { + muxer!!.writeSampleData( + outputAudioTrackIndex, + frame.buff, + frame.info + ) +// Log.i( +// "Muxer", "Muxed audio sample, size: " + frame.info.size +// + ", pts: " + frame.info.presentationTimeUs +// ) + } + + private fun getVideoFrames() : ArrayList { + // TODO if EOF is reached maybe call flush on decoder and encoder some frames may still be in there + if (!eofReached) { + // This will advance the extractor + feedVideoDecoder() + } + val decodedFrames = getDecodedVideoFrame() + for (decoded in decodedFrames ) { + feedVideoEncoder(decoded); + decoded.release() + } + return getEncodedVideoFrames() + } + + private fun getNextAudioFrame(): AVFrame? { + val extractorBuffer:ByteBuffer = ByteBuffer.allocate(1024) + val extractedFrame = AVFrame(-1, extractorBuffer, BufferInfo(), isRaw = false) + val sampleSize = extractor.readSampleData(extractorBuffer, 0); + if (sampleSize == -1) { + eofReached = true; + // TODO fuls encoders / decoders + return null; + } else { + extractedFrame.info.size = sampleSize + extractedFrame.info.presentationTimeUs = extractor.sampleTime + extractor.advance() + } + return extractedFrame; + } + + private fun feedVideoDecoder() { + val inIndex: Int = videoDecoder!!.dequeueInputBuffer(dequeueTimeout) + if (inIndex >= 0) { + val buffer: ByteBuffer = videoDecoder!!.getInputBuffer(inIndex)!!; + val sampleSize = extractor.readSampleData(buffer, 0) + if (sampleSize < 0) { + // We have reached the end of video + eofReached = true; +// return null + } else { + videoDecoder!!.queueInputBuffer(inIndex, 0, sampleSize, extractor.sampleTime, 0) + extractor.advance() + numberOfInputFrames++ + } + } + } + + private fun getDecodedVideoFrame():ArrayList { + var info = BufferInfo() + var outputBuffer:ByteBuffer? = null + val result = ArrayList() + var outIndex = videoDecoder!!.dequeueOutputBuffer(info, dequeueTimeout); + while(outIndex > 0) { + outputBuffer = videoDecoder!!.getOutputBuffer(outIndex); + numberOfDecodedFrames++; + result.add(AVFrame( + outIndex, outputBuffer!!, info, decodedFrameWidth, decodedFrameHeight, + videoDecoder!!, true + )) + numberOfDecodedFrames++ + outIndex = videoDecoder!!.dequeueOutputBuffer(info, dequeueTimeout); + } + when (outIndex) { + MediaCodec.INFO_OUTPUT_FORMAT_CHANGED -> { + // This give us real image height, to avoid corruptions in video + videoDecoderOutputFormat = videoDecoder!!.outputFormat; + configureEncoders() + } + MediaCodec.INFO_TRY_AGAIN_LATER -> { + // Timedout also not good + } + } + return result + } + + private fun findAnnexBPosition(buff:ByteBuffer, startSearchAt:Int, buffSize:Int): Int { + // We are assuming integer is 4 bytes on every device, we also assume anexB is 4 bytes long + // instead of 3 which is also possible sometimes + for(i in startSearchAt..buffSize - 4) { + if (buff.getInt(i) == 1) { + return i; + } + } + return -1 + } + + private fun convertAnnexBtoAvcc(buff:ByteBuffer, buffSize:Int) { + val positions = ArrayList() + var annexBPos = findAnnexBPosition(buff, 0, buffSize) + while (annexBPos != -1) { + positions.add(annexBPos) + annexBPos = findAnnexBPosition(buff, annexBPos + 4, buffSize) + } + for (i in 0..positions.size -1) { + var naluLength = 0 + if (i == positions.size -1) { + // This is the last position + naluLength = buffSize - positions.get(i) - 4 + } else { + naluLength = positions.get(i + 1) - positions.get(i) -4; + } + buff.position(positions.get(i)) + buff.putInt(naluLength) + } + } + + private fun feedVideoEncoder(rawInput:AVFrame) { + val inIndex: Int = videoEncoder!!.dequeueInputBuffer(dequeueTimeout) + if (inIndex >= 0) { + // Scale input to match output + rawInput.yuvBuffer!!.scale(scaledSizeYuv!!, FilterMode.BILINEAR) + val buffer: ByteBuffer = videoEncoder!!.getInputBuffer(inIndex)!!; + buffer.clear() + scaledSizeYuv!!.write(buffer) + videoEncoder!!.queueInputBuffer(inIndex, 0, + buffer.capacity(), rawInput.info.presentationTimeUs, 0) + } + } + + fun getEncodedVideoFrames():ArrayList { + val result = ArrayList() + if (videoEncoder == null) { + return result; + } + var info = BufferInfo() + var outIndex = videoEncoder!!.dequeueOutputBuffer(info, dequeueTimeout) + var outputBuffer:ByteBuffer? + while (outIndex >= 0) { + if (!muxerConfigured) { + configureMuxer() + muxerConfigured = true; + } + outputBuffer = videoEncoder!!.getOutputBuffer(outIndex) + val buff = ByteBuffer.allocate(info.size) + outputBuffer!!.get(buff.array(), 0, info.size) + result.add(AVFrame(outIndex, buff, info, 0, 0, + videoEncoder, true, false)) + numberOfEncodedFrames++ + videoEncoder!!.releaseOutputBuffer(outIndex, false) + info = BufferInfo() + outIndex = videoEncoder!!.dequeueOutputBuffer(info, dequeueTimeout) + } + return result; + } + + private fun encodeVideoFrame(rawInput:AVFrame): AVFrame? { + val result:AVFrame? = null; + val inIndex: Int = videoEncoder!!.dequeueInputBuffer(dequeueTimeout) + if (inIndex >= 0) { + // Scale input to match output + rawInput.yuvBuffer!!.scale(scaledSizeYuv!!, FilterMode.BILINEAR) + val buffer: ByteBuffer = videoEncoder!!.getInputBuffer(inIndex)!!; + buffer.clear() + scaledSizeYuv!!.write(buffer) + videoEncoder!!.queueInputBuffer(inIndex, 0, + buffer.capacity(), rawInput.info.presentationTimeUs, 0) + } + + var info = BufferInfo() + var outIndex = videoEncoder!!.dequeueOutputBuffer(info, dequeueTimeout) + var outputBuffer:ByteBuffer? + val encodedBuffers:ArrayList = ArrayList() + var totalBufferSize:Int = 0 + while (outIndex >= 0) { + outputBuffer = videoEncoder!!.getOutputBuffer(outIndex) + totalBufferSize += info.size + encodedBuffers.add(AVFrame(outIndex, outputBuffer!!, info, 0, 0, + videoEncoder, true, false)) + numberOfEncodedFrames++ + info = BufferInfo() + outIndex = videoEncoder!!.dequeueOutputBuffer(info, dequeueTimeout) + } + if (encodedBuffers.size > 0) { + val outputBuffer = ByteBuffer.allocate(totalBufferSize) + var offset = 0 + val info = BufferInfo() + for (frame in encodedBuffers) { + // TODO maybe convert annexB to avcc, pay attention, sps and pps are in single buffer +// frame.buff.position(4) + frame.buff.get(outputBuffer.array(), offset, frame.info.size) + offset += frame.info.size + info.flags = info.flags or frame.info.flags + info.presentationTimeUs = frame.info.presentationTimeUs + frame.release() + } + + info.offset = 0 + info.size = totalBufferSize + return AVFrame(outIndex, outputBuffer, info, targetedWidth, + targetedHeight, videoEncoder!!, false, false) + } + return result; + } + + class AVFrame constructor(val index:Int, val buff:ByteBuffer, val info:BufferInfo, + val width:Int = 0, val heigth:Int = 0, + val codec:MediaCodec? = null, val shouldRelease:Boolean = true, val isRaw:Boolean = true){ + + // TODO support other color formats, NV12 is default decoder format for AVC + var yuvBuffer:Nv12Buffer? = null; + + init { + if (isRaw) { + yuvBuffer = Nv12Buffer.wrap(buff, width, heigth) + } + } + + fun release() { + if (shouldRelease) { + codec?.releaseOutputBuffer(index, false); + } + } + + fun getNalType(): Int { + return (buff.get(4) and 0x1F).toInt() + } + + fun isBFrame(): Boolean { + val nalType = getNalType() + return (nalType == 2 || nalType == 3 || nalType == 4) + } + + fun isKeyFrame(): Boolean { + val nalType = getNalType() + // Sometimes key frame is packed with pps and sps + return (nalType == 5 || nalType == 7 || nalType == 8) + } + + fun clone():AVFrame { + val buffCopy = ByteBuffer.allocate(info.size) + buffCopy.get(buff.array(), 0, info.size) + val infoCopy = BufferInfo() + infoCopy.size = info.size + infoCopy.offset = info.offset + infoCopy.presentationTimeUs = info.presentationTimeUs + infoCopy.flags = info.flags + return AVFrame(index, buffCopy, infoCopy, width, heigth, codec, shouldRelease, isRaw) + } + } +} diff --git a/library/src/main/java/com/mux/video/upload/internal/UploadInfo.kt b/library/src/main/java/com/mux/video/upload/internal/UploadInfo.kt index 332d45a9..a0d31406 100644 --- a/library/src/main/java/com/mux/video/upload/internal/UploadInfo.kt +++ b/library/src/main/java/com/mux/video/upload/internal/UploadInfo.kt @@ -1,9 +1,10 @@ package com.mux.video.upload.internal import android.net.Uri +import com.mux.video.upload.api.UploadStatus import com.mux.video.upload.api.MuxUpload import kotlinx.coroutines.Deferred -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow import java.io.File /** @@ -17,15 +18,15 @@ import java.io.File * Job and Flows populated */ internal data class UploadInfo( + @JvmSynthetic internal val standardizationRequested: Boolean = true, @JvmSynthetic internal val remoteUri: Uri, - @JvmSynthetic internal val file: File, + @JvmSynthetic internal val inputFile: File, + @JvmSynthetic internal val standardizedFile: File? = null, @JvmSynthetic internal val chunkSize: Int, @JvmSynthetic internal val retriesPerChunk: Int, @JvmSynthetic internal val optOut: Boolean, - @JvmSynthetic internal val uploadJob: Deferred>?, - @JvmSynthetic internal val successFlow: SharedFlow?, - @JvmSynthetic internal val progressFlow: SharedFlow?, - @JvmSynthetic internal val errorFlow: SharedFlow?, + @JvmSynthetic internal val uploadJob: Deferred>?, + @JvmSynthetic internal val statusFlow: StateFlow?, ) { fun isRunning(): Boolean = uploadJob?.isActive ?: false } @@ -36,23 +37,23 @@ internal data class UploadInfo( */ @JvmSynthetic internal fun UploadInfo.update( + standardizationRequested: Boolean = this.standardizationRequested, remoteUri: Uri = this.remoteUri, - file: File = this.file, + file: File = this.inputFile, + standardizedFile: File? = this.standardizedFile, chunkSize: Int = this.chunkSize, retriesPerChunk: Int = this.retriesPerChunk, optOut: Boolean = this.optOut, - uploadJob: Deferred>? = this.uploadJob, - successFlow: SharedFlow? = this.successFlow, - progressFlow: SharedFlow? = this.progressFlow, - errorFlow: SharedFlow? = this.errorFlow, + uploadJob: Deferred>? = this.uploadJob, + statusFlow: StateFlow? = this.statusFlow, ) = UploadInfo( + standardizationRequested, remoteUri, file, + standardizedFile, chunkSize, retriesPerChunk, optOut, uploadJob, - successFlow, - progressFlow, - errorFlow + statusFlow, ) diff --git a/library/src/main/java/com/mux/video/upload/internal/UploadJobFactory.kt b/library/src/main/java/com/mux/video/upload/internal/UploadJobFactory.kt index 9471964b..67f6a1d5 100644 --- a/library/src/main/java/com/mux/video/upload/internal/UploadJobFactory.kt +++ b/library/src/main/java/com/mux/video/upload/internal/UploadJobFactory.kt @@ -1,15 +1,20 @@ package com.mux.video.upload.internal +import android.os.Build import android.util.Log import com.mux.video.upload.BuildConfig import com.mux.video.upload.MuxUploadSdk import com.mux.video.upload.api.MuxUpload import com.mux.video.upload.api.MuxUploadManager +import com.mux.video.upload.api.UploadStatus import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asStateFlow import java.io.BufferedInputStream import java.io.FileInputStream +import java.io.InputStream import java.util.* /** @@ -56,47 +61,80 @@ internal class UploadJobFactory private constructor( fun createUploadJob(uploadInfo: UploadInfo, outerScope: CoroutineScope): UploadInfo { logger - val successFlow = callbackFlow() - val overallProgressFlow = callbackFlow() - val errorFlow = callbackFlow() - val fileStream = BufferedInputStream(FileInputStream(uploadInfo.file)) - val fileSize = uploadInfo.file.length() + val statusFlow = MutableStateFlow(UploadStatus.Ready) + + var fileStream: InputStream = BufferedInputStream(FileInputStream(uploadInfo.inputFile)) + var fileSize = uploadInfo.inputFile.length() val metrics = UploadMetrics.create() val uploadJob = outerScope.async { + // Inside the async { }, we have officially started + statusFlow.value = UploadStatus.Started + + // This UploadInfo never gets sent outside this coroutine. It contains info related to + // standardizing the the client doesn't need to know/can't know synchronously + var innerUploadInfo = uploadInfo + val startTime = System.currentTimeMillis() try { - var totalBytesSent: Long = getAlreadyTransferredBytes(uploadInfo) + // See if the file need to be converted to a standard input. + if (uploadInfo.standardizationRequested + && Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP + ) { + statusFlow.value = UploadStatus.Preparing + val tcx = TranscoderContext.create(innerUploadInfo, MuxUploadManager.appContext!!) + innerUploadInfo = tcx.process() + if (tcx.fileTranscoded) { + fileStream = withContext(Dispatchers.IO) { + BufferedInputStream(FileInputStream(innerUploadInfo.standardizedFile)) + } + // This !! is safe by contract: process() will set the standardizedFile if it transcoded + fileSize = innerUploadInfo.standardizedFile!!.length() + } + } + + // Now that we've standardized (or not), start the upload + + var totalBytesSent: Long = getAlreadyTransferredBytes(innerUploadInfo) Log.d("UploadJobFactory", "totalBytesSent: $totalBytesSent") - val chunkBuffer = ByteArray(uploadInfo.chunkSize) + val chunkBuffer = ByteArray(innerUploadInfo.chunkSize) // If we're resuming, we must skip to the current file pos if (totalBytesSent != 0L) { withContext(Dispatchers.IO) { fileStream.skip(totalBytesSent) } } + statusFlow.value = UploadStatus.Uploading( + MuxUpload.Progress( + bytesUploaded = totalBytesSent, + totalBytes = fileSize, + startTime = startTime, + updatedTime = System.currentTimeMillis() + ) + ) + // Upload each chunk starting from the current head of the stream do { // The last chunk will almost definitely be smaller than a whole chunk val bytesLeft = fileSize - totalBytesSent - val chunkSize = if (uploadInfo.chunkSize > bytesLeft) { + val thisChunkSize = if (innerUploadInfo.chunkSize > bytesLeft) { bytesLeft.toInt() } else { - uploadInfo.chunkSize + innerUploadInfo.chunkSize } - + logger.i("UploadJob", "Trying to read $thisChunkSize bytes") //read-in a chunk val fileReadSize = withContext(Dispatchers.IO) { - fileStream.read(chunkBuffer, 0, chunkSize) + fileStream.read(chunkBuffer, 0, thisChunkSize) } - if (fileReadSize != chunkSize) { // Guaranteed unless the file was changed under us or sth - throw IllegalStateException("expected to read $chunkSize bytes, but read $fileReadSize") + if (fileReadSize != thisChunkSize) { // Guaranteed unless the file was changed under us or sth + throw IllegalStateException("expected to read $thisChunkSize bytes, but read $fileReadSize") } val chunk = ChunkWorker.Chunk( - contentLength = chunkSize, + contentLength = thisChunkSize, startByte = totalBytesSent, - endByte = totalBytesSent + chunkSize - 1, + endByte = totalBytesSent + thisChunkSize - 1, totalFileSize = fileSize, sliceData = chunkBuffer, ) @@ -107,19 +145,20 @@ internal class UploadJobFactory private constructor( // Bounce progress updates to callers updateProgressJob = launch { chunkProgressFlow.collect { chunkProgress -> - overallProgressFlow.emit( + statusFlow.value = UploadStatus.Uploading( MuxUpload.Progress( bytesUploaded = chunkProgress.bytesUploaded + totalBytesSent, totalBytes = fileSize, startTime = startTime, updatedTime = chunkProgress.updatedTime, ) - ) // overallProgressChannel.emit( + ) // statusFlow.value = ... ( } // chunkProgressChannel.collect { } - val chunkFinalState = createWorker(chunk, uploadInfo, chunkProgressFlow).upload() + val chunkFinalState = createWorker(chunk, innerUploadInfo, chunkProgressFlow).upload() + // Done with a chunk, so update the state again to capture all progress before looping totalBytesSent += chunkFinalState.bytesUploaded val intermediateProgress = MuxUpload.Progress( bytesUploaded = totalBytesSent, @@ -127,47 +166,47 @@ internal class UploadJobFactory private constructor( updatedTime = chunkFinalState.updatedTime, startTime = startTime, ) - overallProgressFlow.emit(intermediateProgress) + statusFlow.value = UploadStatus.Uploading(intermediateProgress) } finally { updateProgressJob?.cancel() } } while (totalBytesSent < fileSize) // We made it! - val finalState = createFinalState(fileSize, startTime) + val finalProgress = createFinalState(fileSize, startTime) // report this upload asynchronously (unless a debug build of the SDK) @Suppress("KotlinConstantConditions") - if (BuildConfig.BUILD_TYPE != "debug" && !uploadInfo.optOut) { + if (BuildConfig.BUILD_TYPE != "debug" && !innerUploadInfo.optOut) { launch { metrics.reportUpload( - startTimeMillis = finalState.startTime, - endTimeMillis = finalState.updatedTime, - uploadInfo = uploadInfo, + startTimeMillis = finalProgress.startTime, + endTimeMillis = finalProgress.updatedTime, + uploadInfo = innerUploadInfo, ) } } // finish up - MainScope().launch { MuxUploadManager.jobFinished(uploadInfo) } - successFlow.emit(finalState) - Result.success(finalState) + MainScope().launch { MuxUploadManager.jobFinished(innerUploadInfo) } + val success = UploadStatus.UploadSuccess(finalProgress) + statusFlow.value = success + Result.success(success) } catch (e: Exception) { - MuxUploadSdk.logger.e("MuxUpload", "Upload of ${uploadInfo.file} failed", e) + MuxUploadSdk.logger.e("MuxUpload", "Upload of ${innerUploadInfo.inputFile} failed", e) val finalState = createFinalState(fileSize, startTime) - overallProgressFlow.emit(finalState) - errorFlow.emit(e) - MainScope().launch { MuxUploadManager.jobFinished(uploadInfo, false) } + val failStatus = UploadStatus.UploadFailed(e, finalState) + statusFlow.value = failStatus + MainScope().launch { MuxUploadManager.jobFinished(innerUploadInfo, false) } Result.failure(e) } finally { @Suppress("BlockingMethodInNonBlockingContext") // the streams we use don't block on close fileStream.close() + innerUploadInfo.standardizedFile?.delete() } - } + } // val uploadJob = ... return uploadInfo.update( - successFlow = successFlow, - progressFlow = overallProgressFlow, - errorFlow = errorFlow, + statusFlow = statusFlow.asStateFlow(), uploadJob = uploadJob, ) } diff --git a/library/src/main/java/com/mux/video/upload/internal/UploadMetrics.kt b/library/src/main/java/com/mux/video/upload/internal/UploadMetrics.kt index 824bf705..b7409ae6 100644 --- a/library/src/main/java/com/mux/video/upload/internal/UploadMetrics.kt +++ b/library/src/main/java/com/mux/video/upload/internal/UploadMetrics.kt @@ -3,19 +3,24 @@ package com.mux.video.upload.internal import android.content.Context import android.content.pm.PackageManager import android.media.MediaMetadataRetriever +import android.net.Uri import android.os.Build import com.mux.video.upload.BuildConfig import com.mux.video.upload.MuxUploadSdk import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext +import okhttp3.Interceptor import okhttp3.MediaType.Companion.toMediaType import okhttp3.Request import okhttp3.RequestBody.Companion.toRequestBody import org.json.JSONObject import java.util.* +import java.util.concurrent.TimeUnit internal class UploadMetrics private constructor() { + private val logger get() = MuxUploadSdk.logger + @JvmSynthetic internal suspend fun reportUpload( startTimeMillis: Long, @@ -25,7 +30,7 @@ internal class UploadMetrics private constructor() { val videoDuration = withContext(Dispatchers.IO) { try { MediaMetadataRetriever().use { retriever -> - retriever.setDataSource(uploadInfo.file.absolutePath) + retriever.setDataSource(uploadInfo.inputFile.absolutePath) retriever.extractMetadata(MediaMetadataRetriever.METADATA_KEY_DURATION)?.toInt() } } catch (e: Exception) { @@ -37,8 +42,9 @@ internal class UploadMetrics private constructor() { val eventJson = UploadEvent( startTimeMillis = startTimeMillis, endTimeMillis = endTimeMillis, - fileSize = uploadInfo.file.length(), + fileSize = uploadInfo.inputFile.length(), videoDuration = videoDuration ?: 0, + uploadURL = uploadInfo.remoteUri.toString(), sdkVersion = BuildConfig.LIB_VERSION, osName = "Android", osVersion = Build.VERSION.RELEASE, @@ -48,10 +54,30 @@ internal class UploadMetrics private constructor() { regionCode = Locale.getDefault().country ).toJson() - // For this case, redirect-following is desired val httpClient = MuxUploadSdk.httpClient().newBuilder() - .followRedirects(true) - .followSslRedirects(true) + // The SDK's http client is configured for uploading. We want the tighter default timeouts + .callTimeout(0, TimeUnit.SECONDS) + .writeTimeout(10, TimeUnit.SECONDS) + // We need to do a non-compliant redirect: 301 or 302 but preserving method and body + .addInterceptor(Interceptor { chain -> + val response = chain.proceed(chain.request()) + if (response.code in 301..302) { + val redirectUri = response.headers("Location").firstOrNull()?.let { Uri.parse(it) } + // If 'Location' was present and was a real URL, redirect to it + if (redirectUri == null) { + logger.w("UploadMetrics", "redirect with invalid or blank url. ignoring") + response + } else { + response.close() // Required before starting a new request in the chain + val redirectedReq = response.request.newBuilder() + .url(redirectUri.toString()) + .build() + chain.proceed(redirectedReq) + } + } else { + response + } + }) .build() val request = Request.Builder() .url("https://mobile.muxanalytics.com") @@ -93,6 +119,7 @@ private data class UploadEvent( val endTimeMillis: Long, val fileSize: Long, val videoDuration: Int, + val uploadURL: String, // Device-Derived val sdkVersion: String, val osName: String, @@ -109,6 +136,7 @@ private data class UploadEvent( put("end_time", endTimeMillis / 1000.0) put("file_size", fileSize) put("video_duration", videoDuration) + put("upload_url", uploadURL) put("sdk_version", sdkVersion) put("os_name", osName) put("os_version", osVersion) diff --git a/library/src/main/java/com/mux/video/upload/internal/UploadPersistence.kt b/library/src/main/java/com/mux/video/upload/internal/UploadPersistence.kt index dfe93aeb..00d6d8bf 100644 --- a/library/src/main/java/com/mux/video/upload/internal/UploadPersistence.kt +++ b/library/src/main/java/com/mux/video/upload/internal/UploadPersistence.kt @@ -3,7 +3,6 @@ package com.mux.video.upload.internal import android.content.Context import android.content.SharedPreferences import android.net.Uri -import android.util.Log import com.mux.video.upload.api.MuxUpload import org.json.JSONArray import org.json.JSONObject @@ -19,7 +18,7 @@ internal fun initializeUploadPersistence(appContext: Context) { internal fun writeUploadState(uploadInfo: UploadInfo, state: MuxUpload.Progress) { UploadPersistence.write( UploadEntry( - file = uploadInfo.file, + file = uploadInfo.inputFile, url = uploadInfo.remoteUri.toString(), savedAtLocalMs = Date().time, state = if (uploadInfo.isRunning()) { @@ -37,7 +36,7 @@ internal fun writeUploadState(uploadInfo: UploadInfo, state: MuxUpload.Progress) @JvmSynthetic internal fun readLastByteForFile(upload: UploadInfo): Long { - return UploadPersistence.readEntries()[upload.file.absolutePath]?.bytesSent ?: 0 + return UploadPersistence.readEntries()[upload.inputFile.absolutePath]?.bytesSent ?: 0 } @JvmSynthetic @@ -52,14 +51,12 @@ internal fun readAllCachedUploads(): List { .map { UploadInfo( remoteUri = Uri.parse(it.url), - file = it.file, + inputFile = it.file, chunkSize = it.chunkSize, retriesPerChunk = it.retriesPerChunk, optOut = it.optOut, uploadJob = null, - successFlow = null, - progressFlow = null, - errorFlow = null, + statusFlow = null, ) } } @@ -91,7 +88,7 @@ private object UploadPersistence { fun removeForFile(upload: UploadInfo) { checkInitialized() val entries = readEntries() - entries -= upload.file.absolutePath + entries -= upload.inputFile.absolutePath writeEntries(entries) } diff --git a/library/src/main/java/com/mux/video/upload/internal/Util.kt b/library/src/main/java/com/mux/video/upload/internal/Util.kt index a879c2fa..2b76e5e1 100644 --- a/library/src/main/java/com/mux/video/upload/internal/Util.kt +++ b/library/src/main/java/com/mux/video/upload/internal/Util.kt @@ -1,6 +1,13 @@ package com.mux.video.upload.internal +import android.media.MediaCodec +import android.media.MediaCodecInfo +import android.media.MediaExtractor +import android.media.MediaFormat +import android.media.MediaMuxer +import android.os.Build import android.os.Looper +import com.mux.video.upload.MuxUploadSdk /** * Asserts that we are on the main thread, crashing if not. @@ -14,3 +21,104 @@ internal fun assertMainThread() { throw IllegalStateException("This can only be called from the main thread") } } + +/** + * Gets an Integer from a [MediaFormat] with the given key. If the key is missing or null, returns + * a default value + * + * On API < Q, this method must branch to safely check for the key + */ +@JvmSynthetic +internal fun MediaFormat.getIntegerCompat(key: String, default: Int): Int { + return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) { + getInteger(key, default) + // Before Q, we have to check for the key manually. Not checking can result in an exception + } else if (containsKey(key)) { + getInteger(key) + } else { + default + } +} + +/** + * Gets a Long from a [MediaFormat] with the given key. If the key is missing or null, returns + * a default value + * + * On API < Q, this method must branch to safely check for the key + */ +@JvmSynthetic +internal fun MediaFormat.getLongCompat(key: String, default: Long): Long { + return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) { + getLong(key, default) + } else if (containsKey(key)) { + getLong(key) + } else { + default + } +} + +/** + * Returns true if the codec described is hardware-accelerated, or else it returns false. + * + * On API < Q, this information is not available so this method returns true. This behavior could + * potentially be expanded in the future + */ +internal val MediaCodecInfo.isHardwareAcceleratedCompat: Boolean get() { + return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) { + isHardwareAccelerated + } else { + // On < Q, we can't tell. Return true because there's no point in filtering codecs in this case + true + } +} + +/** + * Safely dispose of a MediaCodec, stopping it first unless requested otherwise + * If an exception is thrown during disposal, it will be logged and swallowed + * + * @param stop Stop the codec before releasing it. Default is true + */ +@JvmSynthetic +internal fun MediaCodec.safeDispose(stop: Boolean = true) { + try { + if (stop) { + stop() + } + release() + } catch (e: Exception) { + MuxUploadSdk.logger.w("TranscoderContext", "Failed to dispose of MediaCodec", e) + } +} + +/** + * Safely dispose of a MediaMuxer. Stopps it first unless requested otherwise (on API 18+) + * If an exception is thrown during disposal, it will be logged and swallowed + * + * @param stop Stop the codec before releasing it. Default is true + */ +@JvmSynthetic +internal fun MediaMuxer.safeDispose(stop: Boolean = true) { + try { + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR2) { + if (stop) { + stop() + } + release() + } + } catch (e: Exception) { + MuxUploadSdk.logger.w("TranscoderContext", "Failed to dispose of MediaCodec", e) + } +} + +/** + * Safely dispose of a MediaExtractor + * If an exception is thrown during disposal, it will be logged and swallowed + */ +@JvmSynthetic +internal fun MediaExtractor.safeDispose() { + try { + release() + } catch (e: Exception) { + MuxUploadSdk.logger.w("TranscoderContext", "Failed to dispose of MediaCodec", e) + } +} diff --git a/library/src/test/java/com/mux/video/upload/internal/UploadPersistenceTests.kt b/library/src/test/java/com/mux/video/upload/internal/UploadPersistenceTests.kt index 3021992e..61e9606a 100644 --- a/library/src/test/java/com/mux/video/upload/internal/UploadPersistenceTests.kt +++ b/library/src/test/java/com/mux/video/upload/internal/UploadPersistenceTests.kt @@ -42,12 +42,12 @@ class UploadPersistenceTests : AbsRobolectricTest() { 1, uploadsOut.size ) - val uploadOutA = uploadsOut.find { it.file.absolutePath == uploadInfoInA.file.absolutePath } + val uploadOutA = uploadsOut.find { it.inputFile.absolutePath == uploadInfoInA.inputFile.absolutePath } assertNull( "file A should NOT be in the output", uploadOutA ) - val uploadOutB = uploadsOut.find { it.file.absolutePath == uploadInfoInB.file.absolutePath } + val uploadOutB = uploadsOut.find { it.inputFile.absolutePath == uploadInfoInB.inputFile.absolutePath } assertNotNull( "upload B should be in the output", uploadOutB, @@ -81,7 +81,7 @@ class UploadPersistenceTests : AbsRobolectricTest() { 2, uploadsOut.size ) - val uploadOutA = uploadsOut.find { it.file.absolutePath == uploadInfoInA.file.absolutePath } + val uploadOutA = uploadsOut.find { it.inputFile.absolutePath == uploadInfoInA.inputFile.absolutePath } assertNotNull( "file A should be in the output", uploadOutA @@ -129,15 +129,13 @@ class UploadPersistenceTests : AbsRobolectricTest() { } private fun uploadInfo(name: String = "a/file") = UploadInfo( - file = File(name).absoluteFile, + inputFile = File(name).absoluteFile, remoteUri = Uri.parse("https://www.mux.com/$name"), chunkSize = 2, retriesPerChunk = 3, optOut = false, uploadJob = null, - progressFlow = null, - successFlow = null, - errorFlow = null + statusFlow = null, ) private fun mockContext(): Context {