From 4f9415fef648aef6ebd53bafca87e3053078c73e Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Mon, 13 May 2024 11:42:51 +0530 Subject: [PATCH] - Fix for reindex job not waiting for completion (#16210) * - Fix for reindex job not waiting for completion * - Review Comments --- .../service/exception/AppException.java | 21 +++++++++ .../decorators/MessageDecorator.java | 11 ++++- .../service/jdbi3/AppRepository.java | 13 ++++- .../service/jdbi3/CollectionDAO.java | 16 +++++++ .../migration/api/MigrationWorkflow.java | 9 +++- .../service/util/OpenMetadataOperations.java | 47 ++++++++++++++----- 6 files changed, 99 insertions(+), 18 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/exception/AppException.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/AppException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/AppException.java new file mode 100644 index 000000000000..e352494366d8 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/AppException.java @@ -0,0 +1,21 @@ +package org.openmetadata.service.exception; + +import javax.ws.rs.core.Response; +import org.openmetadata.sdk.exception.WebServiceException; + +public class AppException extends WebServiceException { + public static final String APP_RUN_RECORD_NOT_FOUND = "No Available Application Run Records."; + private static final String ERROR_TYPE = "APP_ERROR"; + + public AppException(String message) { + super(Response.Status.INTERNAL_SERVER_ERROR, ERROR_TYPE, message); + } + + private AppException(Response.Status status, String message) { + super(status, ERROR_TYPE, message); + } + + public static AppException byMessage(Response.Status status, String errorMessage) { + return new AppException(status, errorMessage); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java index 19491c3fe9d5..fbf9330d4ee4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/formatter/decorators/MessageDecorator.java @@ -389,8 +389,17 @@ default String getThreadAssetsUrl( } } - private String getDateString(long epochTimestamp) { + static String getDateString(long epochTimestamp) { Instant instant = Instant.ofEpochSecond(epochTimestamp); + return getDateString(instant); + } + + static String getDateStringEpochMilli(long epochTimestamp) { + Instant instant = Instant.ofEpochMilli(epochTimestamp); + return getDateString(instant); + } + + private static String getDateString(Instant instant) { LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); // Format LocalDateTime to a specific date and time format diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java index 7789d3b25728..a80fd38680cb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java @@ -1,5 +1,6 @@ package org.openmetadata.service.jdbi3; +import static org.openmetadata.service.exception.AppException.APP_RUN_RECORD_NOT_FOUND; import static org.openmetadata.service.resources.teams.UserResource.getUser; import java.util.ArrayList; @@ -19,8 +20,8 @@ import org.openmetadata.schema.type.ProviderType; import org.openmetadata.schema.type.Relationship; import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.AppException; import org.openmetadata.service.exception.EntityNotFoundException; -import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.resources.apps.AppResource; import org.openmetadata.service.security.jwt.JWTTokenGenerator; import org.openmetadata.service.util.EntityUtil; @@ -211,7 +212,15 @@ protected void cleanup(App app) { public AppRunRecord getLatestAppRuns(UUID appId) { String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId); if (json == null) { - throw new UnhandledServerException("No Available Application Run Records."); + throw new AppException(APP_RUN_RECORD_NOT_FOUND); + } + return JsonUtils.readValue(json, AppRunRecord.class); + } + + public AppRunRecord getLatestAppRunsAfterStartTime(UUID appId, long startTime) { + String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId, startTime); + if (json == null) { + throw new AppException(APP_RUN_RECORD_NOT_FOUND); } return JsonUtils.readValue(json, AppRunRecord.class); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 95afd0d3a62f..93b0fc0b2a32 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3760,6 +3760,14 @@ void update( List listAppRunRecord( @Bind("appId") String appId, @Bind("limit") int limit, @Bind("offset") int offset); + @SqlQuery( + "SELECT json FROM apps_extension_time_series where appId = :appId AND timestamp > :startTime ORDER BY timestamp DESC LIMIT :limit OFFSET :offset") + List listAppRunRecordAfterTime( + @Bind("appId") String appId, + @Bind("limit") int limit, + @Bind("offset") int offset, + @Bind("startTime") long startTime); + default String getLatestAppRun(UUID appId) { List result = listAppRunRecord(appId.toString(), 1, 0); if (!nullOrEmpty(result)) { @@ -3767,6 +3775,14 @@ default String getLatestAppRun(UUID appId) { } return null; } + + default String getLatestAppRun(UUID appId, long startTime) { + List result = listAppRunRecordAfterTime(appId.toString(), 1, 0, startTime); + if (!nullOrEmpty(result)) { + return result.get(0); + } + return null; + } } interface ReportDataTimeSeriesDAO extends EntityTimeSeriesDAO { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java index e8440bcc3b7c..e7de3257cbd7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java @@ -21,6 +21,7 @@ import org.openmetadata.service.migration.context.MigrationContext; import org.openmetadata.service.migration.context.MigrationWorkflowContext; import org.openmetadata.service.migration.utils.MigrationFile; +import org.openmetadata.service.util.AsciiTable; @Slf4j public class MigrationWorkflow { @@ -227,7 +228,9 @@ private void runSchemaChanges(List row, MigrationProcess process) { .toList()); LOG.info( "[MigrationWorkflow] Version : {} Run Schema Changes Query Status", process.getVersion()); - printToAsciiTable(schemaChangesColumns, allSchemaChangesRows, "No New Queries"); + LOG.debug( + new AsciiTable(schemaChangesColumns, allSchemaChangesRows, true, "", "No New Queries") + .render()); row.add(SUCCESS_MSG); } catch (Exception e) { row.add(FAILED_MSG + e.getMessage()); @@ -253,7 +256,9 @@ private void runPostDDLChanges(List row, MigrationProcess process) { entry.getValue().getStatus(), entry.getValue().getMessage()))) .toList()); LOG.info("[MigrationWorkflow] Version : {} Run Post DDL Query Status", process.getVersion()); - printToAsciiTable(schemaChangesColumns, allSchemaChangesRows, "No New Queries"); + LOG.debug( + new AsciiTable(schemaChangesColumns, allSchemaChangesRows, true, "", "No New Queries") + .render()); row.add(SUCCESS_MSG); } catch (Exception e) { row.add(FAILED_MSG + e.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 32744eeff5c1..5afe845224c7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -3,6 +3,7 @@ import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.FIELD_OWNER; +import static org.openmetadata.service.formatter.decorators.MessageDecorator.getDateStringEpochMilli; import static org.openmetadata.service.util.AsciiTable.printOpenMetadataText; import ch.qos.logback.classic.Level; @@ -285,9 +286,10 @@ private int executeSearchReindexApp(String appName, int batchSize, boolean recre appRepository.patch(null, originalSearchIndexApp.getId(), "admin", patch); // Trigger Application + long currentTime = System.currentTimeMillis(); AppScheduler.getInstance().triggerOnDemandApplication(updatedSearchIndexApp); - int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp); + int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp, currentTime); // Repatch with original JsonPatch repatch = JsonUtils.getJsonPatch(updatedSearchIndexApp, originalSearchIndexApp); @@ -297,34 +299,53 @@ private int executeSearchReindexApp(String appName, int batchSize, boolean recre } @SneakyThrows - private int waitAndReturnReindexingAppStatus(App searchIndexApp) { - AppRunRecord appRunRecord; + private int waitAndReturnReindexingAppStatus(App searchIndexApp, long startTime) { + AppRunRecord appRunRecord = null; do { try { AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); - appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId()); + appRunRecord = + appRepository.getLatestAppRunsAfterStartTime(searchIndexApp.getId(), startTime); if (isRunCompleted(appRunRecord)) { List columns = new ArrayList<>( - List.of("status", "startTime", "endTime", "executionTime", "success", "failure")); + List.of( + "jobStatus", + "startTime", + "endTime", + "executionTime", + "successContext", + "failureContext")); List> rows = new ArrayList<>(); + + String startTimeofJob = + nullOrEmpty(appRunRecord.getStartTime()) + ? "Unavailable" + : getDateStringEpochMilli(appRunRecord.getStartTime()); + String endTimeofJob = + nullOrEmpty(appRunRecord.getEndTime()) + ? "Unavailable" + : getDateStringEpochMilli(appRunRecord.getEndTime()); + String executionTime = + nullOrEmpty(appRunRecord.getExecutionTime()) + ? "Unavailable" + : String.format("%d seconds", appRunRecord.getExecutionTime() / 1000); rows.add( Arrays.asList( getValueOrUnavailable(appRunRecord.getStatus().value()), - getValueOrUnavailable(appRunRecord.getStartTime()), - getValueOrUnavailable(appRunRecord.getEndTime()), - getValueOrUnavailable(appRunRecord.getExecutionTime()), + getValueOrUnavailable(startTimeofJob), + getValueOrUnavailable(endTimeofJob), + getValueOrUnavailable(executionTime), getValueOrUnavailable(appRunRecord.getSuccessContext()), getValueOrUnavailable(appRunRecord.getFailureContext()))); printToAsciiTable(columns, rows, "Failed to run Search Reindexing"); } - } catch (UnhandledServerException e) { - LOG.info( - "Reindexing Status not available yet, waiting for 10 seconds to fetch the status again."); - appRunRecord = null; - Thread.sleep(10000); + } catch (UnhandledServerException ignored) { } + LOG.info( + "Reindexing Status not available yet, waiting for 10 seconds to fetch the status again."); + Thread.sleep(10000); } while (!isRunCompleted(appRunRecord)); if (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)