Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR - Improve Data Quality Results List from Search #18014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1ec2981
fix import issue
chirag-madlani Sep 16, 2024
178ea6a
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 17, 2024
044fd6b
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 17, 2024
c5d2bd5
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 18, 2024
24435e7
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 18, 2024
d8b83f8
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 19, 2024
ed4d355
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 20, 2024
d6f01a1
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 23, 2024
57d7c7f
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 24, 2024
4e5943d
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 24, 2024
ae13863
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 25, 2024
fa71f32
Merge remote-tracking branch 'upstream/main'
TeddyCr Sep 25, 2024
ab2e1e4
feat: added dimension and type field in DQ Result search listing
TeddyCr Sep 26, 2024
d96f321
Merge remote-tracking branch 'upstream/main' into MINOR-improve-dq-re…
TeddyCr Sep 26, 2024
52fe1c3
fix: remove code duplication
TeddyCr Sep 26, 2024
16de5b0
style: ran java linting
TeddyCr Sep 26, 2024
59d65c7
Merge remote-tracking branch 'upstream/main' into MINOR-improve-dq-re…
TeddyCr Sep 26, 2024
9c2c3cf
chore: clean up unecessary elements
TeddyCr Sep 26, 2024
bec7468
style: ran java linting
TeddyCr Sep 26, 2024
a0a4c9f
fix: remove unecessary conditional statement + remove comments
TeddyCr Sep 26, 2024
1270071
Merge remote-tracking branch 'upstream/main' into MINOR-improve-dq-re…
TeddyCr Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.getEntityFields;

import java.beans.IntrospectionException;
import java.io.IOException;
Expand All @@ -11,9 +12,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonPatch;
Expand Down Expand Up @@ -46,6 +47,7 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
protected final String entityType;
protected final Class<T> entityClass;
protected final CollectionDAO daoCollection;
protected final Set<String> allowedFields;

public EntityTimeSeriesRepository(
String collectionPath,
Expand All @@ -58,34 +60,43 @@ public EntityTimeSeriesRepository(
this.entityType = entityType;
this.searchRepository = Entity.getSearchRepository();
this.daoCollection = Entity.getCollectionDAO();
this.allowedFields = getEntityFields(entityClass);
Entity.registerEntity(entityClass, entityType, this);
}

@Transaction
public T createNewRecord(T recordEntity, String extension, String recordFQN) {
recordEntity.setId(UUID.randomUUID());
storeInternal(recordEntity, recordFQN, extension);
storeRelationshipInternal(recordEntity);
postCreate(recordEntity);
return recordEntity;
public T createNewRecord(T recordEntity, String recordFQN) {
return createNewRecord(recordEntity, null, recordFQN);
}

public T createNewRecord(T recordEntity, String recordFQN) {
@Transaction
public T createNewRecord(T recordEntity, String extension, String recordFQN) {
recordEntity.setId(UUID.randomUUID());
storeInternal(recordEntity, recordFQN);
storeInternal(recordEntity, recordFQN, extension);
storeRelationshipInternal(recordEntity);
postCreate(recordEntity);
return recordEntity;
}

@Transaction
protected void storeInternal(T recordEntity, String recordFQN) {
timeSeriesDao.insert(recordFQN, entityType, JsonUtils.pojoToJson(recordEntity));
storeInternal(recordEntity, recordFQN, null);
}

@Transaction
protected void storeInternal(T recordEntity, String recordFQN, String extension) {
timeSeriesDao.insert(recordFQN, extension, entityType, JsonUtils.pojoToJson(recordEntity));
if (extension != null) {
timeSeriesDao.insert(recordFQN, extension, entityType, JsonUtils.pojoToJson(recordEntity));
} else {
timeSeriesDao.insert(recordFQN, entityType, JsonUtils.pojoToJson(recordEntity));
}
}

/**
 * Builds the {@code EntityUtil.Fields} projection for this time-series repository.
 *
 * <p>A literal {@code "*"} expands to every entry in {@code allowedFields} (joined with
 * commas); any other value is passed through as the caller-requested field list.
 *
 * <p>NOTE(review): whether {@code EntityUtil.Fields} validates the requested names
 * against {@code allowedFields} depends on its constructor, which is not visible
 * here — confirm before relying on rejection of unknown fields.
 *
 * @param fields comma-separated field names requested by the caller, or {@code "*"} for all
 * @return a {@code Fields} instance scoped to this entity's allowed fields
 */
public final EntityUtil.Fields getFields(String fields) {
// "*" means "all fields": materialize the full allowed set as a comma-joined string
if ("*".equals(fields)) {
return new EntityUtil.Fields(allowedFields, String.join(",", allowedFields));
}
return new EntityUtil.Fields(allowedFields, fields);
}

protected void storeRelationshipInternal(T recordEntity) {
Expand Down Expand Up @@ -124,7 +135,6 @@ protected void setUpdatedFields(T updated, String user) {
protected void validatePatchFields(T updated, T original) {
// Nothing to do in the default implementation
}
;

@Transaction
public final void addRelationship(
Expand Down Expand Up @@ -214,37 +224,36 @@ public ResultList<T> listWithOffset(
boolean latest,
boolean skipErrors) {
int total = timeSeriesDao.listCount(filter, startTs, endTs, latest);
List<T> entityList = new ArrayList<>();
List<EntityError> errors = null;
int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);

if (limitParam > 0) {
List<String> jsons =
timeSeriesDao.listWithOffset(filter, limitParam, offsetInt, startTs, endTs, latest);
Map<String, List<?>> entityListMap = getEntityList(jsons, skipErrors);
entityList = (List<T>) entityListMap.get("entityList");
if (skipErrors) {
errors = (List<EntityError>) entityListMap.get("errors");
}
return getResultList(entityList, beforeOffset, afterOffset, total, errors);
} else {
return getResultList(entityList, null, null, total);
}
return listWithOffsetInternal(
offset, filter, limitParam, startTs, endTs, latest, skipErrors, total);
}

public ResultList<T> listWithOffset(
String offset, ListFilter filter, int limitParam, boolean skipErrors) {
int total = timeSeriesDao.listCount(filter);
return listWithOffsetInternal(offset, filter, limitParam, null, null, false, skipErrors, total);
}

private ResultList<T> listWithOffsetInternal(
String offset,
ListFilter filter,
int limitParam,
Long startTs,
Long endTs,
boolean latest,
boolean skipErrors,
int total) {
List<T> entityList = new ArrayList<>();
List<EntityError> errors = null;

int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);
if (limitParam > 0) {
List<String> jsons = timeSeriesDao.listWithOffset(filter, limitParam, offsetInt);
List<String> jsons =
(startTs != null && endTs != null)
? timeSeriesDao.listWithOffset(filter, limitParam, offsetInt, startTs, endTs, latest)
: timeSeriesDao.listWithOffset(filter, limitParam, offsetInt);
Map<String, List<?>> entityListMap = getEntityList(jsons, skipErrors);
entityList = (List<T>) entityListMap.get("entityList");
if (skipErrors) {
Expand Down Expand Up @@ -397,21 +406,40 @@ public ResultList<T> listLatestFromSearch(
List<T> entityList = new ArrayList<>();
setIncludeSearchFields(searchListFilter);
setExcludeSearchFields(searchListFilter);
String aggregationPath = "$.sterms#byTerms.buckets";
String aggregationStr =
"{\"aggregations\": {\"byTerms\": {\"terms\": {\"field\": \"%s\", \"size\":100},\"aggs\": {\"latest\": "
+ "{\"top_hits\": {\"size\": 1, \"sort_field\":\"timestamp\",\"sort_order\":\"desc\"}}}}}}";
"{\"aggregations\":{\"byTerms\":{\"terms\": {\"field\":\"%s\",\"size\":100},\"aggs\":{\"latest\":"
+ "{\"top_hits\":{\"size\":1,\"sort_field\":\"timestamp\",\"sort_order\":\"desc\"}}}}}}";
aggregationStr = String.format(aggregationStr, groupBy);
JsonObject aggregation = JsonUtils.readJson(aggregationStr).asJsonObject();
JsonObject jsonObjResults =
searchRepository.aggregate(q, entityType, aggregation, searchListFilter);
List<JsonObject> jsonTestCaseResults = parseListLatestAggregation(jsonObjResults);

for (JsonObject json : jsonTestCaseResults) {
T entity = setFieldsInternal(JsonUtils.readOrConvertValue(json, entityClass), fields);
setInheritedFields(entity);
clearFieldsInternal(entity, fields);
entityList.add(entity);
}
Optional<List> jsonObjects =
JsonUtils.readJsonAtPath(jsonObjResults.toString(), aggregationPath, List.class);
jsonObjects.ifPresent(
jsonObjectList -> {
for (Map<String, String> json : (List<Map<String, String>>) jsonObjectList) {
String bucketAggregationPath = "top_hits#latest.hits.hits";
Optional<List> hits =
JsonUtils.readJsonAtPath(
JsonUtils.pojoToJson(json), bucketAggregationPath, List.class);
hits.ifPresent(
hitList -> {
for (Map<String, String> hit : (List<Map<String, String>>) hitList) {
JsonObject source = getSourceDocument(JsonUtils.pojoToJson(hit));
T entity =
setFieldsInternal(
JsonUtils.readOrConvertValue(source, entityClass), fields);
if (entity != null) {
setInheritedFields(entity);
clearFieldsInternal(entity, fields);
entityList.add(entity);
}
}
});
}
});
return new ResultList<>(entityList, null, null, entityList.size());
}

Expand Down Expand Up @@ -447,48 +475,27 @@ protected List<String> getExcludeSearchFields() {
return new ArrayList<>();
}

private List<JsonObject> parseListLatestAggregation(JsonObject jsonObjResults) {
JsonObject jsonByTerms = jsonObjResults.getJsonObject("sterms#byTerms");
List<JsonObject> jsonTestCaseResults = new ArrayList<>();
private JsonObject getSourceDocument(String hit) {
List<String> includeSearchFields = getIncludeSearchFields();
List<String> excludeSearchFields = getExcludeSearchFields();
Optional.ofNullable(jsonByTerms)
.map(jbt -> jbt.getJsonArray("buckets"))
.ifPresent(
termsBucket -> {
for (JsonValue bucket : termsBucket) {
JsonObject hitsBucket = bucket.asJsonObject().getJsonObject("top_hits#latest");
if (hitsBucket != null) {
JsonObject hitsTwo = hitsBucket.getJsonObject("hits");
if (hitsTwo != null) {
JsonArray hits = hitsTwo.getJsonArray("hits");
if (hits != null) {
for (JsonValue hit : hits) {
JsonObject source = hit.asJsonObject().getJsonObject("_source");
// Aggregation results will return all fields by default, so we need to
// filter out the fields
// that are not included in the search fields
if (source != null
&& (!CommonUtil.nullOrEmpty(includeSearchFields)
|| !CommonUtil.nullOrEmpty(excludeSearchFields))) {
JsonObjectBuilder sourceCopy = Json.createObjectBuilder();
for (Map.Entry<String, JsonValue> entry : source.entrySet()) {
if (includeSearchFields.contains(entry.getKey())
|| (CommonUtil.nullOrEmpty(includeSearchFields)
&& !excludeSearchFields.contains(entry.getKey()))) {
sourceCopy.add(entry.getKey(), entry.getValue());
}
}
jsonTestCaseResults.add(sourceCopy.build());
} else {
if (source != null) jsonTestCaseResults.add(source);
}
}
}
}
}
}
});
return jsonTestCaseResults;
JsonObject hitJson = JsonUtils.readJson(hit).asJsonObject();
JsonObject source = hitJson.asJsonObject().getJsonObject("_source");
// Aggregation results will return all fields by default,
// so we need to filter out the fields that are not included
// in the search fields
if (source != null
&& (!CommonUtil.nullOrEmpty(includeSearchFields)
|| !CommonUtil.nullOrEmpty(excludeSearchFields))) {
JsonObjectBuilder sourceCopy = Json.createObjectBuilder();
for (Map.Entry<String, JsonValue> entry : source.entrySet()) {
if (includeSearchFields.contains(entry.getKey())
|| (CommonUtil.nullOrEmpty(includeSearchFields)
&& !excludeSearchFields.contains(entry.getKey()))) {
sourceCopy.add(entry.getKey(), entry.getValue());
}
}
return sourceCopy.build();
}
return source;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ private void validateStatus(

@Override
@Transaction
public void storeInternal(TestCaseResolutionStatus recordEntity, String recordFQN) {
public void storeInternal(
TestCaseResolutionStatus recordEntity, String recordFQN, String extension) {

TestCaseResolutionStatus lastIncident = getLatestRecord(recordFQN);

Expand Down Expand Up @@ -212,7 +213,7 @@ public void storeInternal(TestCaseResolutionStatus recordEntity, String recordFQ
}
EntityReference testCaseReference = recordEntity.getTestCaseReference();
recordEntity.withTestCaseReference(null); // we don't want to store the reference in the record
super.storeInternal(recordEntity, recordFQN);
timeSeriesDao.insert(recordFQN, entityType, JsonUtils.pojoToJson(recordEntity));
recordEntity.withTestCaseReference(testCaseReference);
}

Expand Down Expand Up @@ -302,7 +303,10 @@ private void resolveTask(
EntityReference testCaseReference = newIncidentStatus.getTestCaseReference();
newIncidentStatus.setTestCaseReference(
null); // we don't want to store the reference in the record
super.storeInternal(newIncidentStatus, testCase.getFullyQualifiedName());
timeSeriesDao.insert(
testCaseReference.getFullyQualifiedName(),
entityType,
JsonUtils.pojoToJson(newIncidentStatus));
newIncidentStatus.setTestCaseReference(testCaseReference);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,41 @@ public ResultList<TestCaseResult> listTestCaseResultsFromSearch(
@Parameter(
description =
"Get the latest test case result for each test case -- requires `testSuiteId`. Offset and limit are ignored",
schema = @Schema(type = "boolean", example = "true|false"))
schema =
@Schema(
type = "boolean",
example = "false",
allowableValues = {"true", "false"}))
@QueryParam("latest")
@DefaultValue("false")
String latest,
@Parameter(
description = "Filter for test case result by type (e.g. column, table, all)",
schema =
@Schema(
type = "string",
example = "all",
allowableValues = {"column", "table", "all"}))
@QueryParam("testCaseType")
@DefaultValue("all")
String type,
@Parameter(
description =
"Filter for test case by data quality dimension (e.g. OpenMetadata, dbt, etc.)",
schema =
@Schema(
type = "string",
allowableValues = {
"Completeness",
"Accuracy",
"Consistency",
"Validity",
"Uniqueness",
"Integrity",
"SQL"
}))
@QueryParam("dataQualityDimension")
String dataQualityDimension,
@Parameter(
description = "search query term to use in list",
schema = @Schema(type = "string"))
Expand All @@ -235,7 +266,7 @@ public ResultList<TestCaseResult> listTestCaseResultsFromSearch(
if (latest.equals("true") && testSuiteId == null) {
throw new IllegalArgumentException("latest=true requires testSuiteId");
}
EntityUtil.Fields fields = new EntityUtil.Fields(Set.of(""), fieldParams);
EntityUtil.Fields fields = repository.getFields(fieldParams);
SearchListFilter searchListFilter = new SearchListFilter();
Optional.ofNullable(startTimestamp)
.ifPresent(ts -> searchListFilter.addQueryParam("startTimestamp", ts.toString()));
Expand All @@ -247,6 +278,9 @@ public ResultList<TestCaseResult> listTestCaseResultsFromSearch(
.ifPresent(tcf -> searchListFilter.addQueryParam("testCaseFQN", tcf));
Optional.ofNullable(testSuiteId)
.ifPresent(tsi -> searchListFilter.addQueryParam("testSuiteId", tsi));
Optional.ofNullable(type).ifPresent(t -> searchListFilter.addQueryParam("testCaseType", t));
Optional.ofNullable(dataQualityDimension)
.ifPresent(dqd -> searchListFilter.addQueryParam("dataQualityDimension", dqd));

ResourceContextInterface resourceContextInterface = getResourceContext(testCaseFQN);
// Override OperationContext to change the entity to table
Expand All @@ -259,7 +293,7 @@ public ResultList<TestCaseResult> listTestCaseResultsFromSearch(
securityContext,
fields,
searchListFilter,
"testSuites.id",
"testCaseFQN.keyword",
q,
operationContext,
resourceContextInterface);
Expand Down
Loading
Loading