diff --git a/Conductor/Api/WorkflowResourceApi.cs b/Conductor/Api/WorkflowResourceApi.cs index f0969e27..409632af 100644 --- a/Conductor/Api/WorkflowResourceApi.cs +++ b/Conductor/Api/WorkflowResourceApi.cs @@ -72,6 +72,12 @@ public interface IWorkflowResourceApi : IApiAccessor /// WorkflowRun WorkflowRun ExecuteWorkflow(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null); + /// + /// Update the value of the workflow variables for the given workflow id + /// + /// + /// ApiResponse of Object(void) + Object UpdateWorkflowVariables(Workflow workflow); /// /// Execute a workflow synchronously /// @@ -1029,6 +1035,73 @@ public ApiResponse ExecuteWorkflowWithHttpInfo(StartWorkflowRequest (WorkflowRun)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun))); } + public Object UpdateWorkflowVariables(Workflow workflow) + { + ApiResponse localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflow); + return localVarResponse.Data; + } + + public ApiResponse UpdateWorkflowVariablesWithHttpInfo(Workflow workflow) + { + // verify the required parameter 'body' is set + if (workflow == null) + throw new ApiException(400, "Missing required parameter 'body' when calling WorkflowResourceApi->Update"); + + if (string.IsNullOrEmpty(workflow.WorkflowId)) + throw new ApiException(400, "Missing required parameter 'WorkflowId' when calling WorkflowResourceApi->Update"); + + if (workflow.Variables == null) + throw new ApiException(400, "Missing required parameter 'Variables' when calling WorkflowResourceApi->Update"); + + var localVarPath = $"/workflow/{workflow.WorkflowId}/variables"; + var localVarPathParams = new Dictionary(); + var localVarQueryParams = new List>(); + var localVarHeaderParams = new Dictionary(this.Configuration.DefaultHeader); + var localVarFormParams = new Dictionary(); + var localVarFileParams = new Dictionary(); + Object localVarPostBody = null; + + // to determine the Content-Type header + String[] localVarHttpContentTypes = new String[] { + "application/json" + }; + String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes); + + // to determine the Accept header + String[] localVarHttpHeaderAccepts = new String[] { + "*/*" + }; + String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts); + if (localVarHttpHeaderAccept != null) + localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept); + + if (workflow != null && workflow.GetType() != typeof(byte[])) + { + localVarPostBody = this.Configuration.ApiClient.Serialize(workflow.Variables); + } + + // authentication (api_key) required + if (!String.IsNullOrEmpty(this.Configuration.AccessToken)) + { + localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken; + } + + IRestResponse localVarResponse = (IRestResponse)this.Configuration.ApiClient.CallApi(localVarPath, + Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams, + localVarPathParams, localVarHttpContentType); + int localVarStatusCode = (int)localVarResponse.StatusCode; + + if (ExceptionFactory != null) + { + Exception exception = ExceptionFactory("Update", localVarResponse); + if (exception != null) throw exception; + } + + return new ApiResponse(localVarStatusCode, + localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)), + (Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object))); + } + /// /// Gets the workflow by workflow id /// diff --git a/Conductor/Client/Extensions/ConductorTaskExtensions.cs b/Conductor/Client/Extensions/ConductorTaskExtensions.cs index b56968e9..b7303ec6 100644 --- a/Conductor/Client/Extensions/ConductorTaskExtensions.cs +++ b/Conductor/Client/Extensions/ConductorTaskExtensions.cs @@ -18,7 +18,7 @@ public static TaskResult InProgress(this Task task, string log = null, long? cal { new TaskExecLog { TaskId = task.TaskId, Log = log, CreatedTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, - callbackAfterSeconds: callbackAfterSeconds.Value + callbackAfterSeconds: callbackAfterSeconds ); } diff --git a/Conductor/Definition/ConductorWorkflow.cs b/Conductor/Definition/ConductorWorkflow.cs index f7d44657..e22a01a9 100644 --- a/Conductor/Definition/ConductorWorkflow.cs +++ b/Conductor/Definition/ConductorWorkflow.cs @@ -62,6 +62,13 @@ public ConductorWorkflow WithOutputParameter(string key, object value) return this; } + public ConductorWorkflow WithVariable(string key, object value) + { + if (Variables == null) // if workflow does not have any variables, initialize with empty collection + Variables = new Dictionary(); + Variables.Add(key, value); + return this; + } public ConductorWorkflow WithOwner(string ownerEmail) { OwnerEmail = ownerEmail; diff --git a/Tests/Api/WorkflowResourceApiTest.cs b/Tests/Api/WorkflowResourceApiTest.cs new file mode 100644 index 00000000..343c3bc7 --- /dev/null +++ b/Tests/Api/WorkflowResourceApiTest.cs @@ -0,0 +1,97 @@ +using conductor.csharp.Client.Extensions; +using Conductor.Api; +using Conductor.Client.Extensions; +using Conductor.Client.Models; +using Conductor.Definition; +using Conductor.Definition.TaskType; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Tests.Worker; +using Xunit; + +namespace conductor_csharp.test.Api +{ + public class WorkflowResourceApiTest + { + private const string WORKFLOW_NAME = "TestToCreateVariables"; + private const string TASK_NAME = "TestToCreateVariables_Task"; + private const string WORKFLOW_VARIABLE_1 = "TestVariable1"; + private const string WORKFLOW_VARIABLE_2 = "TestVariable2"; + private const string WORKFLOW_DESC = "Test Workflow With Variables"; + private const int WORKFLOW_VERSION = 1; + + private readonly WorkflowResourceApi _workflowClient; + private readonly ILogger _logger; + + public WorkflowResourceApiTest() + { + _workflowClient = ApiExtensions.GetClient(); + _logger = ApplicationLogging.CreateLogger(); + } + + [Fact] + public async void UpdateWorkflowVariables() + { + // Prepare workflow + var _workflow = GetConductorWorkflow(); + ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(_workflow, true); + var workflowId = ApiExtensions.GetWorkflowExecutor().StartWorkflow(_workflow); + await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20)); + await ValidateWorkflowCompletion(workflowId); + + // Create variables collection with values to be updated + var updateDict = new Dictionary { + {WORKFLOW_VARIABLE_1,"Value1" }, + {WORKFLOW_VARIABLE_2,"Value2" }, + }; + var updateVariableData = new Workflow() { WorkflowId = workflowId, Variables = updateDict }; + // Update the work flow variables + _workflowClient.UpdateWorkflowVariables(updateVariableData); + // Fetch latest workflow data to validate the change in variables + var _updatedWorkFlow = _workflowClient.GetWorkflowStatusSummary(workflowId, includeVariables: true); + // Verify workflow variables data is equal with input passed + Assert.Equal(_updatedWorkFlow.Variables, updateDict); + } + + private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) + { + var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, new ClassWorker()); + await host.StartAsync(); + Thread.Sleep(workflowCompletionTimeout); + await host.StopAsync(); + } + + private ConductorWorkflow GetConductorWorkflow() + { + return new ConductorWorkflow() + .WithName(WORKFLOW_NAME) + .WithVersion(WORKFLOW_VERSION) + .WithDescription(WORKFLOW_DESC) + .WithTask(new SimpleTask(TASK_NAME, TASK_NAME)) + .WithVariable(WORKFLOW_VARIABLE_1, $"{WORKFLOW_VARIABLE_1}_Value") + .WithVariable(WORKFLOW_VARIABLE_2, $"{WORKFLOW_VARIABLE_2}_Value"); + } + + private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params string[] workflowIdList) + { + var workflowStatusList = await WorkflowExtensions.GetWorkflowStatusList( + _workflowClient, + maxAllowedInParallel: 10, + workflowIdList + ); + var incompleteWorkflowCounter = 0; + workflowStatusList.ToList().ForEach(wf => + { + if (wf.Status.Value != WorkflowStatus.StatusEnum.COMPLETED) + { + incompleteWorkflowCounter += 1; + _logger.LogInformation($"Workflow not completed, workflowId: {wf.WorkflowId}"); + } + }); + Assert.Equal(0, incompleteWorkflowCounter); + } + } +} diff --git a/csharp-examples/Runner.cs b/csharp-examples/Runner.cs index 02134d01..24123c5c 100644 --- a/csharp-examples/Runner.cs +++ b/csharp-examples/Runner.cs @@ -50,8 +50,8 @@ public async void StartTasks() } } - while (true) - Thread.Sleep(TimeSpan.FromDays(1)); // after 1 year will stop the service + while (true) + Thread.Sleep(TimeSpan.FromDays(1));// after 1 year will stop the service } catch (Exception e) diff --git a/csharp-examples/WorkFlowExamples.cs b/csharp-examples/WorkFlowExamples.cs new file mode 100644 index 00000000..5922836e --- /dev/null +++ b/csharp-examples/WorkFlowExamples.cs @@ -0,0 +1,77 @@ +using Conductor.Client; +using Conductor.Definition.TaskType; +using Conductor.Definition; +using Conductor.Executor; +using Conductor.Api; +using Conductor.Client.Authentication; + +namespace csharp_examples +{ + public class WorkFlowExamples + { + + private const string KEY_ID = ""; + private const string KEY_SECRET = ""; + private const string OWNER_EMAIL = ""; + + private const string WORKFLOW_ID = ""; + private const string WORKFLOW_NAME = ""; + private const string WORKFLOW_DESCRIPTION = ""; + private const string TASK_NAME = ""; + private const string TASK_REFERENCE = ""; + + private const string VARIABLE_OLD_VALUE = "SOME_OLD_VALUE"; + private const string VARIABLE_NAME_1 = ""; + private const string VARIABLE_NEW_VALUE_1 = ""; + private const string VARIABLE_NAME_2 = ""; + private const string VARIABLE_NEW_VALUE_2 = ""; + + + public void RegisterWorkFlow() + { + Configuration configuration = new Configuration() + { + AuthenticationSettings = new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET) + }; + + WorkflowExecutor executor = new WorkflowExecutor(configuration); + executor.RegisterWorkflow(GetConductorWorkflow(), true); + } + + private ConductorWorkflow GetConductorWorkflow() + { + var conductorWorkFlow = new ConductorWorkflow() + .WithName(WORKFLOW_NAME).WithDescription(WORKFLOW_DESCRIPTION) + .WithTask(new SimpleTask(TASK_NAME, TASK_REFERENCE)) + .WithOwner(OWNER_EMAIL); + + var workflowVariableTobeAdded = new Dictionary + { + { VARIABLE_NAME_1, VARIABLE_OLD_VALUE}, + { VARIABLE_NAME_2, VARIABLE_OLD_VALUE } + }; + + conductorWorkFlow.Variables = workflowVariableTobeAdded; + return conductorWorkFlow; + } + + public void UpdateWorkflowVariablesWithWorkFlowId() + { + var orkesApiClient = new OrkesApiClient(new Configuration(), + new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET)); + var workflowClient = orkesApiClient.GetClient(); + var workFlowVariables = new Dictionary + { + { VARIABLE_NAME_1, VARIABLE_NEW_VALUE_1 }, + { VARIABLE_NAME_2, VARIABLE_NEW_VALUE_2 } + }; + + workflowClient.UpdateWorkflowVariables(new Conductor.Client.Models.Workflow() + { + WorkflowId = WORKFLOW_ID, + Variables = workFlowVariables + }); + } + + } +}