Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into MINOR-Logical-testS…
Browse files Browse the repository at this point in the history
…uite-agg-update
  • Loading branch information
TeddyCr committed Dec 9, 2024
2 parents d99c852 + 5263858 commit 781726b
Show file tree
Hide file tree
Showing 18 changed files with 305 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.openmetadata.csv.CsvUtil.fieldToExtensionStrings;
import static org.openmetadata.csv.CsvUtil.fieldToInternalArray;
import static org.openmetadata.csv.CsvUtil.recordToString;
import static org.openmetadata.service.events.ChangeEventHandler.copyChangeEvent;

import com.fasterxml.jackson.databind.JsonNode;
import com.networknt.schema.JsonSchema;
Expand Down Expand Up @@ -60,7 +61,9 @@
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ApiStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TagLabel.TagSource;
Expand All @@ -72,7 +75,9 @@
import org.openmetadata.schema.type.customProperties.TableConfig;
import org.openmetadata.service.Entity;
import org.openmetadata.service.TypeRegistry;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil.PutResponse;
Expand Down Expand Up @@ -721,6 +726,9 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
repository.prepareInternal(entity, false);
PutResponse<T> response = repository.createOrUpdate(null, entity);
responseStatus = response.getStatus();
AsyncService.getInstance()
.getExecutorService()
.submit(() -> createChangeEventAndUpdateInES(response, importedBy));
} catch (Exception ex) {
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
importResult.setStatus(ApiStatus.FAILURE);
Expand All @@ -744,6 +752,20 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
}
}

private void createChangeEventAndUpdateInES(PutResponse<T> response, String importedBy) {
if (!response.getChangeType().equals(EventType.ENTITY_NO_CHANGE)) {
ChangeEvent changeEvent =
FormatterUtil.createChangeEventForEntity(
importedBy, response.getChangeType(), response.getEntity());
Object entity = changeEvent.getEntity();
changeEvent = copyChangeEvent(changeEvent);
changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entity));
// Change Event and Update in Es
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
Entity.getSearchRepository().updateEntity(response.getEntity().getEntityReference());
}
}

@Transaction
protected void createUserEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T entity)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Void process(
return null;
}

private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) {
public static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) {
return new ChangeEvent()
.withId(changeEvent.getId())
.withEventType(changeEvent.getEventType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private static ChangeEvent extractChangeEvent(
return null;
}

private static ChangeEvent createChangeEventForEntity(
public static ChangeEvent createChangeEventForEntity(
String updateBy, EventType eventType, EntityInterface entityInterface) {
return getChangeEvent(
updateBy, eventType, entityInterface.getEntityReference().getType(), entityInterface)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

import java.util.UUID;
Expand All @@ -9,21 +8,12 @@
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;

@Slf4j
public class MainWorkflowTerminationListener implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
try {
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);

UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
workflowInstanceStateRepository.updateStage(
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());

WorkflowInstanceRepository workflowInstanceRepository =
(WorkflowInstanceRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ public CheckEntityAttributesTask(CheckEntityAttributesTaskDefinition nodeDefinit
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), checkEntityAttributes.getId()));
subProcess.addFlowElement(new SequenceFlow(checkEntityAttributes.getId(), endEvent.getId()));

attachWorkflowInstanceStageListeners(subProcess);

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public SetEntityCertificationTask(SetEntityCertificationTaskDefinition nodeDefin
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setEntityCertification.getId()));
subProcess.addFlowElement(new SequenceFlow(setEntityCertification.getId(), endEvent.getId()));

attachWorkflowInstanceStageListeners(subProcess);

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public SetGlossaryTermStatusTask(SetGlossaryTermStatusTaskDefinition nodeDefinit
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setGlossaryTermStatus.getId()));
subProcess.addFlowElement(new SequenceFlow(setGlossaryTermStatus.getId(), endEvent.getId()));

attachWorkflowInstanceStageListeners(subProcess);

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ public class EndEvent implements NodeInterface {

public EndEvent(String id) {
this.endEvent = new EndEventBuilder().id(id).build();
attachWorkflowInstanceStageListeners(endEvent);
}

public EndEvent(EndEventDefinition nodeDefinition) {
this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceStageListeners(endEvent);
}

public void addToWorkflow(BpmnModel model, Process process) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class StartEvent implements NodeInterface {
public StartEvent(StartEventDefinition nodeDefinition) {
this.startEvent = new StartEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceExecutionIdSetterListener(startEvent);
attachWorkflowInstanceStageListeners(startEvent);
}

public void addToWorkflow(BpmnModel model, Process process) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public UserApprovalTask(UserApprovalTaskDefinition nodeDefinition) {
subProcess.addFlowElement(new SequenceFlow(userTask.getId(), endEvent.getId()));
subProcess.addFlowElement(new SequenceFlow(terminationEvent.getId(), terminatedEvent.getId()));

attachWorkflowInstanceStageListeners(subProcess);

this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

Expand All @@ -27,7 +26,6 @@
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
import org.openmetadata.service.resources.feeds.FeedResource;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.WebsocketNotificationHandler;
Expand All @@ -45,13 +43,6 @@ public void notify(DelegateTask delegateTask) {

Thread task = createApprovalTask(entity, assignees);
WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId());

UUID workflowInstanceStateId =
(UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId);
} catch (Exception exc) {
LOG.error(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.openmetadata.schema.governance.workflows.WorkflowInstance;
import org.openmetadata.service.Entity;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void updateWorkflowInstance(

workflowInstance.setEndedAt(endedAt);

if (variables.containsKey(EXCEPTION_VARIABLE)) {
if (Optional.ofNullable(variables.getOrDefault(EXCEPTION_VARIABLE, null)).isPresent()) {
workflowInstance.setException(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,16 @@ public SearchResultListMapper listWithDeepPagination(
new os.org.opensearch.action.search.SearchRequest(index).source(searchSourceBuilder),
RequestOptions.DEFAULT);
SearchHits searchHits = response.getHits();
SearchHit[] hits = searchHits.getHits();
Arrays.stream(hits).forEach(hit -> results.add(hit.getSourceAsMap()));
return new SearchResultListMapper(results, searchHits.getTotalHits().value);
List<SearchHit> hits = List.of(searchHits.getHits());
Object[] lastHitSortValues = null;

if (!hits.isEmpty()) {
lastHitSortValues = hits.get(hits.size() - 1).getSortValues();
}

hits.forEach(hit -> results.add(hit.getSourceAsMap()));
return new SearchResultListMapper(
results, searchHits.getTotalHits().value, lastHitSortValues);
} catch (OpenSearchStatusException e) {
if (e.status() == RestStatus.NOT_FOUND) {
throw new SearchIndexNotFoundException(String.format("Failed to to find index %s", index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static class PutResponse<T> {
@Getter private T entity;
private ChangeEvent changeEvent;
@Getter private final Response.Status status;
private final EventType changeType;
@Getter private final EventType changeType;

/**
* Response.Status.CREATED when PUT operation creates a new entity or Response.Status.OK when PUT operation updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,25 @@ void testGlossaryImportExport() throws IOException {
List<String> newRecords =
listOf(
",g3,dsp0,dsc0,h1;h2;h3,,term0;http://term0,PII.Sensitive,,,Approved,\"\"\"glossaryTermTableCol1Cp:row_1_col1_Value,,\"\";\"\"glossaryTermTableCol3Cp:row_1_col1_Value,row_1_col2_Value,row_1_col3_Value|row_2_col1_Value,row_2_col2_Value,row_2_col3_Value\"\"\"");
testImportExport(
glossary.getName(), GlossaryCsv.HEADERS, createRecords, updateRecords, newRecords);
Awaitility.await()
.atMost(Duration.ofMillis(120 * 1000L))
.pollInterval(Duration.ofMillis(2000L))
.ignoreExceptions()
.until(
() -> {
try {
testImportExport(
glossary.getName(),
GlossaryCsv.HEADERS,
createRecords,
updateRecords,
newRecords);
return true;
} catch (Exception e) {
// Return false to retry
return false;
}
});
}

@Test
Expand Down
Loading

0 comments on commit 781726b

Please sign in to comment.