Skip to content

Commit

Permalink
Changes to address the below
Browse files Browse the repository at this point in the history
1. update variables functionality,
2. Inprogress throws exception when default param value after callbackAfterSeconds
  • Loading branch information
Jithesh.Poojary authored and Jithesh.Poojary committed Jan 10, 2024
1 parent a09fdb2 commit 2ae6a33
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 25 deletions.
119 changes: 96 additions & 23 deletions Conductor/Api/WorkflowResourceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,26 @@ public interface IWorkflowResourceApi : IApiAccessor
/// <returns>WorkflowRun</returns>
WorkflowRun ExecuteWorkflow(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null);

/// <summary>
/// Execute a workflow synchronously
/// </summary>
/// <remarks>
///
/// </remarks>
/// <exception cref="Conductor.Client.ApiException">Thrown when fails to make API call</exception>
/// <param name="body"></param>
/// <param name="requestId"></param>
/// <param name="name"></param>
/// <param name="version"></param>
/// <param name="waitUntilTaskRef"> (optional)</param>
/// <returns>ApiResponse of WorkflowRun</returns>
ApiResponse<WorkflowRun> ExecuteWorkflowWithHttpInfo(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null);
/// <summary>
/// Update the value of the workflow variables for the given workflow id
/// </summary>
/// <param name="workflow"></param>
/// <returns>ApiResponse of Object(void)</returns>
Object UpdateWorkflowVariables(Workflow workflow);
/// <summary>
/// Execute a workflow synchronously
/// </summary>
/// <remarks>
///
/// </remarks>
/// <exception cref="Conductor.Client.ApiException">Thrown when fails to make API call</exception>
/// <param name="body"></param>
/// <param name="requestId"></param>
/// <param name="name"></param>
/// <param name="version"></param>
/// <param name="waitUntilTaskRef"> (optional)</param>
/// <returns>ApiResponse of WorkflowRun</returns>
ApiResponse<WorkflowRun> ExecuteWorkflowWithHttpInfo(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null);
/// <summary>
/// Gets the workflow by workflow id
/// </summary>
Expand Down Expand Up @@ -1029,15 +1035,82 @@ public ApiResponse<WorkflowRun> ExecuteWorkflowWithHttpInfo(StartWorkflowRequest
(WorkflowRun)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun)));
}

/// <summary>
/// Gets the workflow by workflow id
/// </summary>
/// <exception cref="Conductor.Client.ApiException">Thrown when fails to make API call</exception>
/// <param name="workflowId"></param>
/// <param name="includeTasks"> (optional, default to true)</param>
/// <param name="summarize"> (optional, default to false)</param>
/// <returns>Workflow</returns>
public Workflow GetExecutionStatus(string workflowId, bool? includeTasks = null, bool? summarize = null)
public Object UpdateWorkflowVariables(Workflow workflow)
{
ApiResponse<Object> localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflow);
return localVarResponse.Data;
}

public ApiResponse<Object> UpdateWorkflowVariablesWithHttpInfo(Workflow workflow)
{
// verify the required parameter 'body' is set
if (workflow == null)
throw new ApiException(400, "Missing required parameter 'body' when calling WorkflowResourceApi->Update");

if (string.IsNullOrEmpty(workflow.WorkflowId))
throw new ApiException(400, "Missing required parameter 'WorkflowId' when calling WorkflowResourceApi->Update");

if (workflow.Variables == null)
throw new ApiException(400, "Missing required parameter 'Variables' when calling WorkflowResourceApi->Update");

var localVarPath = $"/workflow/{workflow.WorkflowId}/variables";
var localVarPathParams = new Dictionary<String, String>();
var localVarQueryParams = new List<KeyValuePair<String, String>>();
var localVarHeaderParams = new Dictionary<String, String>(this.Configuration.DefaultHeader);
var localVarFormParams = new Dictionary<String, String>();
var localVarFileParams = new Dictionary<String, FileParameter>();
Object localVarPostBody = null;

// to determine the Content-Type header
String[] localVarHttpContentTypes = new String[] {
"application/json"
};
String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes);

// to determine the Accept header
String[] localVarHttpHeaderAccepts = new String[] {
"*/*"
};
String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts);
if (localVarHttpHeaderAccept != null)
localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept);

if (workflow != null && workflow.GetType() != typeof(byte[]))
{
localVarPostBody = this.Configuration.ApiClient.Serialize(workflow.Variables);
}

// authentication (api_key) required
if (!String.IsNullOrEmpty(this.Configuration.AccessToken))
{
localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken;
}

IRestResponse localVarResponse = (IRestResponse)this.Configuration.ApiClient.CallApi(localVarPath,
Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams,
localVarPathParams, localVarHttpContentType);
int localVarStatusCode = (int)localVarResponse.StatusCode;

if (ExceptionFactory != null)
{
Exception exception = ExceptionFactory("Update", localVarResponse);
if (exception != null) throw exception;
}

return new ApiResponse<Object>(localVarStatusCode,
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
(Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object)));
}

/// <summary>
/// Gets the workflow by workflow id
/// </summary>
/// <exception cref="Conductor.Client.ApiException">Thrown when fails to make API call</exception>
/// <param name="workflowId"></param>
/// <param name="includeTasks"> (optional, default to true)</param>
/// <param name="summarize"> (optional, default to false)</param>
/// <returns>Workflow</returns>
public Workflow GetExecutionStatus(string workflowId, bool? includeTasks = null, bool? summarize = null)
{
ApiResponse<Workflow> localVarResponse = GetExecutionStatusWithHttpInfo(workflowId, includeTasks, summarize);
return localVarResponse.Data;
Expand Down
2 changes: 1 addition & 1 deletion Conductor/Client/Extensions/ConductorTaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static TaskResult InProgress(this Task task, string log = null, long? cal
{
new TaskExecLog { TaskId = task.TaskId, Log = log, CreatedTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
},
callbackAfterSeconds: callbackAfterSeconds.Value
callbackAfterSeconds: callbackAfterSeconds
);
}

Expand Down
9 changes: 8 additions & 1 deletion Conductor/Definition/ConductorWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public ConductorWorkflow WithOutputParameter(string key, object value)
return this;
}

public ConductorWorkflow WithOwner(string ownerEmail)
public ConductorWorkflow WithVariable(string key, object value)
{
if (Variables == null) // if workflow does not have any variables, initialize with empty collection
Variables = new Dictionary<string, object>();
Variables.Add(key, value);
return this;
}
public ConductorWorkflow WithOwner(string ownerEmail)
{
OwnerEmail = ownerEmail;
return this;
Expand Down
97 changes: 97 additions & 0 deletions Tests/Api/WorkflowResourceApiTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using conductor.csharp.Client.Extensions;
using Conductor.Api;
using Conductor.Client.Extensions;
using Conductor.Client.Models;
using Conductor.Definition;
using Conductor.Definition.TaskType;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Tests.Worker;
using Xunit;

namespace conductor_csharp.test.Api
{
public class WorkflowResourceApiTest
{
private const string WORKFLOW_NAME = "TestToCreateVariables";
private const string TASK_NAME = "TestToCreateVariables_Task";
private const string WORKFLOW_VARIABLE_1 = "TestVariable1";
private const string WORKFLOW_VARIABLE_2 = "TestVariable2";
private const string WORKFLOW_DESC = "Test Workflow With Variables";
private const int WORKFLOW_VERSION = 1;

private readonly WorkflowResourceApi _workflowClient;
private readonly ILogger _logger;

public WorkflowResourceApiTest()
{
_workflowClient = ApiExtensions.GetClient<WorkflowResourceApi>();
_logger = ApplicationLogging.CreateLogger<WorkerTests>();
}

[Fact]
public async void UpdateWorkflowVariables()
{
// Prepare workflow
var _workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(_workflow, true);
var workflowId = ApiExtensions.GetWorkflowExecutor().StartWorkflow(_workflow);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ValidateWorkflowCompletion(workflowId);

// Create variables collection with values to be updated
var updateDict = new Dictionary<string, object> {
{WORKFLOW_VARIABLE_1,"Value1" },
{WORKFLOW_VARIABLE_2,"Value2" },
};
var updateVariableData = new Workflow() { WorkflowId = workflowId, Variables = updateDict };
// Update the work flow variables
_workflowClient.UpdateWorkflowVariables(updateVariableData);
// Fetch latest workflow data to validate the change in variables
var _updatedWorkFlow = _workflowClient.GetWorkflowStatusSummary(workflowId, includeVariables: true);
// Verify workflow variables data is equal with input passed
Assert.Equal(_updatedWorkFlow.Variables, updateDict);
}

private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout)
{
var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, new ClassWorker());
await host.StartAsync();
Thread.Sleep(workflowCompletionTimeout);
await host.StopAsync();
}

private ConductorWorkflow GetConductorWorkflow()
{
return new ConductorWorkflow()
.WithName(WORKFLOW_NAME)
.WithVersion(WORKFLOW_VERSION)
.WithDescription(WORKFLOW_DESC)
.WithTask(new SimpleTask(TASK_NAME, TASK_NAME))
.WithVariable(WORKFLOW_VARIABLE_1, $"{WORKFLOW_VARIABLE_1}_Value")
.WithVariable(WORKFLOW_VARIABLE_2, $"{WORKFLOW_VARIABLE_2}_Value");
}

private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params string[] workflowIdList)
{
var workflowStatusList = await WorkflowExtensions.GetWorkflowStatusList(
_workflowClient,
maxAllowedInParallel: 10,
workflowIdList
);
var incompleteWorkflowCounter = 0;
workflowStatusList.ToList().ForEach(wf =>
{
if (wf.Status.Value != WorkflowStatus.StatusEnum.COMPLETED)
{
incompleteWorkflowCounter += 1;
_logger.LogInformation($"Workflow not completed, workflowId: {wf.WorkflowId}");
}
});
Assert.Equal(0, incompleteWorkflowCounter);
}
}
}
77 changes: 77 additions & 0 deletions csharp-examples/WorkFlowExamples.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using Conductor.Client;
using Conductor.Definition.TaskType;
using Conductor.Definition;
using Conductor.Executor;
using Conductor.Api;
using Conductor.Client.Authentication;

namespace csharp_examples
{
public class WorkFlowExamples
{

private const string KEY_ID = "<REPLACE_WITH_KEY_ID>";
private const string KEY_SECRET = "<REPLACE_WITH_KEY_SECRET>";
private const string OWNER_EMAIL = "<REPLACE_WITH_OWNER_EMAIL>";

private const string WORKFLOW_ID = "<REPLACE_WITH_WORKFLOW_ID>";
private const string WORKFLOW_NAME = "<REPLACE_WITH_WORKFLOW_NAME>";
private const string WORKFLOW_DESCRIPTION = "<REPLACE_WITH_WORKFLOW_DESCRIPTION>";
private const string TASK_NAME = "<REPLACE_WITH_TASK_NAME >";
private const string TASK_REFERENCE = "<REPLACE_WITH_TASK_REFERENCE_NAME>";

private const string VARIABLE_OLD_VALUE = "SOME_OLD_VALUE";
private const string VARIABLE_NAME_1 = "<REPLACE_WITH_VARIABLE_NAME_1>";
private const string VARIABLE_NEW_VALUE_1 = "<REPLACE_WITH_OWNER_VALUE_1>";
private const string VARIABLE_NAME_2 = "<REPLACE_WITH_VARIABLE_NAME_2>";
private const string VARIABLE_NEW_VALUE_2 = "<REPLACE_WITH_OWNER_VALUE_2>";


public void RegisterWorkFlow()
{
Configuration configuration = new Configuration()
{
AuthenticationSettings = new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET)
};

WorkflowExecutor executor = new WorkflowExecutor(configuration);
executor.RegisterWorkflow(GetConductorWorkflow(), true);
}

private ConductorWorkflow GetConductorWorkflow()
{
var conductorWorkFlow = new ConductorWorkflow()
.WithName(WORKFLOW_NAME).WithDescription(WORKFLOW_DESCRIPTION)
.WithTask(new SimpleTask(TASK_NAME, TASK_REFERENCE))
.WithOwner(OWNER_EMAIL);

var workflowVariableTobeAdded = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_OLD_VALUE},
{ VARIABLE_NAME_2, VARIABLE_OLD_VALUE }
};

conductorWorkFlow.Variables = workflowVariableTobeAdded;
return conductorWorkFlow;
}

public void UpdateWorkflowVariablesWithWorkFlowId()
{
var orkesApiClient = new OrkesApiClient(new Configuration(),
new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET));
var workflowClient = orkesApiClient.GetClient<WorkflowResourceApi>();
var workFlowVariables = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_NEW_VALUE_1 },
{ VARIABLE_NAME_2, VARIABLE_NEW_VALUE_2 }
};

workflowClient.UpdateWorkflowVariables(new Conductor.Client.Models.Workflow()
{
WorkflowId = WORKFLOW_ID,
Variables = workFlowVariables
});
}

}
}

0 comments on commit 2ae6a33

Please sign in to comment.