Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Remove unnecessary bits
Browse files Browse the repository at this point in the history
  • Loading branch information
sbalko committed Jun 6, 2024
1 parent 316c51d commit c8fdf22
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 89 deletions.
28 changes: 1 addition & 27 deletions src/shared/controllable-promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,13 @@ export class ControllablePromise<T> {
return this._resolve;
}

static from<T>(promise: Promise<T>): ControllablePromise<T> {
return new ControllablePromise<T>((resolve, reject) => {
promise.then(resolve).catch(reject);
});
}
private readonly _promise: Promise<T>;
private _reject!: (reason?: Error) => void;
private _resolve!: (value: T) => void;

private _status: SynchronousPromiseStatus = 'pending';

constructor(
promiseCallback?: (resolve: (value: T) => void, reject: (reason?: Error) => void) => void
) {
constructor() {
this._promise = new Promise((resolve, reject) => {
this._resolve = value => {
this._status = 'resolved';
Expand All @@ -36,25 +29,6 @@ export class ControllablePromise<T> {
this._status = 'rejected';
reject(error);
};
if (promiseCallback) {
promiseCallback(this._resolve, this._reject);
}
});
}

/**
* Offers a way to synchronously "peek" the status of the controllable promise.
* Will initially be "pending" and then flip into "resolved" or "rejected".
*/
get status(): SynchronousPromiseStatus {
return this._status;
}

always(callback: () => void): void {
this._promise.then(callback, callback);
}

watch(promise: Promise<T>): Promise<void> {
return promise.then(this._resolve).catch(this._reject);
}
}
66 changes: 4 additions & 62 deletions src/shared/leader-follower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,76 +13,26 @@ export class LeaderFollowerQueue {
private readonly queue: IFunctionMetadata[] = [];
private runningFunctionMetadata?: IFunctionMetadata;

/**
* Enqueues a function with a promise callback signature and returns a promise. The
* enqueued function is called with a resolve and a reject callback that (1) forward the queue
* to the next queued function and (2) resolves/rejects the returned promise, respectively.
*
* A timeout for when the function automatically rejects and skips to the next queued function
* can be passed in, where a value of 0 signals that no timeout shall be enforced.
*
* @param promiseCallback - a promise callback function that receives a "resolve" and "reject" function parameters.
* @param timeoutMillis - a timeout in milliseconds, where 0 signals that no timeout shall be enforced.
* @param errorMessage - an optional error message that used for the exception that is produced when a timeout is triggered.
*
* @returns a promise that resolves (rejects) after the provided callback was called and has called its
* resolve (reject) parameter, respectively.
*/
createTimedPromise<T>(
createPromise<T>(
promiseCallback: (resolve: (result: T) => void, reject: (error: Error) => void) => void,
timeoutMillis: number = DEFAULT_TIMEOUT_MILLIS,
errorMessage?: string
): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.enqueueFunction(async nextCallback => {
const timeoutId =
timeoutMillis > 0
? setTimeout(() => {
nextCallback();
reject(
new Error(
errorMessage ||
`Queued promise timed out after ${timeoutMillis} millis`
)
);
}, timeoutMillis)
: undefined;

try {
resolve(await new Promise<T>(promiseCallback));
} catch (error) {
reject(error);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}

nextCallback();
}
});
});
}

/**
* Enqueues a function that returns a promise. This function is only called when the
* queue has finished processing any preceding (ie. ahead in queue) functions.
* The queue skips ahead once the returned promise resolves or rejects. No timeout
* is enforced.
*
* @param callback - a function that returns a promise. The function is only called once
* the queue has finished any preceding functions.
*
* @returns a promise that resolves (rejects) after the promise the was returned by
* the provided callback function resolves (rejects), respectively.
*/
enqueuePromise<T>(callback: () => Promise<T>): Promise<T> {
return this.createTimedPromise<T>((resolve, reject) => {
return this.createPromise<T>((resolve, reject) => {
callback().then(resolve).catch(reject);
}, 0);
}

discardQueued(): void {
this.queue.splice(0, this.queue.length);
});
}

enqueueFunction(enqueuedFunction: EnqueuedFunction): void {
Expand All @@ -91,21 +41,13 @@ export class LeaderFollowerQueue {
func: enqueuedFunction,
};

if (this.isIdle()) {
if (this.runningFunctionMetadata === undefined) {
this.runFunction(newFunctionMetadata);
} else {
this.queue.push(newFunctionMetadata);
}
}

getEnqueuedFunctions(): EnqueuedFunction[] {
return this.queue.map(enqueued => enqueued.func);
}

isIdle(): boolean {
return !this.runningFunctionMetadata;
}

private unwindQueueRecursively(newRecursionDepth: number): void {
const functionMetadata = this.queue.shift();
if (functionMetadata !== undefined) {
Expand Down

0 comments on commit c8fdf22

Please sign in to comment.