Skip to content

Commit

Permalink
Add async apis for csv import (#18647)
Browse files Browse the repository at this point in the history
* Add async apis for import csv

* checkstyle fix

* fix checkstyle after merge from main

* fix auts

* fix backend tests

* ui: add async import support for glossary

* ui: improve the import page layout

* ui: add async import support for teams and users

* show banner at the top

* fix unit test

* update importteams page test

* minor change

---------

Co-authored-by: Sachin Chaurasiya <sachinchaurasiyachotey87@gmail.com>
  • Loading branch information
sonika-shah and Sachin-chaurasiya authored Nov 24, 2024
1 parent 4696b99 commit 0e41609
Show file tree
Hide file tree
Showing 26 changed files with 1,055 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.BulkAssetsOperationResponse;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.CSVImportResponse;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.RestUtil;
Expand Down Expand Up @@ -465,6 +466,28 @@ public Response bulkRemoveFromAssetsAsync(
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public Response importCsvInternalAsync(
SecurityContext securityContext, String name, String csv, boolean dryRun) {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
CsvImportResult result = importCsvInternal(securityContext, name, csv, dryRun);
WebsocketNotificationHandler.sendCsvImportCompleteNotification(
jobId, securityContext, result);
} catch (Exception e) {
WebsocketNotificationHandler.sendCsvImportFailedNotification(
jobId, securityContext, e.getMessage());
}
});
CSVImportResponse response = new CSVImportResponse(jobId, "Import initiated successfully.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
}

public String exportCsvInternal(SecurityContext securityContext, String name) throws IOException {
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.VIEW_ALL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseAsync",
summary = "Import database schemas from CSV asynchronously",
description =
"Import database schemas from CSV to update database schemas asynchronously (no creation allowed).",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseSchemaAsync",
summary =
"Import tables from CSV to update database schema asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database schema", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@PUT
@Path("/{id}/vote")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,38 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTableAsync",
summary = "Import columns from CSV to update table asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the table", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,36 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importGlossaryAsync",
summary = "Import glossary in CSV format asynchronously",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the glossary", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@RequestBody(description = "CSV data to import", required = true) String csv,
@Parameter(description = "Dry run the import", schema = @Schema(type = "boolean"))
@QueryParam("dryRun")
@DefaultValue("true")
boolean dryRun) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Glossary getGlossary(CreateGlossary create, String user) {
return getGlossary(repository, create, user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,40 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importDatabaseServiceAsync",
summary =
"Import service from CSV to update database service asynchronously (no creation allowed)",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the Database Service", schema = @Schema(type = "string"))
@PathParam("name")
String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

@DELETE
@Path("/{id}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,37 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, name, csv, dryRun);
}

@PUT
@Path("/name/{name}/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import initiated successfully",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@PathParam("name") String name,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, name, csv, dryRun);
}

private Team getTeam(CreateTeam ct, String user) {
if (ct.getTeamType().equals(TeamType.ORGANIZATION)) {
throw new IllegalArgumentException(CREATE_ORGANIZATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,41 @@ public CsvImportResult importCsv(
return importCsvInternal(securityContext, team, csv, dryRun);
}

@PUT
@Path("/importAsync")
@Consumes(MediaType.TEXT_PLAIN)
@Valid
@Operation(
operationId = "importTeamsAsync",
summary = "Import from CSV to create, and update teams asynchronously.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Import result",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CsvImportResult.class)))
})
public Response importCsvAsync(
@Context SecurityContext securityContext,
@Parameter(
description = "Name of the team to under which the users are imported to",
required = true,
schema = @Schema(type = "string"))
@QueryParam("team")
String team,
@Parameter(
description =
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
schema = @Schema(type = "boolean"))
@DefaultValue("true")
@QueryParam("dryRun")
boolean dryRun,
String csv) {
return importCsvInternalAsync(securityContext, team, csv, dryRun);
}

public void validateEmailAlreadyExists(String email) {
if (repository.checkEmailAlreadyExists(email)) {
throw new CustomExceptionMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WebSocketManager {
public static final String MENTION_CHANNEL = "mentionChannel";
public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel";
public static final String CSV_EXPORT_CHANNEL = "csvExportChannel";
public static final String CSV_IMPORT_CHANNEL = "csvImportChannel";

public static final String BULK_ASSETS_CHANNEL = "bulkAssetsChannel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.openmetadata.schema.type.csv.CsvImportResult;

@NoArgsConstructor
public class CSVImportMessage {
@Getter @Setter private String jobId;
@Getter @Setter private String status;
@Getter @Setter private CsvImportResult result;
@Getter @Setter private String error;

public CSVImportMessage(String jobId, String status, CsvImportResult result, String error) {
this.jobId = jobId;
this.status = status;
this.result = result;
this.error = error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
public class CSVImportResponse {
@Getter @Setter private String jobId;
@Getter @Setter private String message;

public CSVImportResponse(String jobId, String message) {
this.jobId = jobId;
this.message = message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.openmetadata.schema.type.Post;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.api.BulkOperationResult;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.feeds.MessageParser;
Expand Down Expand Up @@ -194,4 +195,22 @@ private static UUID getUserIdFromSecurityContext(SecurityContext securityContext
User user = Entity.getCollectionDAO().userDAO().findEntityByName(username);
return user.getId();
}

public static void sendCsvImportCompleteNotification(
String jobId, SecurityContext securityContext, CsvImportResult result) {
CSVImportMessage message = new CSVImportMessage(jobId, "COMPLETED", result, null);
String jsonMessage = JsonUtils.pojoToJson(message);
UUID userId = getUserIdFromSecurityContext(securityContext);
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.CSV_IMPORT_CHANNEL, jsonMessage);
}

public static void sendCsvImportFailedNotification(
String jobId, SecurityContext securityContext, String errorMessage) {
CSVExportMessage message = new CSVExportMessage(jobId, "FAILED", null, errorMessage);
String jsonMessage = JsonUtils.pojoToJson(message);
UUID userId = getUserIdFromSecurityContext(securityContext);
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.CSV_IMPORT_CHANNEL, jsonMessage);
}
}
Loading

0 comments on commit 0e41609

Please sign in to comment.