Skip to content

Commit

Permalink
Changes to address UpdateVariableFunctionality and Inprogress throwin…
Browse files Browse the repository at this point in the history
…g error issue (#100)

* Changes to address the below
1. update variables functionality,
2. Inprogress throws exception when default param value after callbackAfterSeconds

* Revert "Changes to address the below"

This reverts commit 67d56b4.

* Changes to address the below
1. update variables functionality,
2. Inprogress throws exception when default param value after callbackAfterSeconds

* Resolved lint whitespace errors

* Fixing the space issues

---------

Co-authored-by: Jithesh.Poojary <Jithesh.Poojary@TL300>
  • Loading branch information
Jithesh-poojary and Jithesh.Poojary authored Jan 23, 2024
1 parent ae86f42 commit 399ed7c
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 3 deletions.
73 changes: 73 additions & 0 deletions Conductor/Api/WorkflowResourceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public interface IWorkflowResourceApi : IApiAccessor
/// <returns>WorkflowRun</returns>
WorkflowRun ExecuteWorkflow(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null);

/// <summary>
/// Update the value of the workflow variables for the given workflow id
/// </summary>
/// <param name="workflow"></param>
/// <returns>ApiResponse of Object(void)</returns>
Object UpdateWorkflowVariables(Workflow workflow);
/// <summary>
/// Execute a workflow synchronously
/// </summary>
Expand Down Expand Up @@ -1029,6 +1035,73 @@ public ApiResponse<WorkflowRun> ExecuteWorkflowWithHttpInfo(StartWorkflowRequest
(WorkflowRun)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun)));
}

public Object UpdateWorkflowVariables(Workflow workflow)
{
ApiResponse<Object> localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflow);
return localVarResponse.Data;
}

public ApiResponse<Object> UpdateWorkflowVariablesWithHttpInfo(Workflow workflow)
{
// verify the required parameter 'body' is set
if (workflow == null)
throw new ApiException(400, "Missing required parameter 'body' when calling WorkflowResourceApi->Update");

if (string.IsNullOrEmpty(workflow.WorkflowId))
throw new ApiException(400, "Missing required parameter 'WorkflowId' when calling WorkflowResourceApi->Update");

if (workflow.Variables == null)
throw new ApiException(400, "Missing required parameter 'Variables' when calling WorkflowResourceApi->Update");

var localVarPath = $"/workflow/{workflow.WorkflowId}/variables";
var localVarPathParams = new Dictionary<String, String>();
var localVarQueryParams = new List<KeyValuePair<String, String>>();
var localVarHeaderParams = new Dictionary<String, String>(this.Configuration.DefaultHeader);
var localVarFormParams = new Dictionary<String, String>();
var localVarFileParams = new Dictionary<String, FileParameter>();
Object localVarPostBody = null;

// to determine the Content-Type header
String[] localVarHttpContentTypes = new String[] {
"application/json"
};
String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes);

// to determine the Accept header
String[] localVarHttpHeaderAccepts = new String[] {
"*/*"
};
String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts);
if (localVarHttpHeaderAccept != null)
localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept);

if (workflow != null && workflow.GetType() != typeof(byte[]))
{
localVarPostBody = this.Configuration.ApiClient.Serialize(workflow.Variables);
}

// authentication (api_key) required
if (!String.IsNullOrEmpty(this.Configuration.AccessToken))
{
localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken;
}

IRestResponse localVarResponse = (IRestResponse)this.Configuration.ApiClient.CallApi(localVarPath,
Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams,
localVarPathParams, localVarHttpContentType);
int localVarStatusCode = (int)localVarResponse.StatusCode;

if (ExceptionFactory != null)
{
Exception exception = ExceptionFactory("Update", localVarResponse);
if (exception != null) throw exception;
}

return new ApiResponse<Object>(localVarStatusCode,
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
(Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object)));
}

/// <summary>
/// Gets the workflow by workflow id
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Conductor/Client/Extensions/ConductorTaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static TaskResult InProgress(this Task task, string log = null, long? cal
{
new TaskExecLog { TaskId = task.TaskId, Log = log, CreatedTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
},
callbackAfterSeconds: callbackAfterSeconds.Value
callbackAfterSeconds: callbackAfterSeconds
);
}

Expand Down
7 changes: 7 additions & 0 deletions Conductor/Definition/ConductorWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public ConductorWorkflow WithOutputParameter(string key, object value)
return this;
}

public ConductorWorkflow WithVariable(string key, object value)
{
if (Variables == null) // if workflow does not have any variables, initialize with empty collection
Variables = new Dictionary<string, object>();
Variables.Add(key, value);
return this;
}
public ConductorWorkflow WithOwner(string ownerEmail)
{
OwnerEmail = ownerEmail;
Expand Down
97 changes: 97 additions & 0 deletions Tests/Api/WorkflowResourceApiTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using conductor.csharp.Client.Extensions;
using Conductor.Api;
using Conductor.Client.Extensions;
using Conductor.Client.Models;
using Conductor.Definition;
using Conductor.Definition.TaskType;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Tests.Worker;
using Xunit;

namespace conductor_csharp.test.Api
{
public class WorkflowResourceApiTest
{
private const string WORKFLOW_NAME = "TestToCreateVariables";
private const string TASK_NAME = "TestToCreateVariables_Task";
private const string WORKFLOW_VARIABLE_1 = "TestVariable1";
private const string WORKFLOW_VARIABLE_2 = "TestVariable2";
private const string WORKFLOW_DESC = "Test Workflow With Variables";
private const int WORKFLOW_VERSION = 1;

private readonly WorkflowResourceApi _workflowClient;
private readonly ILogger _logger;

public WorkflowResourceApiTest()
{
_workflowClient = ApiExtensions.GetClient<WorkflowResourceApi>();
_logger = ApplicationLogging.CreateLogger<WorkerTests>();
}

[Fact]
public async void UpdateWorkflowVariables()
{
// Prepare workflow
var _workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(_workflow, true);
var workflowId = ApiExtensions.GetWorkflowExecutor().StartWorkflow(_workflow);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ValidateWorkflowCompletion(workflowId);

// Create variables collection with values to be updated
var updateDict = new Dictionary<string, object> {
{WORKFLOW_VARIABLE_1,"Value1" },
{WORKFLOW_VARIABLE_2,"Value2" },
};
var updateVariableData = new Workflow() { WorkflowId = workflowId, Variables = updateDict };
// Update the work flow variables
_workflowClient.UpdateWorkflowVariables(updateVariableData);
// Fetch latest workflow data to validate the change in variables
var _updatedWorkFlow = _workflowClient.GetWorkflowStatusSummary(workflowId, includeVariables: true);
// Verify workflow variables data is equal with input passed
Assert.Equal(_updatedWorkFlow.Variables, updateDict);
}

private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout)
{
var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, new ClassWorker());
await host.StartAsync();
Thread.Sleep(workflowCompletionTimeout);
await host.StopAsync();
}

private ConductorWorkflow GetConductorWorkflow()
{
return new ConductorWorkflow()
.WithName(WORKFLOW_NAME)
.WithVersion(WORKFLOW_VERSION)
.WithDescription(WORKFLOW_DESC)
.WithTask(new SimpleTask(TASK_NAME, TASK_NAME))
.WithVariable(WORKFLOW_VARIABLE_1, $"{WORKFLOW_VARIABLE_1}_Value")
.WithVariable(WORKFLOW_VARIABLE_2, $"{WORKFLOW_VARIABLE_2}_Value");
}

private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params string[] workflowIdList)
{
var workflowStatusList = await WorkflowExtensions.GetWorkflowStatusList(
_workflowClient,
maxAllowedInParallel: 10,
workflowIdList
);
var incompleteWorkflowCounter = 0;
workflowStatusList.ToList().ForEach(wf =>
{
if (wf.Status.Value != WorkflowStatus.StatusEnum.COMPLETED)
{
incompleteWorkflowCounter += 1;
_logger.LogInformation($"Workflow not completed, workflowId: {wf.WorkflowId}");
}
});
Assert.Equal(0, incompleteWorkflowCounter);
}
}
}
4 changes: 2 additions & 2 deletions csharp-examples/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public async void StartTasks()
}
}

while (true)
Thread.Sleep(TimeSpan.FromDays(1)); // after 1 year will stop the service
while (true)
Thread.Sleep(TimeSpan.FromDays(1));// after 1 year will stop the service

}
catch (Exception e)
Expand Down
77 changes: 77 additions & 0 deletions csharp-examples/WorkFlowExamples.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using Conductor.Client;
using Conductor.Definition.TaskType;
using Conductor.Definition;
using Conductor.Executor;
using Conductor.Api;
using Conductor.Client.Authentication;

namespace csharp_examples
{
public class WorkFlowExamples
{

private const string KEY_ID = "<REPLACE_WITH_KEY_ID>";
private const string KEY_SECRET = "<REPLACE_WITH_KEY_SECRET>";
private const string OWNER_EMAIL = "<REPLACE_WITH_OWNER_EMAIL>";

private const string WORKFLOW_ID = "<REPLACE_WITH_WORKFLOW_ID>";
private const string WORKFLOW_NAME = "<REPLACE_WITH_WORKFLOW_NAME>";
private const string WORKFLOW_DESCRIPTION = "<REPLACE_WITH_WORKFLOW_DESCRIPTION>";
private const string TASK_NAME = "<REPLACE_WITH_TASK_NAME >";
private const string TASK_REFERENCE = "<REPLACE_WITH_TASK_REFERENCE_NAME>";

private const string VARIABLE_OLD_VALUE = "SOME_OLD_VALUE";
private const string VARIABLE_NAME_1 = "<REPLACE_WITH_VARIABLE_NAME_1>";
private const string VARIABLE_NEW_VALUE_1 = "<REPLACE_WITH_OWNER_VALUE_1>";
private const string VARIABLE_NAME_2 = "<REPLACE_WITH_VARIABLE_NAME_2>";
private const string VARIABLE_NEW_VALUE_2 = "<REPLACE_WITH_OWNER_VALUE_2>";


public void RegisterWorkFlow()
{
Configuration configuration = new Configuration()
{
AuthenticationSettings = new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET)
};

WorkflowExecutor executor = new WorkflowExecutor(configuration);
executor.RegisterWorkflow(GetConductorWorkflow(), true);
}

private ConductorWorkflow GetConductorWorkflow()
{
var conductorWorkFlow = new ConductorWorkflow()
.WithName(WORKFLOW_NAME).WithDescription(WORKFLOW_DESCRIPTION)
.WithTask(new SimpleTask(TASK_NAME, TASK_REFERENCE))
.WithOwner(OWNER_EMAIL);

var workflowVariableTobeAdded = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_OLD_VALUE},
{ VARIABLE_NAME_2, VARIABLE_OLD_VALUE }
};

conductorWorkFlow.Variables = workflowVariableTobeAdded;
return conductorWorkFlow;
}

public void UpdateWorkflowVariablesWithWorkFlowId()
{
var orkesApiClient = new OrkesApiClient(new Configuration(),
new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET));
var workflowClient = orkesApiClient.GetClient<WorkflowResourceApi>();
var workFlowVariables = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_NEW_VALUE_1 },
{ VARIABLE_NAME_2, VARIABLE_NEW_VALUE_2 }
};

workflowClient.UpdateWorkflowVariables(new Conductor.Client.Models.Workflow()
{
WorkflowId = WORKFLOW_ID,
Variables = workFlowVariables
});
}

}
}

0 comments on commit 399ed7c

Please sign in to comment.