Skip to content

Commit

Permalink
feat: improve bulk API v2 client with event emitters and record count…
Browse files Browse the repository at this point in the history
… aggregation
  • Loading branch information
Codeneos committed Jun 12, 2024
1 parent 176704a commit 1994486
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 71 deletions.
3 changes: 2 additions & 1 deletion packages/salesforce/src/__tests__/bulkIngestJob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ describe('bulkIngestJob', () => {
} as unknown as IngestJobInfo;
const expectedOptions = { contentType: 'text/csv; charset=utf-8' };
const clientMock = getRestClientMock({
post: jest.fn(() => Promise.resolve<object>({ id: '456' }))
post: jest.fn((job) => Promise.resolve<object>({ ...(job as object), id: '456' })),
patch: jest.fn((job, resource) => Promise.resolve<object>({ id: resource, ...(job as object) }))
});
const data = [{ Id: 1 },{ Id: 2 }];

Expand Down
111 changes: 89 additions & 22 deletions packages/salesforce/src/bulk/bulkIngestJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,23 @@ interface SuccessfulRecord {

export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJobInfo> {

/**
* The maximum size of the CSV data in bytes that is uploaded in a single request. Defaults to 100MB.
* If the encoded data exceeds this size it is split into multiple jobs.
*/
public static chunkDataSize: number = 100 * 1000000;

/**
* When `true`, always wraps strings in double quotes even when not required. If `false`, only strings
* that contain the delimiter character or double quotes are wrapped in double quotes. Defaults to `false` to save space.
*/
public alwaysQuote: boolean;

/**
* The maximum size of the CSV data in bytes that is uploaded in a single request. Defaults to 100MB.
* If the encoded data exceeds this size it is split into multiple jobs.
* Maximum size in bytes after which the ingest job data is split into multiple chunks. Defaults to 100MB.
* @see {@link BulkIngestJob.chunkDataSize}
*/
public chunkDataSize: number = 100 * 1000000;
public chunkDataSize: number = BulkIngestJob.chunkDataSize;

private numberFormat = new Intl.NumberFormat('en-US', { maximumSignificantDigits: 20 });
private encoding: BufferEncoding = 'utf-8';
Expand All @@ -93,10 +99,44 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
private readonly jobs: Record<string, IngestJobInfo>;

private readonly pendingRecords: TRecord[] = [];
private recordsCount: number = 0;
private isClosed: boolean = false;

/**
* Gets the external ID field name of the bulk ingest job.
* @returns The external ID field name.
*/
public get externalIdFieldName() {
return this.info.externalIdFieldName;
}

/**
* Gets the total number of records in the job and its associated jobs.
*/
public get recordsTotal() {
return this.recordsCount;
}

/**
* Gets the total number of failed records across all jobs.
* @returns The total number of failed records.
*/
public get recordsFailed() {
return Object.values(this.jobs).reduce((total, job) => total + (job.numberRecordsFailed ?? 0), 0);
}

/**
* Gets the IDs of all the jobs created for this ingest job. For small data uploads
* this will be a single job; for large data uploads this will be multiple jobs.
* @returns An array of job IDs.
*/
public get ids() {
return Object.keys(this.jobs);
}

constructor(protected client: RestClient, info: IngestJobInfo) {
super(client, info);
this.jobs = { [info.id]: info };
this.jobs = { [info.id]: this.info };
}

/**
Expand All @@ -122,11 +162,16 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
* ```
*/
public async uploadData(data: TRecord[], options?: { keepOpen?: boolean }) {
if (this.state !== 'Open') {
throw new Error(`Cannot upload data to a job that is in ${this.state} state.`);
if (this.isClosed) {
throw new Error('Cannot upload additional data to a job that is already closed, create a new job instead.');
}

for (const record of data) {
this.validateRecord(record);
}

this.pendingRecords.push(...data);
this.recordsCount += data.length;

if (options?.keepOpen !== false) {
return this;
Expand All @@ -143,7 +188,8 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
private async createNewJob() {
await this.post({
object: this.object,
operation: this.operation as IngestOperationType
operation: this.operation as IngestOperationType,
externalIdFieldName: this.externalIdFieldName
});
this.jobs[this.id] = this.info;
}
Expand All @@ -166,8 +212,14 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
* and you can’t add any more job data.
*/
public async close() {
if (this.state !== 'Open') {
throw new Error(`Cannot close a job that is not in Open state. Current state: ${this.state}`);
if (this.isClosed) {
throw new Error('Cannot close a job that is already closed');
}
this.isClosed = true;

if (this.pendingRecords.length === 0) {
this.abort(job => job.state === 'Open');
return this;
}

for (const chunk of this.encodeData(this.pendingRecords.splice(0))) {
Expand All @@ -185,9 +237,12 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
/**
* Abort a job; the job doesn’t get queued or processed.
*/
public async abort() {
for (const job of Object.values(this.jobs)) {
await this.client.patch({ state: 'Aborted' }, job.id);
public async abort(predicate?: (job: IngestJobInfo) => any) {
for (const jobId in this.jobs) {
if (predicate && !predicate(this.jobs[jobId])) {
continue;
}
await this.client.patch({ state: 'Aborted' }, jobId);
}
return this.refresh();
}
Expand All @@ -196,26 +251,22 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
* Deletes a job. To be deleted, a job must have a state of `UploadComplete`, `JobComplete`, `Aborted`, or `Failed`.
*/
public async delete() {
for (const job of Object.values(this.jobs)) {
await this.client.delete(job.id);
for (const jobId in this.jobs) {
await this.client.delete(jobId);
}
return this;
}

/**
* Override to refresh the fn to return the aggregate state of all jobs that are part of this ingest.
*/
protected async refreshJobState() {
await this.refresh();
return this.aggregateJobState();
protected getRecordsProcessed() {
return Object.values(this.jobs).reduce((total, job) => total + (job.numberRecordsProcessed ?? 0), 0);
}

/**
* Get the aggregate state of all jobs that were created for this ingest job.
* Returns the least progressed state of all Salesforce jobs that were created for this ingest job.
*/
private aggregateJobState() {
const jobStateOrder: JobState[] = [ 'Open', 'UploadComplete', 'InProgress', 'Failed', 'JobComplete' ];
protected getJobState() {
const jobStateOrder: readonly JobState[] = [ 'Open', 'UploadComplete', 'InProgress', 'Failed', 'JobComplete' ];
const aggregateState: Record<JobState, IngestJobInfo[]> = groupBy(Object.values(this.jobs), job => job.state);

for (const state of jobStateOrder) {
Expand Down Expand Up @@ -264,6 +315,22 @@ export class BulkIngestJob<TRecord extends object = any> extends BulkJob<IngestJ
return resources;
}

private validateRecord(record: TRecord) {
if (Object.keys(record).length === 0) {
throw new Error('Cannot upload empty records');
}
if (Object.values(record).some(value => !this.isPrimitiveType(value))) {
throw new Error('Only primitive values are supported in records');
}
if (this.operation === 'upsert') {
const idField = this.externalIdFieldName ?? 'id';
const idValue = Object.entries(record).find(([key]) => key.toLowerCase() === idField.toLowerCase())?.[1];
if (idValue === undefined || idValue === null) {
throw new Error(`External ID field (${idField}) must be specified for upsert operations`);
}
}
}

private *encodeData(data: TRecord[]) {
// Detect columns
const columns = this.detectColumns(data);
Expand Down
93 changes: 45 additions & 48 deletions packages/salesforce/src/bulk/bulkJob.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { CancellationToken, setObjectProperty, wait } from "@vlocode/util";
import { RestClient } from '../restClient';
import EventEmitter from "events";
import { info } from "console";

export const ColumnDelimiters = {
'COMMA': ',',
Expand Down Expand Up @@ -99,7 +101,7 @@ export interface BulkJobInfo {
readonly errorMessage?: string;
}

export class BulkJob<T extends BulkJobInfo> {
export class BulkJob<T extends BulkJobInfo> extends EventEmitter {

/**
* Unique ID for this job.
Expand Down Expand Up @@ -129,7 +131,7 @@ export class BulkJob<T extends BulkJobInfo> {
* - `JobComplete` The job was processed by Salesforce.
* - `Failed` Some records in the job failed. Job data that was successfully processed isn’t rolled back.
*/
public get state() { return this.info.state; }
public get state() { return this.getJobState(); }

/**
* Boolean value indicating the job is still processing.
Expand Down Expand Up @@ -160,46 +162,26 @@ export class BulkJob<T extends BulkJobInfo> {
/**
* Number of records ingested or queried so far
*/
public get recordsProcessed() { return this.info.numberRecordsProcessed; }
public get recordsProcessed() { return this.getRecordsProcessed(); }

/**
* Error message when the job is in `Failed` state
*/
public get errorMessage() { return this.info.errorMessage; }
public get errorMessage() { return this.getFirstError(); }

/**
* Type of Bulk job
*/
public get type() { return this.info.jobType; }

/**
* Last known state of the job as returned by the API.
*/
public get info() { return this._info; }

/**
* Updates the info for this job.
*/
protected set info(value: T) {
if (!value) {
throw new Error('Job info cannot be set to null or undefined; use refresh() to refresh the job info or delete() to delete the job if the job is');
}
this._info = Object.freeze(value);
}

protected readonly delimiterCharacter: string;
protected readonly lineEndingCharacters: string;

/**
* Internal state detail of the job as last returned by the API.
*/
private _info: Readonly<T>;

constructor(protected client: RestClient, info: T) {
constructor(protected client: RestClient, public info: T) {
super();
if (info.columnDelimiter && !ColumnDelimiters[info.columnDelimiter]) {
throw new Error(`Invalid column delimiter ${info.columnDelimiter}`);
}
this._info = Object.freeze<T>({ ...info });
this.delimiterCharacter = ColumnDelimiters[info.columnDelimiter || 'COMMA'];
this.lineEndingCharacters = info.lineEnding === 'CRLF' ? '\r\n' : '\n';
}
Expand All @@ -223,7 +205,7 @@ export class BulkJob<T extends BulkJobInfo> {
* Refresh the job info and state of this job.
*/
public async refresh() {
this.info = await this.client.get<T>(this.id);
Object.assign(this.info, await this.client.get<T>(this.id));
return this;
}

Expand All @@ -233,7 +215,7 @@ export class BulkJob<T extends BulkJobInfo> {
* @returns Instance of this job
*/
protected async patch(info: Partial<T>) {
this.info = await this.client.patch<T>(info, this.id);
Object.assign(this.info, await this.client.patch<T>(info, this.id));
return this;
}

Expand All @@ -248,7 +230,8 @@ export class BulkJob<T extends BulkJobInfo> {
}

/**
* Poll the bulk API until the job is completed; returns a promise that resolves once the job completes.
* Progress, finish, and error events are emitted while polling. When an error occurs the promise is rejected.
* Optionally registers event listeners for progress, finish and error events. When an error occurs the promise is rejected.
* @param interval Interval is ms when to refresh job data; when not set defaults to polling for job updates every 5 seconds
* @param cancelToken Cancellation token to stop the polling process
* @returns
Expand All @@ -261,35 +244,49 @@ export class BulkJob<T extends BulkJobInfo> {
}

while (!cancelToken?.isCancellationRequested) {
const state = await this.refreshJobState();

if (state === 'JobComplete' || state === 'Aborted') {
return this;
}

if (state === 'Failed') {
const error = this.getFirstError();
if (error) {
throw new Error(error);
await wait(interval ?? 5000, cancelToken);
await this.refresh();

switch (this.getJobState()) {
case 'JobComplete': {
this.emit('finish', this);
} return this;
case 'Aborted': {
this.emit('aborted', this);
} return this;
default: {
this.emit('progress', this);
} break;
case 'Failed': {
const errorMessage = this.getFirstError();
const error = errorMessage
? new Error(errorMessage)
: new Error(`Bulk ${this.info.jobType} job ${this.id} failed to complete; see job details for more information`);
this.emit('error', error);
throw error;
}
throw new Error(`Bulk ${this.info.jobType} job ${this.id} failed to complete; see job details for more information`);
}

await wait(interval ?? 5000, cancelToken);
}

return this;
}

/**
* Refreshes the job state and returns the current state of the job.
* @returns Current state of the job
*/
protected async refreshJobState(): Promise<JobState> {
await this.refresh();
public on(event: 'progress', listener: (job: this) => void): this;
public on(event: 'finish', listener: (job: this) => void): this;
public on(event: 'error', listener: (job: Error) => void): this;
public on(event: 'aborted', listener: (job: this) => void): this;
public on(event: string, listener: (...args: any) => void): this {
return super.on(event, listener);
}

protected getJobState() : JobState {
return this.info.state;
}

protected getRecordsProcessed() {
return this.info.numberRecordsProcessed;
}

protected getFirstError(): string | undefined {
return this.info['errorMessage'];
}
Expand Down

0 comments on commit 1994486

Please sign in to comment.