Skip to content

Commit

Permalink
Fix file download cancellation handling (#275)
Browse files Browse the repository at this point in the history
* Enable the test, change how download is aborted

* Naming

* WIP

* Add test for single changeset download cancellation

* Remove test since the changeset is downloaded in a single chunk

* Code cleanup

* Lint

* Add comment

* Add changelog

* Renaming

* Remove console logging
  • Loading branch information
austeja-bentley authored Aug 19, 2024
1 parent 0df4690 commit 1cd2074
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ export type ProgressCallback = (downloaded: number, total: number) => void;

/** Common parameters for progress reporting download operations. */
export interface DownloadProgressParam {
/** Function called to report download progress. */
/**
* Function called to report download progress.
* IMPORTANT: This function should never throw an error or cancel the download, otherwise it may result in unexpected behavior.
*/
progressCallback?: ProgressCallback;
/** Abort signal for cancelling download. */
abortSignal?: GenericAbortSignal;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/imodels-access-backend",
"comment": "Fix file download cancellation handling.",
"type": "none"
}
],
"packageName": "@itwin/imodels-access-backend"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/imodels-client-authoring",
"comment": "Add a documentation comment to inform users to not throw errors inside progress callback functions passed to operations that download files.",
"type": "none"
}
],
"packageName": "@itwin/imodels-client-authoring"
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {

import { getV1CheckpointSize, queryCurrentOrPrecedingV1Checkpoint, queryCurrentOrPrecedingV2Checkpoint } from "./CheckpointHelperFunctions";
import { ClientToPlatformAdapter } from "./interface-adapters/ClientToPlatformAdapter";
import { PlatformToClientAdapter } from "./interface-adapters/PlatformToClientAdapter";
import { DownloadCancellationMonitorFunc, PlatformToClientAdapter } from "./interface-adapters/PlatformToClientAdapter";

export class BackendIModelsAccess implements BackendHubAccess {
protected readonly _iModelsClient: IModelsClient;
Expand All @@ -49,17 +49,23 @@ export class BackendIModelsAccess implements BackendHubAccess {
};
downloadParams.urlParams = PlatformToClientAdapter.toChangesetRangeUrlParams(arg.range);

const [progressCallback, abortSignal] = PlatformToClientAdapter.toProgressCallback(arg.progressCallback) ?? [];
downloadParams.progressCallback = progressCallback;
downloadParams.abortSignal = abortSignal;

const downloadedChangesets: DownloadedChangeset[] = await handleAPIErrors(
async () => this._iModelsClient.changesets.downloadList(downloadParams),
"downloadChangesets"
);
let downloadCancellationMonitorFunc: DownloadCancellationMonitorFunc | undefined;
try {
const downloadProgressParams = PlatformToClientAdapter.toDownloadProgressParam(arg.progressCallback);
downloadCancellationMonitorFunc = downloadProgressParams?.downloadCancellationMonitorFunc;
downloadParams.progressCallback = downloadProgressParams?.progressCallback;
downloadParams.abortSignal = downloadProgressParams?.abortSignal;

const downloadedChangesets: DownloadedChangeset[] = await handleAPIErrors(
async () => this._iModelsClient.changesets.downloadList(downloadParams),
"downloadChangesets"
);

const result: ChangesetFileProps[] = downloadedChangesets.map(ClientToPlatformAdapter.toChangesetFileProps);
return result;
const result: ChangesetFileProps[] = downloadedChangesets.map(ClientToPlatformAdapter.toChangesetFileProps);
return result;
} finally {
downloadCancellationMonitorFunc?.({ shouldCancel: false });
}
}

public async downloadChangeset(arg: DownloadChangesetArg): Promise<ChangesetFileProps> {
Expand All @@ -69,25 +75,31 @@ export class BackendIModelsAccess implements BackendHubAccess {
targetDirectoryPath: arg.targetDir
};

const [progressCallback, abortSignal] = PlatformToClientAdapter.toProgressCallback(arg.progressCallback) ?? [];
downloadSingleChangesetParams.progressCallback = progressCallback;
downloadSingleChangesetParams.abortSignal = abortSignal;

const downloadedChangeset: DownloadedChangeset = await handleAPIErrors(
async () => {
const stopwatch = new StopWatch(`[${arg.changeset.id}]`, true);
Logger.logInfo("BackendIModelsAccess", `Starting download of changeset with id ${stopwatch.description}`);

const innerResult = await this._iModelsClient.changesets.downloadSingle(downloadSingleChangesetParams);

Logger.logInfo("BackendIModelsAccess", `Downloaded changeset with id ${stopwatch.description} (${stopwatch.elapsedSeconds} seconds)`);
return innerResult;
},
"downloadChangesets"
);
let downloadCancellationMonitorFunc: DownloadCancellationMonitorFunc | undefined;
try {
const downloadProgressParams = PlatformToClientAdapter.toDownloadProgressParam(arg.progressCallback);
downloadCancellationMonitorFunc = downloadProgressParams?.downloadCancellationMonitorFunc;
downloadSingleChangesetParams.progressCallback = downloadProgressParams?.progressCallback;
downloadSingleChangesetParams.abortSignal = downloadProgressParams?.abortSignal;

const downloadedChangeset: DownloadedChangeset = await handleAPIErrors(
async () => {
const stopwatch = new StopWatch(`[${arg.changeset.id}]`, true);
Logger.logInfo("BackendIModelsAccess", `Starting download of changeset with id ${stopwatch.description}`);

const innerResult = await this._iModelsClient.changesets.downloadSingle(downloadSingleChangesetParams);

Logger.logInfo("BackendIModelsAccess", `Downloaded changeset with id ${stopwatch.description} (${stopwatch.elapsedSeconds} seconds)`);
return innerResult;
},
"downloadChangesets"
);

const result: ChangesetFileProps = ClientToPlatformAdapter.toChangesetFileProps(downloadedChangeset);
return result;
const result: ChangesetFileProps = ClientToPlatformAdapter.toChangesetFileProps(downloadedChangeset);
return result;
} finally {
downloadCancellationMonitorFunc?.({ shouldCancel: false });
}
}

public async queryChangeset(arg: ChangesetArg): Promise<ChangesetProps> {
Expand Down Expand Up @@ -242,21 +254,30 @@ export class BackendIModelsAccess implements BackendHubAccess {
throw new IModelError(BriefcaseStatus.VersionNotFound, "V1 checkpoint not found");

const v1CheckpointSize = await getV1CheckpointSize(checkpoint._links.download.href);
const [progressCallback, abortSignal] = PlatformToClientAdapter.toProgressCallback(arg.onProgress) ?? [];
const totalDownloadCallback = progressCallback ? (downloaded: number) => progressCallback?.(downloaded, v1CheckpointSize) : undefined;

const stopwatch = new StopWatch(`[${checkpoint.changesetId}]`, true);
Logger.logInfo("BackendIModelsAccess", `Starting download of checkpoint with id ${stopwatch.description}`);
await downloadFile({
storage: this._iModelsClient.cloudStorage,
url: checkpoint._links.download.href,
localPath: arg.localFile,
totalDownloadCallback,
abortSignal
});
Logger.logInfo("BackendIModelsAccess", `Downloaded checkpoint with id ${stopwatch.description} (${stopwatch.elapsedSeconds} seconds)`);

return { index: checkpoint.changesetIndex, id: checkpoint.changesetId };

let downloadCancellationMonitorFunc: DownloadCancellationMonitorFunc | undefined;
try {
const downloadProgressParams = PlatformToClientAdapter.toDownloadProgressParam(arg.onProgress);
downloadCancellationMonitorFunc = downloadProgressParams?.downloadCancellationMonitorFunc;
const totalDownloadCallback = downloadProgressParams?.progressCallback
? (downloaded: number) => downloadProgressParams.progressCallback?.(downloaded, v1CheckpointSize)
: undefined;

const stopwatch = new StopWatch(`[${checkpoint.changesetId}]`, true);
Logger.logInfo("BackendIModelsAccess", `Starting download of checkpoint with id ${stopwatch.description}`);
await downloadFile({
storage: this._iModelsClient.cloudStorage,
url: checkpoint._links.download.href,
localPath: arg.localFile,
totalDownloadCallback,
abortSignal: downloadProgressParams?.abortSignal
});
Logger.logInfo("BackendIModelsAccess", `Downloaded checkpoint with id ${stopwatch.description} (${stopwatch.elapsedSeconds} seconds)`);

return { index: checkpoint.changesetIndex, id: checkpoint.changesetId };
} finally {
downloadCancellationMonitorFunc?.({ shouldCancel: false });
}
}

public async queryV2Checkpoint(arg: CheckpointProps): Promise<V2CheckpointAccessProps | undefined> {
Expand Down Expand Up @@ -432,7 +453,7 @@ export class BackendIModelsAccess implements BackendHubAccess {
const foundWalFile = IModelJsFs.existsSync(`${baselineFilePath}-wal`);
const db = IModelDb.openDgnDb({ path: baselineFilePath }, OpenMode.ReadWrite);
if (foundWalFile) {
Logger.logWarning("BackendIModelsAccess", "Wal file found while uploading file, performing checkpoint.", {baselineFilePath});
Logger.logWarning("BackendIModelsAccess", "Wal file found while uploading file, performing checkpoint.", { baselineFilePath });
db.performCheckpoint();
}
this.closeFile(db);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
* Copyright (c) Bentley Systems, Incorporated. All rights reserved.
* See LICENSE.md in the project root for license terms and full copyright notice.
*--------------------------------------------------------------------------------------------*/
import { AbortController, AbortSignal } from "@azure/abort-controller";
import { AbortController } from "@azure/abort-controller";
import { CreateNewIModelProps, LockMap, LockState, ProgressFunction, ProgressStatus } from "@itwin/core-backend";
import { RepositoryStatus } from "@itwin/core-bentley";
import { ChangesetFileProps, ChangesetRange, ChangesetType, IModelError, ChangesetIndexOrId as PlatformChangesetIdOrIndex } from "@itwin/core-common";

import {
ChangesetPropertiesForCreate, ChangesetIdOrIndex as ClientChangesetIdOrIndex, ContainingChanges, GetChangesetListUrlParams, IModelProperties,
LockLevel, LockedObjects, ProgressCallback
ChangesetPropertiesForCreate, ChangesetIdOrIndex as ClientChangesetIdOrIndex, ContainingChanges, DownloadProgressParam, GetChangesetListUrlParams, IModelProperties,
LockLevel, LockedObjects
} from "@itwin/imodels-client-authoring";

interface DownloadCancellationMonitorFuncParams { shouldCancel: boolean }
export type DownloadCancellationMonitorFunc = (params: DownloadCancellationMonitorFuncParams) => void;

export class PlatformToClientAdapter {
public static toChangesetPropertiesForCreate(changesetFileProps: ChangesetFileProps, changesetDescription: string): ChangesetPropertiesForCreate {
return {
Expand Down Expand Up @@ -76,19 +79,39 @@ export class PlatformToClientAdapter {
};
}

public static toProgressCallback(progressCallback?: ProgressFunction): [ProgressCallback, AbortSignal] | undefined {
/**
* @returns `progressCallback` and `abortSignal` instances to pass to iModels client functions, and `downloadCancellationMonitorFunc`.
* IMPORTANT: `downloadCancellationMonitorFunc` must be called at least once to not leave pending promises.
*/
public static toDownloadProgressParam(progressCallback?: ProgressFunction): (DownloadProgressParam & { downloadCancellationMonitorFunc: DownloadCancellationMonitorFunc }) | undefined {
if (!progressCallback)
return;

const abortController = new AbortController();

// We construct a promise which, if resolved with `{ shouldCancel: true }`, will cancel the download.
// We have to do this instead of calling `abortController.abort` inside `progressCallback` function because `progressCallback` is called inside "on `data`"
// event handler of the download stream, which is out of the execution context of the `iModelsClient.changesets.downloadList` function. That results in an
// unhandled exception.
let downloadCancellationMonitorFunc: DownloadCancellationMonitorFunc = undefined!;
void new Promise<DownloadCancellationMonitorFuncParams>((resolve) => {
downloadCancellationMonitorFunc = resolve;
}).then((params: DownloadCancellationMonitorFuncParams) => {
if (params.shouldCancel)
abortController.abort();
});

const convertedProgressCallback = (downloaded: number, total: number) => {
const cancel = progressCallback(downloaded, total);

if (cancel !== ProgressStatus.Continue)
abortController.abort();
downloadCancellationMonitorFunc({ shouldCancel: true });
};

return [convertedProgressCallback, abortController.signal];
return {
progressCallback: convertedProgressCallback,
abortSignal: abortController.signal,
downloadCancellationMonitorFunc
};
}

// eslint-disable-next-line deprecation/deprecation
Expand All @@ -97,10 +120,10 @@ export class PlatformToClientAdapter {
// eslint-disable-next-line deprecation/deprecation
case LockState.None:
return LockLevel.None;
// eslint-disable-next-line deprecation/deprecation
// eslint-disable-next-line deprecation/deprecation
case LockState.Shared:
return LockLevel.Shared;
// eslint-disable-next-line deprecation/deprecation
// eslint-disable-next-line deprecation/deprecation
case LockState.Exclusive:
return LockLevel.Exclusive;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ describe("BackendIModelsAccess", () => {
}
});

it.skip("should cancel changesets download and finish downloading missing changesets during next download", async () => {
it("should cancel changesets download and finish downloading missing changesets during next download", async () => {
// Arrange
const downloadChangesetsParams: DownloadChangesetRangeArg = {
accessToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,16 +488,24 @@ describe("[Authoring] ChangesetOperations", () => {
assertProgressReports(progressReports);
});

it.skip("should cancel changesets download", async () => {
it("should cancel changesets download", async () => {
// Arrange
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Promise construction below mimics the cancellation workflow implemented in
// `PlatformToClientAdapter.toDownloadProgressParam` function `@itwin/imodels-access-backend` package.
// Refer to that for an explanation on why this is needed.
let triggerDownloadCancellation: () => void = undefined!;
const triggerDownloadCancellationPromise = new Promise<void>((resolve) => {
triggerDownloadCancellation = resolve;
});

const progressReports: ProgressReport[] = [];
const progressCallback: ProgressCallback = (downloaded, total) => {
progressReports.push({downloaded, total});
if (downloaded > total / 2)
abortController.abort();
triggerDownloadCancellation();
};

const downloadPath = path.join(Constants.TestDownloadDirectoryPath, "[Authoring] ChangesetOperations", "cancel changesets download");
Expand All @@ -512,7 +520,12 @@ describe("[Authoring] ChangesetOperations", () => {
// Act
let thrownError: unknown;
try {
await iModelsClient.changesets.downloadList(downloadChangesetListParams);
const testedFunctionPromise = iModelsClient.changesets.downloadList(downloadChangesetListParams);

await triggerDownloadCancellationPromise;
abortController.abort();

await testedFunctionPromise;
} catch (error: unknown) {
thrownError = error;
}
Expand Down

0 comments on commit 1cd2074

Please sign in to comment.