Skip to content

Commit

Permalink
feat(orchestrator): add endpoint to retrigger workflow in error state (
Browse files Browse the repository at this point in the history
…janus-idp#1343)

* feat(orchestrator): add endpoint to retrigger workflow in error state

* Minor improvements
  • Loading branch information
caponetto authored Apr 25, 2024
1 parent ff1629a commit 328d23a
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 58 deletions.
147 changes: 133 additions & 14 deletions plugins/orchestrator-backend/src/service/OrchestratorService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ const workflowOverview = createWorkflowOverviewMock(1);
const workflowOverviews = createWorkflowOverviewsMock(3);
const instance = createInstanceMock(1);
const instances = createInstancesMock(3);
const serviceUrl = 'http://localhost';
const inputData = { foo: 'bar' };

// Mocked dependencies
const sonataFlowServiceMock = {} as SonataFlowService;
const workflowCacheServiceMock = {} as WorkflowCacheService;
const dataIndexServiceMock = {} as DataIndexService;

// Target service
// Target
const orchestratorService = new OrchestratorService(
sonataFlowServiceMock,
dataIndexServiceMock,
Expand Down Expand Up @@ -87,10 +89,8 @@ describe('OrchestratorService', () => {

expect(
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
).toHaveBeenCalledWith(instanceId);
expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalledWith(
instanceId,
);
).toHaveBeenCalled();
expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalled();
});

it('should skip and not execute the operation when the workflow is not available', async () => {
Expand All @@ -106,7 +106,7 @@ describe('OrchestratorService', () => {

expect(
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
).toHaveBeenCalledWith(instanceId);
).toHaveBeenCalled();
expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled();
});

Expand All @@ -129,11 +129,8 @@ describe('OrchestratorService', () => {

expect(
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
).toHaveBeenCalledWith(instanceId);
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalledWith(
definitionId,
'throw',
);
).toHaveBeenCalled();
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled();
});
});
Expand Down Expand Up @@ -433,7 +430,6 @@ describe('OrchestratorService', () => {
});

describe('fetchWorkflowInfoOnService', () => {
const serviceUrl = 'http://localhost';
beforeEach(() => {
jest.clearAllMocks();
});
Expand Down Expand Up @@ -602,8 +598,6 @@ describe('OrchestratorService', () => {
});

describe('executeWorkflow', () => {
const serviceUrl = 'http://localhost';
const inputData = {};
const executeResponse: WorkflowExecutionResponse = {
id: createInstanceIdMock(1),
};
Expand Down Expand Up @@ -661,4 +655,129 @@ describe('OrchestratorService', () => {
expect(sonataFlowServiceMock.executeWorkflow).not.toHaveBeenCalled();
});
});

describe('retriggerInstanceInError', () => {
beforeEach(() => {
jest.clearAllMocks();
});

it('should execute the operation when the workflow is available', async () => {
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true);
sonataFlowServiceMock.retriggerInstanceInError = jest
.fn()
.mockResolvedValue(true);

await orchestratorService.retriggerInstanceInError({
definitionId,
serviceUrl,
instanceId,
cacheHandler: 'skip',
});

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(sonataFlowServiceMock.retriggerInstanceInError).toHaveBeenCalled();
});

it('should skip and not execute the operation when the workflow is not available', async () => {
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false);

await orchestratorService.retriggerInstanceInError({
definitionId,
serviceUrl,
instanceId,
cacheHandler: 'skip',
});

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(
sonataFlowServiceMock.retriggerInstanceInError,
).not.toHaveBeenCalled();
});

it('should throw an error and not execute the operation when the workflow is not available', async () => {
workflowCacheServiceMock.isAvailable = jest
.fn()
.mockImplementation(() => {
throw new Error();
});

const promise = orchestratorService.retriggerInstanceInError({
definitionId,
serviceUrl,
instanceId,
cacheHandler: 'throw',
});

await expect(promise).rejects.toThrow();

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(
sonataFlowServiceMock.retriggerInstanceInError,
).not.toHaveBeenCalled();
});
});

describe('updateInstanceInputData', () => {
beforeEach(() => {
jest.clearAllMocks();
});

it('should execute the operation when the workflow is available', async () => {
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true);
sonataFlowServiceMock.updateInstanceInputData = jest
.fn()
.mockResolvedValue(true);

await orchestratorService.updateInstanceInputData({
definitionId,
serviceUrl,
instanceId,
inputData,
cacheHandler: 'skip',
});

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(sonataFlowServiceMock.updateInstanceInputData).toHaveBeenCalled();
});

it('should skip and not execute the operation when the workflow is not available', async () => {
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false);

await orchestratorService.updateInstanceInputData({
definitionId,
serviceUrl,
instanceId,
inputData,
cacheHandler: 'skip',
});

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(
sonataFlowServiceMock.updateInstanceInputData,
).not.toHaveBeenCalled();
});

it('should throw an error and not execute the operation when the workflow is not available', async () => {
workflowCacheServiceMock.isAvailable = jest
.fn()
.mockImplementation(() => {
throw new Error();
});

const promise = orchestratorService.updateInstanceInputData({
definitionId,
serviceUrl,
instanceId,
inputData,
cacheHandler: 'throw',
});

await expect(promise).rejects.toThrow();

expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
expect(
sonataFlowServiceMock.updateInstanceInputData,
).not.toHaveBeenCalled();
});
});
});
39 changes: 38 additions & 1 deletion plugins/orchestrator-backend/src/service/OrchestratorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export class OrchestratorService {
public async executeWorkflow(args: {
definitionId: string;
serviceUrl: string;
inputData: Record<string, string>;
inputData: ProcessInstanceVariables;
businessKey?: string;
cacheHandler?: CacheHandler;
}): Promise<WorkflowExecutionResponse | undefined> {
Expand All @@ -188,4 +188,41 @@ export class OrchestratorService {
? await this.sonataFlowService.fetchWorkflowOverview(definitionId)
: undefined;
}

public async retriggerInstanceInError(args: {
definitionId: string;
serviceUrl: string;
instanceId: string;
cacheHandler?: CacheHandler;
}): Promise<boolean> {
const { definitionId, cacheHandler } = args;

const isWorkflowAvailable = this.workflowCacheService.isAvailable(
definitionId,
cacheHandler,
);

return isWorkflowAvailable
? await this.sonataFlowService.retriggerInstanceInError(args)
: false;
}

public async updateInstanceInputData(args: {
definitionId: string;
serviceUrl: string;
instanceId: string;
inputData: ProcessInstanceVariables;
cacheHandler?: CacheHandler;
}): Promise<boolean> {
const { definitionId, cacheHandler } = args;

const isWorkflowAvailable = this.workflowCacheService.isAvailable(
definitionId,
cacheHandler,
);

return isWorkflowAvailable
? await this.sonataFlowService.updateInstanceInputData(args)
: false;
}
}
49 changes: 44 additions & 5 deletions plugins/orchestrator-backend/src/service/SonataFlowService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
getWorkflowCategory,
ProcessInstance,
ProcessInstanceStateValues,
ProcessInstanceVariables,
WorkflowDefinition,
WorkflowExecutionResponse,
WorkflowInfo,
Expand All @@ -14,7 +15,6 @@ import {

import { Pagination } from '../types/pagination';
import { DataIndexService } from './DataIndexService';
import { executeWithRetry } from './Helper';

export class SonataFlowService {
constructor(
Expand All @@ -28,7 +28,7 @@ export class SonataFlowService {
}): Promise<WorkflowInfo | undefined> {
try {
const urlToFetch = `${args.serviceUrl}/management/processes/${args.definitionId}`;
const response = await executeWithRetry(() => fetch(urlToFetch));
const response = await fetch(urlToFetch);

if (response.ok) {
const json = await response.json();
Expand Down Expand Up @@ -93,15 +93,15 @@ export class SonataFlowService {
public async executeWorkflow(args: {
definitionId: string;
serviceUrl: string;
inputData: Record<string, string>;
inputData: ProcessInstanceVariables;
businessKey?: string;
}): Promise<WorkflowExecutionResponse | undefined> {
try {
const workflowEndpoint = args.businessKey
const urlToFetch = args.businessKey
? `${args.serviceUrl}/${args.definitionId}?businessKey=${args.businessKey}`
: `${args.serviceUrl}/${args.definitionId}`;

const result = await fetch(workflowEndpoint, {
const result = await fetch(urlToFetch, {
method: 'POST',
body: JSON.stringify(args.inputData),
headers: { 'content-type': 'application/json' },
Expand Down Expand Up @@ -195,4 +195,43 @@ export class SonataFlowService {
}
return false;
}

public async retriggerInstanceInError(args: {
definitionId: string;
serviceUrl: string;
instanceId: string;
}): Promise<boolean> {
const { definitionId, serviceUrl, instanceId } = args;
try {
const urlToFetch = `${serviceUrl}/management/processes/${definitionId}/instances/${instanceId}/retrigger`;
const response = await fetch(urlToFetch, {
method: 'POST',
});
return response.ok;
} catch (error) {
this.logger.error(`Error when retriggering workflow in error: ${error}`);
}
return false;
}

public async updateInstanceInputData(args: {
definitionId: string;
serviceUrl: string;
instanceId: string;
inputData: ProcessInstanceVariables;
}): Promise<boolean> {
const { definitionId, serviceUrl, instanceId, inputData } = args;
try {
const urlToFetch = `${serviceUrl}/${definitionId}/${instanceId}`;
const response = await fetch(urlToFetch, {
method: 'PATCH',
body: JSON.stringify(inputData),
headers: { 'content-type': 'application/json' },
});
return response.ok;
} catch (error) {
this.logger.error(`Error when updating instance input data: ${error}`);
}
return false;
}
}
Loading

0 comments on commit 328d23a

Please sign in to comment.