Skip to content

Commit

Permalink
Merge branch 'main' into fix-dashboard-datamodel-not-updated
Browse files Browse the repository at this point in the history
  • Loading branch information
TeddyCr authored Nov 18, 2024
2 parents 432ced5 + 19497db commit 793f46e
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public final class CatalogExceptionMessage {
public static final String INVALID_BOT_USER = "Revoke Token can only be applied to Bot Users.";
public static final String NO_MANUAL_TRIGGER_ERR = "App does not support manual trigger.";
public static final String INVALID_APP_TYPE = "Application Type is not valid.";
public static final String CSV_EXPORT_FAILED = "CSV Export Failed.";

private CatalogExceptionMessage() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.UUID;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -258,19 +259,28 @@ public final String exportCsv(
return CsvUtil.formatCsv(csvFile);
}

@Getter
private static class ColumnMapping {
String fromChildFQN;
String toChildFQN;

ColumnMapping(String from, String to) {
this.fromChildFQN = from;
this.toChildFQN = to;
}
}

public final String exportCsvAsync(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
String entityType,
boolean deleted)
throws IOException {
Response response =
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);

boolean deleted) {
try {
Response response =
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
JsonNode rootNode = JsonUtils.readTree(jsonResponse);

Expand All @@ -284,11 +294,28 @@ public final String exportCsvAsync(
StringWriter csvContent = new StringWriter();
CSVWriter csvWriter = new CSVWriter(csvContent);
String[] headers = {
"fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
"toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain",
"fromChildEntityFQN", "toChildEntityFQN"
"fromEntityFQN",
"fromServiceName",
"fromServiceType",
"fromOwners",
"fromDomain",
"toEntityFQN",
"toServiceName",
"toServiceType",
"toOwners",
"toDomain",
"fromChildEntityFQN",
"toChildEntityFQN",
"pipelineName",
"pipelineType",
"pipelineDescription",
"pipelineOwners",
"pipelineDomain",
"pipelineServiceName",
"pipelineServiceType"
};
csvWriter.writeNext(headers);

JsonNode edges = rootNode.path("edges");
for (JsonNode edge : edges) {
String fromEntityId = edge.path("fromEntity").path("id").asText();
Expand All @@ -302,50 +329,59 @@ public final String exportCsvAsync(
baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name"));
baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
baseRow.put("fromOwners", getOwners(fromEntity.path("owners")));
baseRow.put("fromDomain", getText(fromEntity, "domain"));
baseRow.put("fromDomain", getDomainFQN(fromEntity.path("domain")));

baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
baseRow.put("toServiceName", getText(toEntity.path("service"), "name"));
baseRow.put("toServiceType", getText(toEntity, "serviceType"));
baseRow.put("toOwners", getOwners(toEntity.path("owners")));
baseRow.put("toDomain", getText(toEntity, "domain"));

List<String> fromChildFQNs = new ArrayList<>();
List<String> toChildFQNs = new ArrayList<>();

extractChildEntities(fromEntity, fromChildFQNs);
extractChildEntities(toEntity, toChildFQNs);
baseRow.put("toDomain", getDomainFQN(toEntity.path("domain")));

JsonNode columns = edge.path("columns");
if (columns.isArray() && !columns.isEmpty()) {
for (JsonNode columnMapping : columns) {
JsonNode fromColumns = columnMapping.path("fromColumns");
String toColumn = columnMapping.path("toColumn").asText();

for (JsonNode fromColumn : fromColumns) {
String fromChildFQN = fromColumn.asText();
String toChildFQN = toColumn;
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
}
List<ColumnMapping> explicitColumnMappings = extractColumnMappingsFromEdge(columns);
for (ColumnMapping mapping : explicitColumnMappings) {
writeCsvRow(
csvWriter,
baseRow,
mapping.getFromChildFQN(),
mapping.getToChildFQN(),
"",
"",
"",
"",
"",
"",
"");
LOG.debug(
"Exported explicit ColumnMapping: from='{}', to='{}'",
mapping.getFromChildFQN(),
mapping.getToChildFQN());
}
} else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) {
if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) {
for (String fromChildFQN : fromChildFQNs) {
for (String toChildFQN : toChildFQNs) {
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
}
}
} else if (!fromChildFQNs.isEmpty()) {
for (String fromChildFQN : fromChildFQNs) {
writeCsvRow(csvWriter, baseRow, fromChildFQN, "");
}
} else {
for (String toChildFQN : toChildFQNs) {
writeCsvRow(csvWriter, baseRow, "", toChildFQN);
}
}
} else {
writeCsvRow(csvWriter, baseRow, "", "");
}

JsonNode pipeline = edge.path("pipeline");
if (!pipeline.isMissingNode() && !pipeline.isNull()) {
String pipelineName = getText(pipeline, "name");
String pipelineType = getText(pipeline, "serviceType");
String pipelineDescription = getText(pipeline, "description");
String pipelineOwners = getOwners(pipeline.path("owners"));
String pipelineServiceName = getText(pipeline.path("service"), "name");
String pipelineServiceType = getText(pipeline, "serviceType");
String pipelineDomain = getDomainFQN(pipeline.path("domain"));
writeCsvRow(
csvWriter,
baseRow,
"",
"",
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType);
LOG.debug("Exported Pipeline Information: {}", pipelineName);
}
}
csvWriter.close();
Expand All @@ -356,20 +392,37 @@ public final String exportCsvAsync(
}

private static void writeCsvRow(
CSVWriter csvWriter, Map<String, String> baseRow, String fromChildFQN, String toChildFQN) {
CSVWriter csvWriter,
Map<String, String> baseRow,
String fromChildFQN,
String toChildFQN,
String pipelineName,
String pipelineType,
String pipelineDescription,
String pipelineOwners,
String pipelineDomain,
String pipelineServiceName,
String pipelineServiceType) {
String[] row = {
baseRow.get("fromEntityFQN"),
baseRow.get("fromServiceName"),
baseRow.get("fromServiceType"),
baseRow.get("fromOwners"),
baseRow.get("fromDomain"),
baseRow.get("toEntityFQN"),
baseRow.get("toServiceName"),
baseRow.get("toServiceType"),
baseRow.get("toOwners"),
baseRow.get("toDomain"),
baseRow.getOrDefault("fromEntityFQN", ""),
baseRow.getOrDefault("fromServiceName", ""),
baseRow.getOrDefault("fromServiceType", ""),
baseRow.getOrDefault("fromOwners", ""),
baseRow.getOrDefault("fromDomain", ""),
baseRow.getOrDefault("toEntityFQN", ""),
baseRow.getOrDefault("toServiceName", ""),
baseRow.getOrDefault("toServiceType", ""),
baseRow.getOrDefault("toOwners", ""),
baseRow.getOrDefault("toDomain", ""),
fromChildFQN,
toChildFQN
toChildFQN,
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType
};
csvWriter.writeNext(row);
}
Expand All @@ -386,7 +439,7 @@ private static String getOwners(JsonNode ownersNode) {
if (ownersNode != null && ownersNode.isArray()) {
List<String> ownersList = new ArrayList<>();
for (JsonNode owner : ownersNode) {
String ownerName = getText(owner, "name");
String ownerName = getText(owner, "displayName");
if (!ownerName.isEmpty()) {
ownersList.add(ownerName);
}
Expand All @@ -396,91 +449,32 @@ private static String getOwners(JsonNode ownersNode) {
return "";
}

private static void extractChildEntities(JsonNode entityNode, List<String> childFQNs) {
if (entityNode == null) {
return;
}
String entityType = getText(entityNode, "entityType");
switch (entityType) {
case TABLE:
extractColumns(entityNode.path("columns"), childFQNs);
break;
case DASHBOARD:
extractCharts(entityNode.path("charts"), childFQNs);
break;
case SEARCH_INDEX:
extractFields(entityNode.path("fields"), childFQNs);
break;
case CONTAINER:
extractContainers(entityNode.path("children"), childFQNs);
extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
break;
case TOPIC:
extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
break;
case DASHBOARD_DATA_MODEL:
extractColumns(entityNode.path("columns"), childFQNs);
break;
default:
break;
private static String getDomainFQN(JsonNode domainNode) {
if (domainNode != null && domainNode.has("fullyQualifiedName")) {
JsonNode fqnNode = domainNode.get("fullyQualifiedName");
return fqnNode.isNull() ? "" : fqnNode.asText();
}
return "";
}

private static void extractColumns(JsonNode columnsNode, List<String> childFQNs) {
private static List<ColumnMapping> extractColumnMappingsFromEdge(JsonNode columnsNode) {
List<ColumnMapping> mappings = new ArrayList<>();
if (columnsNode != null && columnsNode.isArray()) {
for (JsonNode column : columnsNode) {
if (column != null) {
String columnFQN = getText(column, "fullyQualifiedName");
childFQNs.add(columnFQN);
extractColumns(column.path("children"), childFQNs);
}
}
}
}

private static void extractCharts(JsonNode chartsNode, List<String> childFQNs) {
if (chartsNode != null && chartsNode.isArray()) {
for (JsonNode chart : chartsNode) {
String chartFQN = getText(chart, "fullyQualifiedName");
childFQNs.add(chartFQN);
}
}
}

private static void extractFields(JsonNode fieldsNode, List<String> childFQNs) {
if (fieldsNode != null && fieldsNode.isArray()) {
for (JsonNode field : fieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractFields(field.path("children"), childFQNs);
}
}
}
}

private static void extractContainers(JsonNode containersNode, List<String> childFQNs) {
if (containersNode != null && containersNode.isArray()) {
for (JsonNode container : containersNode) {
if (container != null) {
String containerFQN = getText(container, "fullyQualifiedName");
childFQNs.add(containerFQN);
extractContainers(container.path("children"), childFQNs);
}
}
}
}

private static void extractSchemaFields(JsonNode schemaFieldsNode, List<String> childFQNs) {
if (schemaFieldsNode != null && schemaFieldsNode.isArray()) {
for (JsonNode field : schemaFieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractSchemaFields(field.path("children"), childFQNs);
for (JsonNode columnMapping : columnsNode) {
JsonNode fromColumns = columnMapping.path("fromColumns");
String toColumn = columnMapping.path("toColumn").asText().trim();

if (fromColumns.isArray() && !toColumn.isEmpty()) {
for (JsonNode fromColumn : fromColumns) {
String fromChildFQN = fromColumn.asText().trim();
if (!fromChildFQN.isEmpty()) {
mappings.add(new ColumnMapping(fromChildFQN, toColumn));
}
}
}
}
}
return mappings;
}

private String getStringOrNull(HashMap map, String key) {
Expand Down
Loading

0 comments on commit 793f46e

Please sign in to comment.