Skip to content

Commit

Permalink
- Fix for reindex job not waiting for completion (#16210)
Browse files Browse the repository at this point in the history
* - Fix for reindex job not waiting for completion

* - Review Comments
  • Loading branch information
mohityadav766 committed May 13, 2024
1 parent 1971d8b commit 4f9415f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.openmetadata.service.exception;

import javax.ws.rs.core.Response;
import org.openmetadata.sdk.exception.WebServiceException;

public class AppException extends WebServiceException {
public static final String APP_RUN_RECORD_NOT_FOUND = "No Available Application Run Records.";
private static final String ERROR_TYPE = "APP_ERROR";

public AppException(String message) {
super(Response.Status.INTERNAL_SERVER_ERROR, ERROR_TYPE, message);
}

private AppException(Response.Status status, String message) {
super(status, ERROR_TYPE, message);
}

public static AppException byMessage(Response.Status status, String errorMessage) {
return new AppException(status, errorMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,17 @@ default String getThreadAssetsUrl(
}
}

private String getDateString(long epochTimestamp) {
static String getDateString(long epochTimestamp) {
Instant instant = Instant.ofEpochSecond(epochTimestamp);
return getDateString(instant);
}

static String getDateStringEpochMilli(long epochTimestamp) {
Instant instant = Instant.ofEpochMilli(epochTimestamp);
return getDateString(instant);
}

private static String getDateString(Instant instant) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());

// Format LocalDateTime to a specific date and time format
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.openmetadata.service.jdbi3;

import static org.openmetadata.service.exception.AppException.APP_RUN_RECORD_NOT_FOUND;
import static org.openmetadata.service.resources.teams.UserResource.getUser;

import java.util.ArrayList;
Expand All @@ -19,8 +20,8 @@
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.AppException;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.resources.apps.AppResource;
import org.openmetadata.service.security.jwt.JWTTokenGenerator;
import org.openmetadata.service.util.EntityUtil;
Expand Down Expand Up @@ -211,7 +212,15 @@ protected void cleanup(App app) {
public AppRunRecord getLatestAppRuns(UUID appId) {
String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId);
if (json == null) {
throw new UnhandledServerException("No Available Application Run Records.");
throw new AppException(APP_RUN_RECORD_NOT_FOUND);
}
return JsonUtils.readValue(json, AppRunRecord.class);
}

public AppRunRecord getLatestAppRunsAfterStartTime(UUID appId, long startTime) {
String json = daoCollection.appExtensionTimeSeriesDao().getLatestAppRun(appId, startTime);
if (json == null) {
throw new AppException(APP_RUN_RECORD_NOT_FOUND);
}
return JsonUtils.readValue(json, AppRunRecord.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3760,13 +3760,29 @@ void update(
List<String> listAppRunRecord(
@Bind("appId") String appId, @Bind("limit") int limit, @Bind("offset") int offset);

@SqlQuery(
"SELECT json FROM apps_extension_time_series where appId = :appId AND timestamp > :startTime ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listAppRunRecordAfterTime(
@Bind("appId") String appId,
@Bind("limit") int limit,
@Bind("offset") int offset,
@Bind("startTime") long startTime);

default String getLatestAppRun(UUID appId) {
List<String> result = listAppRunRecord(appId.toString(), 1, 0);
if (!nullOrEmpty(result)) {
return result.get(0);
}
return null;
}

default String getLatestAppRun(UUID appId, long startTime) {
List<String> result = listAppRunRecordAfterTime(appId.toString(), 1, 0, startTime);
if (!nullOrEmpty(result)) {
return result.get(0);
}
return null;
}
}

interface ReportDataTimeSeriesDAO extends EntityTimeSeriesDAO {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.openmetadata.service.migration.context.MigrationContext;
import org.openmetadata.service.migration.context.MigrationWorkflowContext;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.util.AsciiTable;

@Slf4j
public class MigrationWorkflow {
Expand Down Expand Up @@ -227,7 +228,9 @@ private void runSchemaChanges(List<String> row, MigrationProcess process) {
.toList());
LOG.info(
"[MigrationWorkflow] Version : {} Run Schema Changes Query Status", process.getVersion());
printToAsciiTable(schemaChangesColumns, allSchemaChangesRows, "No New Queries");
LOG.debug(
new AsciiTable(schemaChangesColumns, allSchemaChangesRows, true, "", "No New Queries")
.render());
row.add(SUCCESS_MSG);
} catch (Exception e) {
row.add(FAILED_MSG + e.getMessage());
Expand All @@ -253,7 +256,9 @@ private void runPostDDLChanges(List<String> row, MigrationProcess process) {
entry.getValue().getStatus(), entry.getValue().getMessage())))
.toList());
LOG.info("[MigrationWorkflow] Version : {} Run Post DDL Query Status", process.getVersion());
printToAsciiTable(schemaChangesColumns, allSchemaChangesRows, "No New Queries");
LOG.debug(
new AsciiTable(schemaChangesColumns, allSchemaChangesRows, true, "", "No New Queries")
.render());
row.add(SUCCESS_MSG);
} catch (Exception e) {
row.add(FAILED_MSG + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.formatter.decorators.MessageDecorator.getDateStringEpochMilli;
import static org.openmetadata.service.util.AsciiTable.printOpenMetadataText;

import ch.qos.logback.classic.Level;
Expand Down Expand Up @@ -285,9 +286,10 @@ private int executeSearchReindexApp(String appName, int batchSize, boolean recre
appRepository.patch(null, originalSearchIndexApp.getId(), "admin", patch);

// Trigger Application
long currentTime = System.currentTimeMillis();
AppScheduler.getInstance().triggerOnDemandApplication(updatedSearchIndexApp);

int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp);
int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp, currentTime);

// Repatch with original
JsonPatch repatch = JsonUtils.getJsonPatch(updatedSearchIndexApp, originalSearchIndexApp);
Expand All @@ -297,34 +299,53 @@ private int executeSearchReindexApp(String appName, int batchSize, boolean recre
}

@SneakyThrows
private int waitAndReturnReindexingAppStatus(App searchIndexApp) {
AppRunRecord appRunRecord;
private int waitAndReturnReindexingAppStatus(App searchIndexApp, long startTime) {
AppRunRecord appRunRecord = null;
do {
try {
AppRepository appRepository =
(AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId());
appRunRecord =
appRepository.getLatestAppRunsAfterStartTime(searchIndexApp.getId(), startTime);
if (isRunCompleted(appRunRecord)) {
List<String> columns =
new ArrayList<>(
List.of("status", "startTime", "endTime", "executionTime", "success", "failure"));
List.of(
"jobStatus",
"startTime",
"endTime",
"executionTime",
"successContext",
"failureContext"));
List<List<String>> rows = new ArrayList<>();

String startTimeofJob =
nullOrEmpty(appRunRecord.getStartTime())
? "Unavailable"
: getDateStringEpochMilli(appRunRecord.getStartTime());
String endTimeofJob =
nullOrEmpty(appRunRecord.getEndTime())
? "Unavailable"
: getDateStringEpochMilli(appRunRecord.getEndTime());
String executionTime =
nullOrEmpty(appRunRecord.getExecutionTime())
? "Unavailable"
: String.format("%d seconds", appRunRecord.getExecutionTime() / 1000);
rows.add(
Arrays.asList(
getValueOrUnavailable(appRunRecord.getStatus().value()),
getValueOrUnavailable(appRunRecord.getStartTime()),
getValueOrUnavailable(appRunRecord.getEndTime()),
getValueOrUnavailable(appRunRecord.getExecutionTime()),
getValueOrUnavailable(startTimeofJob),
getValueOrUnavailable(endTimeofJob),
getValueOrUnavailable(executionTime),
getValueOrUnavailable(appRunRecord.getSuccessContext()),
getValueOrUnavailable(appRunRecord.getFailureContext())));
printToAsciiTable(columns, rows, "Failed to run Search Reindexing");
}
} catch (UnhandledServerException e) {
LOG.info(
"Reindexing Status not available yet, waiting for 10 seconds to fetch the status again.");
appRunRecord = null;
Thread.sleep(10000);
} catch (UnhandledServerException ignored) {
}
LOG.info(
"Reindexing Status not available yet, waiting for 10 seconds to fetch the status again.");
Thread.sleep(10000);
} while (!isRunCompleted(appRunRecord));

if (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
Expand Down

0 comments on commit 4f9415f

Please sign in to comment.