diff --git a/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts b/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts index dba5e5cf32..b6a8fdcf9a 100644 --- a/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts +++ b/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts @@ -52,13 +52,15 @@ const workflowOverview = createWorkflowOverviewMock(1); const workflowOverviews = createWorkflowOverviewsMock(3); const instance = createInstanceMock(1); const instances = createInstancesMock(3); +const serviceUrl = 'http://localhost'; +const inputData = { foo: 'bar' }; // Mocked dependencies const sonataFlowServiceMock = {} as SonataFlowService; const workflowCacheServiceMock = {} as WorkflowCacheService; const dataIndexServiceMock = {} as DataIndexService; -// Target service +// Target const orchestratorService = new OrchestratorService( sonataFlowServiceMock, dataIndexServiceMock, @@ -87,10 +89,8 @@ describe('OrchestratorService', () => { expect( dataIndexServiceMock.fetchDefinitionIdByInstanceId, - ).toHaveBeenCalledWith(instanceId); - expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalledWith( - instanceId, - ); + ).toHaveBeenCalled(); + expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalled(); }); it('should skip and not execute the operation when the workflow is not available', async () => { @@ -106,7 +106,7 @@ describe('OrchestratorService', () => { expect( dataIndexServiceMock.fetchDefinitionIdByInstanceId, - ).toHaveBeenCalledWith(instanceId); + ).toHaveBeenCalled(); expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled(); }); @@ -129,11 +129,8 @@ describe('OrchestratorService', () => { expect( dataIndexServiceMock.fetchDefinitionIdByInstanceId, - ).toHaveBeenCalledWith(instanceId); - expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalledWith( - definitionId, - 'throw', - ); + ).toHaveBeenCalled(); + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled(); }); }); @@ -433,7 +430,6 @@ describe('OrchestratorService', () => { }); describe('fetchWorkflowInfoOnService', () => { - const serviceUrl = 'http://localhost'; beforeEach(() => { jest.clearAllMocks(); }); @@ -602,8 +598,6 @@ describe('OrchestratorService', () => { }); describe('executeWorkflow', () => { - const serviceUrl = 'http://localhost'; - const inputData = {}; const executeResponse: WorkflowExecutionResponse = { id: createInstanceIdMock(1), }; @@ -661,4 +655,129 @@ describe('OrchestratorService', () => { expect(sonataFlowServiceMock.executeWorkflow).not.toHaveBeenCalled(); }); }); + + describe('retriggerInstanceInError', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should execute the operation when the workflow is available', async () => { + workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true); + sonataFlowServiceMock.retriggerInstanceInError = jest + .fn() + .mockResolvedValue(true); + + await orchestratorService.retriggerInstanceInError({ + definitionId, + serviceUrl, + instanceId, + cacheHandler: 'skip', + }); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect(sonataFlowServiceMock.retriggerInstanceInError).toHaveBeenCalled(); + }); + + it('should skip and not execute the operation when the workflow is not available', async () => { + workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false); + + await orchestratorService.retriggerInstanceInError({ + definitionId, + serviceUrl, + instanceId, + cacheHandler: 'skip', + }); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect( + sonataFlowServiceMock.retriggerInstanceInError, + ).not.toHaveBeenCalled(); + }); + + it('should throw an error and not execute the operation when the workflow is not available', async () => { + workflowCacheServiceMock.isAvailable = jest + .fn() + .mockImplementation(() => { + throw new Error(); + }); + + const promise = orchestratorService.retriggerInstanceInError({ + definitionId, + serviceUrl, + instanceId, + cacheHandler: 'throw', + }); + + await expect(promise).rejects.toThrow(); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect( + sonataFlowServiceMock.retriggerInstanceInError, + ).not.toHaveBeenCalled(); + }); + }); + + describe('updateInstanceInputData', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should execute the operation when the workflow is available', async () => { + workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true); + sonataFlowServiceMock.updateInstanceInputData = jest + .fn() + .mockResolvedValue(true); + + await orchestratorService.updateInstanceInputData({ + definitionId, + serviceUrl, + instanceId, + inputData, + cacheHandler: 'skip', + }); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect(sonataFlowServiceMock.updateInstanceInputData).toHaveBeenCalled(); + }); + + it('should skip and not execute the operation when the workflow is not available', async () => { + workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false); + + await orchestratorService.updateInstanceInputData({ + definitionId, + serviceUrl, + instanceId, + inputData, + cacheHandler: 'skip', + }); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect( + sonataFlowServiceMock.updateInstanceInputData, + ).not.toHaveBeenCalled(); + }); + + it('should throw an error and not execute the operation when the workflow is not available', async () => { + workflowCacheServiceMock.isAvailable = jest + .fn() + .mockImplementation(() => { + throw new Error(); + }); + + const promise = orchestratorService.updateInstanceInputData({ + definitionId, + serviceUrl, + instanceId, + inputData, + cacheHandler: 'throw', + }); + + await expect(promise).rejects.toThrow(); + + expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled(); + expect( + sonataFlowServiceMock.updateInstanceInputData, + ).not.toHaveBeenCalled(); + }); + }); }); diff --git a/plugins/orchestrator-backend/src/service/OrchestratorService.ts b/plugins/orchestrator-backend/src/service/OrchestratorService.ts index 3cb70166db..00fbd12322 100644 --- a/plugins/orchestrator-backend/src/service/OrchestratorService.ts +++ b/plugins/orchestrator-backend/src/service/OrchestratorService.ts @@ -161,7 +161,7 @@ export class OrchestratorService { public async executeWorkflow(args: { definitionId: string; serviceUrl: string; - inputData: Record; + inputData: ProcessInstanceVariables; businessKey?: string; cacheHandler?: CacheHandler; }): Promise { @@ -188,4 +188,41 @@ export class OrchestratorService { ? await this.sonataFlowService.fetchWorkflowOverview(definitionId) : undefined; } + + public async retriggerInstanceInError(args: { + definitionId: string; + serviceUrl: string; + instanceId: string; + cacheHandler?: CacheHandler; + }): Promise { + const { definitionId, cacheHandler } = args; + + const isWorkflowAvailable = this.workflowCacheService.isAvailable( + definitionId, + cacheHandler, + ); + + return isWorkflowAvailable + ? await this.sonataFlowService.retriggerInstanceInError(args) + : false; + } + + public async updateInstanceInputData(args: { + definitionId: string; + serviceUrl: string; + instanceId: string; + inputData: ProcessInstanceVariables; + cacheHandler?: CacheHandler; + }): Promise { + const { definitionId, cacheHandler } = args; + + const isWorkflowAvailable = this.workflowCacheService.isAvailable( + definitionId, + cacheHandler, + ); + + return isWorkflowAvailable + ? await this.sonataFlowService.updateInstanceInputData(args) + : false; + } } diff --git a/plugins/orchestrator-backend/src/service/SonataFlowService.ts b/plugins/orchestrator-backend/src/service/SonataFlowService.ts index c79176a676..824d2e0949 100644 --- a/plugins/orchestrator-backend/src/service/SonataFlowService.ts +++ b/plugins/orchestrator-backend/src/service/SonataFlowService.ts @@ -6,6 +6,7 @@ import { getWorkflowCategory, ProcessInstance, ProcessInstanceStateValues, + ProcessInstanceVariables, WorkflowDefinition, WorkflowExecutionResponse, WorkflowInfo, @@ -14,7 +15,6 @@ import { import { Pagination } from '../types/pagination'; import { DataIndexService } from './DataIndexService'; -import { executeWithRetry } from './Helper'; export class SonataFlowService { constructor( @@ -28,7 +28,7 @@ export class SonataFlowService { }): Promise { try { const urlToFetch = `${args.serviceUrl}/management/processes/${args.definitionId}`; - const response = await executeWithRetry(() => fetch(urlToFetch)); + const response = await fetch(urlToFetch); if (response.ok) { const json = await response.json(); @@ -93,15 +93,15 @@ export class SonataFlowService { public async executeWorkflow(args: { definitionId: string; serviceUrl: string; - inputData: Record; + inputData: ProcessInstanceVariables; businessKey?: string; }): Promise { try { - const workflowEndpoint = args.businessKey + const urlToFetch = args.businessKey ? `${args.serviceUrl}/${args.definitionId}?businessKey=${args.businessKey}` : `${args.serviceUrl}/${args.definitionId}`; - const result = await fetch(workflowEndpoint, { + const result = await fetch(urlToFetch, { method: 'POST', body: JSON.stringify(args.inputData), headers: { 'content-type': 'application/json' }, @@ -195,4 +195,43 @@ export class SonataFlowService { } return false; } + + public async retriggerInstanceInError(args: { + definitionId: string; + serviceUrl: string; + instanceId: string; + }): Promise { + const { definitionId, serviceUrl, instanceId } = args; + try { + const urlToFetch = `${serviceUrl}/management/processes/${definitionId}/instances/${instanceId}/retrigger`; + const response = await fetch(urlToFetch, { + method: 'POST', + }); + return response.ok; + } catch (error) { + this.logger.error(`Error when retriggering workflow in error: ${error}`); + } + return false; + } + + public async updateInstanceInputData(args: { + definitionId: string; + serviceUrl: string; + instanceId: string; + inputData: ProcessInstanceVariables; + }): Promise { + const { definitionId, serviceUrl, instanceId, inputData } = args; + try { + const urlToFetch = `${serviceUrl}/${definitionId}/${instanceId}`; + const response = await fetch(urlToFetch, { + method: 'PATCH', + body: JSON.stringify(inputData), + headers: { 'content-type': 'application/json' }, + }); + return response.ok; + } catch (error) { + this.logger.error(`Error when updating instance input data: ${error}`); + } + return false; + } } diff --git a/plugins/orchestrator-backend/src/service/api/v1.test.ts b/plugins/orchestrator-backend/src/service/api/v1.test.ts new file mode 100644 index 0000000000..0676ea5195 --- /dev/null +++ b/plugins/orchestrator-backend/src/service/api/v1.test.ts @@ -0,0 +1,134 @@ +import { ProcessInstance } from '@janus-idp/backstage-plugin-orchestrator-common'; + +import { OrchestratorService } from '../OrchestratorService'; +import { V1 } from './v1'; + +// Mocked data +const inputData = {}; + +// Mocked data helpers +const createInstance = (args: Partial): ProcessInstance => ({ + id: args.id || 'instanceId', + processId: args.processId || 'processId', + state: args.state || 'ACTIVE', + serviceUrl: args.serviceUrl || 'http://localhost', + endpoint: args.endpoint || 'http://localhost', + nodes: args.nodes || [], +}); + +// Mocked dependencies +const orchestratorServiceMock = {} as OrchestratorService; +orchestratorServiceMock.fetchInstance = jest.fn(); +orchestratorServiceMock.updateInstanceInputData = jest.fn(); +orchestratorServiceMock.retriggerInstanceInError = jest.fn(); + +// Target +const v1 = new V1(orchestratorServiceMock); + +describe('retriggerInstanceInError', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should retrigger an instance in error state', async () => { + const instance = createInstance({ state: 'ERROR' }); + + orchestratorServiceMock.fetchInstance = jest + .fn() + .mockResolvedValue(instance); + orchestratorServiceMock.updateInstanceInputData = jest + .fn() + .mockResolvedValue(true); + orchestratorServiceMock.retriggerInstanceInError = jest + .fn() + .mockResolvedValue(true); + + const response = await v1.retriggerInstanceInError(instance.id, inputData); + + expect(response).toStrictEqual({ id: instance.id }); + expect(orchestratorServiceMock.fetchInstance).toHaveBeenCalled(); + expect(orchestratorServiceMock.updateInstanceInputData).toHaveBeenCalled(); + expect(orchestratorServiceMock.retriggerInstanceInError).toHaveBeenCalled(); + }); + + it('should throw an error if the instance is not found', async () => { + orchestratorServiceMock.fetchInstance = jest + .fn() + .mockResolvedValue(undefined); + + const promise = v1.retriggerInstanceInError('unknown', inputData); + + await expect(promise).rejects.toThrow(); + + expect(orchestratorServiceMock.fetchInstance).toHaveBeenCalled(); + expect( + orchestratorServiceMock.updateInstanceInputData, + ).not.toHaveBeenCalled(); + expect( + orchestratorServiceMock.retriggerInstanceInError, + ).not.toHaveBeenCalled(); + }); + + it('should throw an error if instance is not in error state', async () => { + const instance = createInstance({ state: 'ACTIVE' }); + + orchestratorServiceMock.fetchInstance = jest + .fn() + .mockResolvedValue(instance); + + const promise = v1.retriggerInstanceInError(instance.id, inputData); + + await expect(promise).rejects.toThrow(); + + expect(orchestratorServiceMock.fetchInstance).toHaveBeenCalled(); + expect( + orchestratorServiceMock.updateInstanceInputData, + ).not.toHaveBeenCalled(); + expect( + orchestratorServiceMock.retriggerInstanceInError, + ).not.toHaveBeenCalled(); + }); + + it('should throw an error if could not update the instance input data', async () => { + const instance = createInstance({ state: 'ERROR' }); + + orchestratorServiceMock.fetchInstance = jest + .fn() + .mockResolvedValue(instance); + orchestratorServiceMock.updateInstanceInputData = jest + .fn() + .mockResolvedValue(false); + + const promise = v1.retriggerInstanceInError(instance.id, inputData); + + await expect(promise).rejects.toThrow(); + + expect(orchestratorServiceMock.fetchInstance).toHaveBeenCalled(); + expect(orchestratorServiceMock.updateInstanceInputData).toHaveBeenCalled(); + expect( + orchestratorServiceMock.retriggerInstanceInError, + ).not.toHaveBeenCalled(); + }); + + it('should throw an error if could not retrigger the instance', async () => { + const instance = createInstance({ state: 'ERROR' }); + + orchestratorServiceMock.fetchInstance = jest + .fn() + .mockResolvedValue(instance); + orchestratorServiceMock.updateInstanceInputData = jest + .fn() + .mockResolvedValue(true); + orchestratorServiceMock.retriggerInstanceInError = jest + .fn() + .mockResolvedValue(false); + + const promise = v1.retriggerInstanceInError(instance.id, inputData); + + await expect(promise).rejects.toThrow(); + + expect(orchestratorServiceMock.fetchInstance).toHaveBeenCalled(); + expect(orchestratorServiceMock.updateInstanceInputData).toHaveBeenCalled(); + expect(orchestratorServiceMock.retriggerInstanceInError).toHaveBeenCalled(); + }); +}); diff --git a/plugins/orchestrator-backend/src/service/api/v1.ts b/plugins/orchestrator-backend/src/service/api/v1.ts index 9cb0a9a5c3..82f51f2e94 100644 --- a/plugins/orchestrator-backend/src/service/api/v1.ts +++ b/plugins/orchestrator-backend/src/service/api/v1.ts @@ -3,6 +3,7 @@ import express from 'express'; import { AssessedProcessInstance, ProcessInstance, + ProcessInstanceVariables, WorkflowDefinition, WorkflowExecutionResponse, WorkflowOverview, @@ -108,7 +109,7 @@ export class V1 { } public async executeWorkflow( - reqBody: Record, + inputData: ProcessInstanceVariables, definitionId: string, businessKey: string | undefined, ): Promise { @@ -124,7 +125,7 @@ export class V1 { } const executionResponse = await this.orchestratorService.executeWorkflow({ definitionId: definitionId, - inputData: reqBody, + inputData, serviceUrl: definition.serviceUrl, businessKey, cacheHandler: 'throw', @@ -155,6 +156,53 @@ export class V1 { }); } + public async retriggerInstanceInError( + instanceId: string, + inputData: ProcessInstanceVariables, + ): Promise { + const instance = await this.orchestratorService.fetchInstance({ + instanceId, + cacheHandler: 'throw', + }); + + if (!instance?.serviceUrl) { + throw new Error(`Couldn't fetch process instance ${instanceId}`); + } + + if (instance.state !== 'ERROR') { + throw new Error( + `Can't retrigger an instance on ${instance.state} state.`, + ); + } + + const isUpdateInstanceInputDataOk = + await this.orchestratorService.updateInstanceInputData({ + definitionId: instance.processId, + instanceId, + inputData, + serviceUrl: instance.serviceUrl, + cacheHandler: 'throw', + }); + + if (!isUpdateInstanceInputDataOk) { + throw new Error(`Couldn't update instance input data for ${instanceId}`); + } + + const isRetriggerInstanceInErrorOk = + await this.orchestratorService.retriggerInstanceInError({ + definitionId: instance.processId, + instanceId, + serviceUrl: instance.serviceUrl, + cacheHandler: 'throw', + }); + + if (!isRetriggerInstanceInErrorOk) { + throw new Error(`Couldn't retrigger instance in error for ${instanceId}`); + } + + return { id: instanceId }; + } + public extractQueryParam( req: express.Request, key: string, diff --git a/plugins/orchestrator-backend/src/service/router.ts b/plugins/orchestrator-backend/src/service/router.ts index c1e846e272..4ea5b2b87e 100644 --- a/plugins/orchestrator-backend/src/service/router.ts +++ b/plugins/orchestrator-backend/src/service/router.ts @@ -307,8 +307,8 @@ function setupInternalRoutes( await routerApi.v1 .executeWorkflow(req.body, workflowId, businessKey) - .then((result: any) => res.status(200).json(result)) - .catch((error: { message: any }) => { + .then(result => res.status(200).json(result)) + .catch((error: { message: string }) => { res .status(500) .json({ message: error.message || INTERNAL_SERVER_ERROR_MESSAGE }); @@ -337,6 +337,22 @@ function setupInternalRoutes( }, ); + // v1 + router.post('/instances/:instanceId/retrigger', async (req, res) => { + const { + params: { instanceId }, + } = req; + + await routerApi.v1 + .retriggerInstanceInError(instanceId, req.body) + .then(result => res.status(200).json(result)) + .catch((error: { message: string }) => { + res + .status(500) + .json({ message: error.message || INTERNAL_SERVER_ERROR_MESSAGE }); + }); + }); + // v1 router.get('/workflows/:workflowId/overview', async (req, res) => { const { @@ -496,7 +512,11 @@ function setupInternalRoutes( if (!workflowInfo.inputSchema) { res .status(500) - .send(`failed to retreive schema ${definition.dataInputSchema}`); + .send( + `failed to retreive schema ${JSON.stringify( + definition.dataInputSchema, + )}`, + ); return; } diff --git a/plugins/orchestrator-backend/src/types/apiResponse.ts b/plugins/orchestrator-backend/src/types/apiResponse.ts deleted file mode 100644 index 3e39c9b66c..0000000000 --- a/plugins/orchestrator-backend/src/types/apiResponse.ts +++ /dev/null @@ -1,31 +0,0 @@ -export interface ApiResponse { - message?: string; - result?: any; - backEndErrCd?: string; -} - -export class ApiResponseBuilder { - static SUCCESS_RESPONSE(result: any, message = 'success'): ApiResponse { - return { - result: result, - message: message, - }; - } - - static VALIDATION_ERR_RESPONSE( - backEndErrCd = 'backend validation error code', - message = 'backend validation error', - ): ApiResponse { - return { - message: message, - backEndErrCd: backEndErrCd, - }; - } - - static HTTP_ERR_RESPONSE(message = 'Internal Server Error'): ApiResponse { - return { - result: null, - message: message, - }; - } -} diff --git a/plugins/orchestrator/src/api/MockOrchestratorClient.ts b/plugins/orchestrator/src/api/MockOrchestratorClient.ts index e413d8bbd6..759185cfc4 100644 --- a/plugins/orchestrator/src/api/MockOrchestratorClient.ts +++ b/plugins/orchestrator/src/api/MockOrchestratorClient.ts @@ -16,6 +16,12 @@ import { OrchestratorApi } from './api'; export interface MockOrchestratorApiData { executeWorkflowResponse: () => ReturnType; getInstanceResponse: () => ReturnType; + retriggerInstanceInErrorResponse: () => ReturnType< + OrchestratorApi['retriggerInstanceInError'] + >; + abortWorkflowInstanceResponse: () => ReturnType< + OrchestratorApi['abortWorkflowInstance'] + >; listInstancesResponse: ReturnType; getWorkflowDefinitionResponse: ReturnType< OrchestratorApi['getWorkflowDefinition'] @@ -138,6 +144,27 @@ export class MockOrchestratorClient implements OrchestratorApi { } abortWorkflowInstance(_instanceId: string): Promise { - return Promise.resolve(undefined); + if ( + !hasOwnProp(this._mockData, 'abortWorkflowInstanceResponse') || + !isNonNullable(this._mockData.abortWorkflowInstanceResponse) + ) { + throw new Error(`[abortWorkflowInstance]: No mock data available`); + } + + return this._mockData.abortWorkflowInstanceResponse(); + } + + retriggerInstanceInError(_args: { + instanceId: string; + inputData: JsonObject; + }): Promise { + if ( + !hasOwnProp(this._mockData, 'retriggerInstanceInErrorResponse') || + !isNonNullable(this._mockData.retriggerInstanceInErrorResponse) + ) { + throw new Error(`[retriggerInstanceInError]: No mock data available`); + } + + return this._mockData.retriggerInstanceInErrorResponse(); } } diff --git a/plugins/orchestrator/src/api/OrchestratorClient.ts b/plugins/orchestrator/src/api/OrchestratorClient.ts index a42782dc7c..51d2b0fa3b 100644 --- a/plugins/orchestrator/src/api/OrchestratorClient.ts +++ b/plugins/orchestrator/src/api/OrchestratorClient.ts @@ -141,11 +141,28 @@ export class OrchestratorClient implements OrchestratorApi { } async getWorkflowOverview(workflowId: string): Promise { - const baseUrl = await this.discoveryApi.getBaseUrl('orchestrator'); + const baseUrl = await this.getBaseUrl(); const res = await fetch(`${baseUrl}/workflows/${workflowId}/overview`); if (!res.ok) { throw await ResponseError.fromResponse(res); } return res.json(); } + + async retriggerInstanceInError(args: { + instanceId: string; + inputData: JsonObject; + }): Promise { + const baseUrl = await this.getBaseUrl(); + const urlToFetch = `${baseUrl}/instances/${args.instanceId}/retrigger`; + const response = await fetch(urlToFetch, { + method: 'POST', + body: JSON.stringify(args.inputData), + headers: { 'content-type': 'application/json' }, + }); + if (!response.ok) { + throw await ResponseError.fromResponse(response); + } + return response.json(); + } } diff --git a/plugins/orchestrator/src/api/api.ts b/plugins/orchestrator/src/api/api.ts index ad3a1f971b..54a9304b10 100644 --- a/plugins/orchestrator/src/api/api.ts +++ b/plugins/orchestrator/src/api/api.ts @@ -20,6 +20,11 @@ export interface OrchestratorApi { businessKey?: string; }): Promise; + retriggerInstanceInError(args: { + instanceId: string; + inputData: JsonObject; + }): Promise; + getWorkflowDefinition(workflowId: string): Promise; getWorkflowSource(workflowId: string): Promise;