Skip to content

Commit

Permalink
rewrite notifyOnCompletion as autoRefreshUntilCompletion, fix Process…
Browse files Browse the repository at this point in the history
…DetailComponent, and the ProcessDataService tests
  • Loading branch information
artlowel committed Aug 29, 2023
1 parent bd66487 commit 3be90eb
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ describe('CollectionSourceControlsComponent', () => {
invoke: createSuccessfulRemoteDataObject$(process),
});
processDataService = jasmine.createSpyObj('processDataService', {
notifyOnCompletion: createSuccessfulRemoteDataObject$(process),
autoRefreshUntilCompletion: createSuccessfulRemoteDataObject$(process),
});
bitstreamService = jasmine.createSpyObj('bitstreamService', {
findByHref: createSuccessfulRemoteDataObject$(bitstream),
Expand Down Expand Up @@ -137,7 +137,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-i', value: new ContentSourceSetSerializer().Serialize(contentSource.oaiSetId)},
], []);

expect(processDataService.notifyOnCompletion).toHaveBeenCalledWith(process.processId);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(bitstreamService.findByHref).toHaveBeenCalledWith(process._links.output.href);
expect(notificationsService.info).toHaveBeenCalledWith(jasmine.anything() as any, 'Script text');
});
Expand All @@ -151,7 +151,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-r', value: null},
{name: '-c', value: collection.uuid},
], []);
expect(processDataService.notifyOnCompletion).toHaveBeenCalledWith(process.processId);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(notificationsService.success).toHaveBeenCalled();
});
});
Expand All @@ -164,7 +164,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-o', value: null},
{name: '-c', value: collection.uuid},
], []);
expect(processDataService.notifyOnCompletion).toHaveBeenCalledWith(process.processId);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(notificationsService.success).toHaveBeenCalled();
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}),
// filter out responses that aren't successful since the pinging of the process only needs to happen when the invocation was successful.
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.notifyOnCompletion(rd.payload.processId)),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process: Process) => {
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
Expand Down Expand Up @@ -166,7 +166,7 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}
}),
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.notifyOnCompletion(rd.payload.processId)),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process) => {
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
Expand Down Expand Up @@ -226,7 +226,7 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}
}),
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.notifyOnCompletion(rd.payload.processId)),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process) => {
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
Expand Down
111 changes: 67 additions & 44 deletions src/app/core/data/processes/process-data.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
*/

import { testFindAllDataImplementation } from '../base/find-all-data.spec';
import { ProcessDataService } from './process-data.service';
import { ProcessDataService, TIMER_FACTORY } from './process-data.service';
import { testDeleteDataImplementation } from '../base/delete-data.spec';
import { cold, getTestScheduler } from 'jasmine-marbles';
import { waitForAsync, TestBed } from '@angular/core/testing';
import { waitForAsync, TestBed, fakeAsync, tick } from '@angular/core/testing';
import { RequestService } from '../request.service';
import { RemoteData } from '../remote-data';
import { RequestEntryState } from '../request-entry-state.model';
Expand All @@ -23,23 +22,33 @@ import { HALEndpointService } from '../../shared/hal-endpoint.service';
import { DSOChangeAnalyzer } from '../dso-change-analyzer.service';
import { BitstreamFormatDataService } from '../bitstream-format-data.service';
import { NotificationsService } from '../../../shared/notifications/notifications.service';
import { TestScheduler } from 'rxjs/testing';
import { TestScheduler, RunHelpers } from 'rxjs/testing';
import { cold } from 'jasmine-marbles';
import { of } from 'rxjs';

describe('ProcessDataService', () => {
let testScheduler;

const mockTimer = (fn: () => {}, interval: number) => {
fn();
return 555;
};

describe('composition', () => {
const initService = () => new ProcessDataService(null, null, null, null, null, null, null);
const initService = () => new ProcessDataService(null, null, null, null, null, null, null, null);
testFindAllDataImplementation(initService);
testDeleteDataImplementation(initService);
});

let requestService;
let processDataService;
let remoteDataBuildService;
let scheduler: TestScheduler;

describe('notifyOnCompletion', () => {
describe('autoRefreshUntilCompletion', () => {
beforeEach(waitForAsync(() => {
scheduler = getTestScheduler();
testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
TestBed.configureTestingModule({
imports: [],
providers: [
Expand All @@ -52,57 +61,71 @@ describe('ProcessDataService', () => {
{ provide: DSOChangeAnalyzer, useValue: null },
{ provide: BitstreamFormatDataService, useValue: null },
{ provide: NotificationsService, useValue: null },
{ provide: TIMER_FACTORY, useValue: mockTimer },
]
});

processDataService = TestBed.inject(ProcessDataService);
spyOn(processDataService, 'invalidateByHref');
}));

it('TODO', () => {
let completedProcess = new Process();
completedProcess.processStatus = ProcessStatus.COMPLETED;
it('should not do any polling when the process is already completed', () => {
testScheduler.run(({ cold, expectObservable }) => {
let completedProcess = new Process();
completedProcess.processStatus = ProcessStatus.COMPLETED;

const completedProcessRD = new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess);

spyOn(processDataService, 'findById').and.returnValue(
cold('(c|)', {
'c': new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess)
})
);
spyOn(processDataService, 'findById').and.returnValue(
cold('c', {
'c': completedProcessRD
})
);

let process$ = processDataService.notifyOnCompletion('instantly');
process$.subscribe((rd) => {
expect(processDataService.findById).toHaveBeenCalledTimes(1);
expect(processDataService.invalidateByHref).not.toHaveBeenCalled();
let process$ = processDataService.autoRefreshUntilCompletion('instantly');
expectObservable(process$).toBe('c', {
c: completedProcessRD
});
});
expect(process$).toBeObservable(cold('(c|)', {
'c': new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess)
}));

expect(processDataService.findById).toHaveBeenCalledTimes(1);
expect(processDataService.invalidateByHref).not.toHaveBeenCalled();
});

it('TODO2', () => {
let runningProcess = new Process();
runningProcess.processStatus = ProcessStatus.RUNNING;
let completedProcess = new Process();
completedProcess.processStatus = ProcessStatus.COMPLETED;

spyOn(processDataService, 'findById').and.returnValue(
cold('p 150ms (c|)', {
'p': new RemoteData(0, 0, 0, RequestEntryState.Success, null, runningProcess),
'c': new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess)
})
);

let process$ = processDataService.notifyOnCompletion('foo', 100);
expect(process$).toBeObservable(cold('- 150ms (c|)', {
'c': new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess)
}));
scheduler.flush();
it('should poll until a process completes', () => {
testScheduler.run(({ cold, expectObservable }) => {
const runningProcess = Object.assign(new Process(), {
_links: {
self: {
href: 'https://rest.api/processes/123'
}
}
});
runningProcess.processStatus = ProcessStatus.RUNNING;
const completedProcess = new Process();
completedProcess.processStatus = ProcessStatus.COMPLETED;
const runningProcessRD = new RemoteData(0, 0, 0, RequestEntryState.Success, null, runningProcess);
const completedProcessRD = new RemoteData(0, 0, 0, RequestEntryState.Success, null, completedProcess);

spyOn(processDataService, 'findById').and.returnValue(
cold('r 150ms c', {
'r': runningProcessRD,
'c': completedProcessRD
})
);

let process$ = processDataService.autoRefreshUntilCompletion('foo', 100);
expectObservable(process$).toBe('r 150ms c', {
'r': runningProcessRD,
'c': completedProcessRD
});
});

expect(processDataService.findById).toHaveBeenCalledTimes(1);
expect(processDataService.invalidateByHref).toHaveBeenCalledTimes(1);
});
});

});
});

// /**
// * Tests whether calls to `FindAllData` methods are correctly patched through in a concrete data service that implements it
Expand Down Expand Up @@ -131,4 +154,4 @@ describe('ProcessDataService', () => {
// expect(out).toBe('TEST findAll');
// });
// });
// }
});
118 changes: 85 additions & 33 deletions src/app/core/data/processes/process-data.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Injectable, NgZone } from '@angular/core';
import { Injectable, NgZone, Inject, InjectionToken } from '@angular/core';
import { RequestService } from '../request.service';
import { RemoteDataBuildService } from '../../cache/builders/remote-data-build.service';
import { ObjectCacheService } from '../../cache/object-cache.service';
import { HALEndpointService } from '../../shared/hal-endpoint.service';
import { Process } from '../../../process-page/processes/process.model';
import { PROCESS } from '../../../process-page/processes/process.resource-type';
import { Observable } from 'rxjs';
import { switchMap, filter, take } from 'rxjs/operators';
import { switchMap, filter, distinctUntilChanged, find } from 'rxjs/operators';
import { PaginatedList } from '../paginated-list.model';
import { Bitstream } from '../../shared/bitstream.model';
import { RemoteData } from '../remote-data';
Expand All @@ -21,13 +21,23 @@ import { NotificationsService } from '../../../shared/notifications/notification
import { NoContent } from '../../shared/NoContent.model';
import { getAllCompletedRemoteData } from '../../shared/operators';
import { ProcessStatus } from 'src/app/process-page/processes/process-status.model';
import { hasValue } from '../../../shared/empty.util';

/**
* Create an InjectionToken for the default JS setTimeout function, purely so we can mock it during
* testing. (fakeAsync isn't working for this case)
*/
export const TIMER_FACTORY = new InjectionToken<(callback: (...args: any[]) => void, ms?: number, ...args: any[]) => NodeJS.Timeout>('timer', {
providedIn: 'root',
factory: () => setTimeout
});

@Injectable()
@dataService(PROCESS)
export class ProcessDataService extends IdentifiableDataService<Process> implements FindAllData<Process>, DeleteData<Process> {
private findAllData: FindAllData<Process>;
private deleteData: DeleteData<Process>;
protected activelyBeingPolled: Set<string> = new Set();
protected activelyBeingPolled: Map<string, NodeJS.Timeout> = new Map();

constructor(
protected requestService: RequestService,
Expand All @@ -37,6 +47,7 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
protected bitstreamDataService: BitstreamDataService,
protected notificationsService: NotificationsService,
protected zone: NgZone,
@Inject(TIMER_FACTORY) protected timer: (callback: (...args: any[]) => void, ms?: number, ...args: any[]) => NodeJS.Timeout
) {
super('processes', requestService, rdbService, objectCache, halService);

Expand Down Expand Up @@ -106,42 +117,83 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
return this.deleteData.deleteByHref(href, copyVirtualMetadata);
}

// TODO
public notifyOnCompletion(processId: string, pollingIntervalInMs = 5000): Observable<RemoteData<Process>> {
const process$ = this.findById(processId, false, true, followLink('script'))
/**
* Return true if the given process has the given status
* @protected
*/
protected statusIs(process: Process, status: ProcessStatus): boolean {
return hasValue(process) && process.processStatus === status;
}

/**
* Return true if the given process has the status COMPLETED or FAILED
*/
public hasCompletedOrFailed(process: Process): boolean {
return this.statusIs(process, ProcessStatus.COMPLETED) ||
this.statusIs(process, ProcessStatus.FAILED);
}

/**
* Clear the timeout for the given process, if that timeout exists
* @protected
*/
protected clearCurrentTimeout(processId: string): void {
const timeout = this.activelyBeingPolled.get(processId);
if (hasValue(timeout)) {
clearTimeout(timeout);
}
};

/**
* Poll the process with the given ID, using the given interval, until that process either
* completes successfully or fails
*
* Return an Observable<RemoteData> for the Process. Note that this will also emit while the
* process is still running. It will only emit again when the process (not the RemoteData!) changes
* status. That makes it more convenient to retrieve that process for a component: you can replace
* a findByID call with this method, rather than having to do a separate findById, and then call
* this method
* @param processId
* @param pollingIntervalInMs
*/
public autoRefreshUntilCompletion(processId: string, pollingIntervalInMs = 5000): Observable<RemoteData<Process>> {
const process$ = this.findById(processId, true, true, followLink('script'))
.pipe(
getAllCompletedRemoteData(),
);

// TODO: this is horrible
const statusIs = (process: Process, status: ProcessStatus) =>
process.processStatus === status;

// If we have to wait too long for the result, we should mark the result as stale.
// However, we should make sure this happens only once (in case there are multiple observers waiting
// for the result).
if (!this.activelyBeingPolled.has(processId)) {
this.activelyBeingPolled.add(processId);

// Create a subscription that marks the data as stale if the polling interval time has been exceeded.
const sub = process$.subscribe((rd) => {
const process = rd.payload;
if (statusIs(process, ProcessStatus.COMPLETED) || statusIs(process, ProcessStatus.FAILED)) {
this.activelyBeingPolled.delete(processId);
sub.unsubscribe();
} else {
this.zone.runOutsideAngular(() =>
setTimeout(() => {
this.invalidateByHref(process._links.self.href);
}, pollingIntervalInMs)
);
}
});
}
// Create a subscription that marks the data as stale if the process hasn't been completed and
// the polling interval time has been exceeded.
const sub = process$.pipe(
filter((processRD: RemoteData<Process>) =>
!this.hasCompletedOrFailed(processRD.payload) &&
!this.activelyBeingPolled.has(processId)
)
).subscribe((processRD: RemoteData<Process>) => {
this.clearCurrentTimeout(processId);
const nextTimeout = this.timer(() => {
this.activelyBeingPolled.delete(processId);
this.invalidateByHref(processRD.payload._links.self.href);
}, pollingIntervalInMs);

this.activelyBeingPolled.set(processId, nextTimeout);
});

// When the process completes create a one off subscription (the `find` completes the
// observable) that unsubscribes the previous one, removes the processId from the list of
// processes being polled and clears any running timeouts
process$.pipe(
find((processRD: RemoteData<Process>) => this.hasCompletedOrFailed(processRD.payload))
).subscribe(() => {
this.clearCurrentTimeout(processId);
this.activelyBeingPolled.delete(processId);
sub.unsubscribe();
});

return process$.pipe(
filter(rd => statusIs(rd.payload, ProcessStatus.COMPLETED) || statusIs(rd.payload, ProcessStatus.FAILED)),
take(1)
distinctUntilChanged((previous: RemoteData<Process>, current: RemoteData<Process>) =>
previous.payload.processStatus === current.payload.processStatus
)
);
}
}
Loading

0 comments on commit 3be90eb

Please sign in to comment.