Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue-16487] Add Stop for Search Indexing #17914

Merged
merged 8 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ UPDATE test_suite
SET json = JSON_REMOVE(json, '$.testCaseResultSummary');

UPDATE test_case
SET json = JSON_REMOVE(json, '$.testCaseResult');
SET json = JSON_REMOVE(json, '$.testCaseResult');

-- Add Supports interrupts to SearchIndexingApplication
UPDATE installed_apps SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';
UPDATE apps_marketplace SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';

Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,20 @@ SET json = json - 'testCaseResultSummary';

UPDATE test_case
SET json = json - 'testCaseResult';

-- Add Supports interrupts to SearchIndexingApplication
UPDATE apps_marketplace
SET json = jsonb_set(
json::jsonb,
'{supportsInterrupt}',
to_jsonb(true)
)
where name = 'SearchIndexingApplication';

UPDATE installed_apps
SET json = jsonb_set(
json::jsonb,
'{supportsInterrupt}',
to_jsonb(true)
)
where name = 'SearchIndexingApplication';
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;

@Getter
@Slf4j
public class AbstractNativeApplication implements NativeApplication {
protected CollectionDAO collectionDAO;
private App app;
protected SearchRepository searchRepository;
protected boolean isJobInterrupted = false;

// Default service that contains external apps' Ingestion Pipelines
private static final String SERVICE_NAME = "OpenMetadata";
Expand Down Expand Up @@ -296,4 +298,10 @@ protected void pushAppStatusUpdates(
OmAppJobListener listener = getJobListener(jobExecutionContext);
listener.pushApplicationStatusUpdates(jobExecutionContext, appRecord, update);
}

@Override
public void interrupt() throws UnableToInterruptJobException {
LOG.info("Interrupting the job for app: {}", this.app.getName());
isJobInterrupted = true;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.openmetadata.service.apps;

import org.openmetadata.schema.entity.app.App;
import org.quartz.Job;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;

public interface NativeApplication extends Job {
public interface NativeApplication extends InterruptableJob {
void init(App app);

void install();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void performReindex(JobExecutionContext jobExecutionContext) {
reCreateIndexes(paginatedSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType());
Object resultList;
while (!stopped && !paginatedSource.isDone()) {
while (!isJobInterrupted && !stopped && !paginatedSource.isDone()) {
try {
resultList = paginatedSource.readNext(null);
if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) {
Expand Down Expand Up @@ -264,6 +264,10 @@ private void performReindex(JobExecutionContext jobExecutionContext) {
paginatedSource.updateStats(
rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
} finally {
if (isJobInterrupted) {
LOG.info("Search Indexing will now return since the Job has been interrupted.");
jobData.setStatus(EventPublisherJob.Status.STOPPED);
}
updateStats(paginatedSource.getEntityType(), paginatedSource.getStats());
sendUpdates(jobExecutionContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,32 @@ public void triggerOnDemandApplication(App application) {
LOG.error("Failed in running job", ex);
}
}

public void stopApplicationRun(App application) {
if (application.getFullyQualifiedName() == null) {
throw new IllegalArgumentException("Application's fullyQualifiedName is null.");
}
try {
// Interrupt any scheduled job
JobDetail jobDetailScheduled =
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
if (jobDetailScheduled != null) {
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("Stopping Scheduled Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailScheduled.getKey());
}

// Interrupt any on-demand job
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP));

if (jobDetailOnDemand != null) {
LOG.debug("Stopping On Demand Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailOnDemand.getKey());
}
} catch (Exception ex) {
LOG.error("Failed to stop job execution.", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ private AppMarketPlaceDefinition getApplicationDefinition(
.withFeatures(create.getFeatures())
.withSourcePythonClass(create.getSourcePythonClass())
.withAllowConfiguration(create.getAllowConfiguration())
.withSystem(create.getSystem());
.withSystem(create.getSystem())
.withSupportsInterrupt(create.getSupportsInterrupt());

// Validate App
validateApplication(app);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ public Response delete(

limits.invalidateCache(entityType);
// Remove from Pipeline Service
deleteApp(securityContext, app, hardDelete);
deleteApp(securityContext, app);
return deleteByName(uriInfo, securityContext, name, true, hardDelete);
}

Expand Down Expand Up @@ -766,7 +766,7 @@ public Response delete(
.performCleanup(app, Entity.getCollectionDAO(), searchRepository);

// Remove from Pipeline Service
deleteApp(securityContext, app, hardDelete);
deleteApp(securityContext, app);
// Remove from repository
return delete(uriInfo, securityContext, id, true, hardDelete);
}
Expand Down Expand Up @@ -881,7 +881,7 @@ public Response configureApplication(
@Operation(
operationId = "triggerApplicationRun",
summary = "Trigger an Application run",
description = "Trigger a Application run by id.",
description = "Trigger a Application run by name.",
responses = {
@ApiResponse(
responseCode = "200",
Expand All @@ -905,15 +905,7 @@ public Response triggerApplicationRun(
return Response.status(Response.Status.OK).entity("Application Triggered").build();
} else {
if (!app.getPipelines().isEmpty()) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
PipelineServiceClientResponse response =
Expand All @@ -924,6 +916,47 @@ public Response triggerApplicationRun(
throw new BadRequestException("Failed to trigger application.");
}

@POST
@Path("/stop/{name}")
@Operation(
operationId = "stopApplicationRun",
summary = "Stop a Application run",
description = "Stop a application run by name.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Application stopped status code",
content = @Content(mediaType = "application/json")),
@ApiResponse(
responseCode = "404",
description = "Application for instance {id} is not found")
})
public Response stopApplicationRun(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
@PathParam("name")
String name) {
EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS));
App app = repository.getByName(uriInfo, name, fields);
if (Boolean.TRUE.equals(app.getSupportsInterrupt())) {
if (app.getAppType().equals(AppType.Internal)) {
AppScheduler.getInstance().stopApplicationRun(app);
return Response.status(Response.Status.OK)
.entity("Application will be stopped in some time.")
.build();
} else {
if (!app.getPipelines().isEmpty()) {
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(response.getCode()).entity(response).build();
}
}
}
throw new BadRequestException("Application does not support Interrupts.");
}

@POST
@Path("/deploy/{name}")
@Operation(
Expand Down Expand Up @@ -953,21 +986,14 @@ public Response deployApplicationFlow(
return Response.status(Response.Status.OK).entity("Application Deployed").build();
} else {
if (!app.getPipelines().isEmpty()) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));

ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
PipelineServiceClientResponse status =
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
if (status.getCode() == 200) {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline);
} else {
ingestionPipeline.setDeployed(false);
Expand Down Expand Up @@ -1032,7 +1058,8 @@ private App getApplication(
.withFeatures(marketPlaceDefinition.getFeatures())
.withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass())
.withAllowConfiguration(marketPlaceDefinition.getAllowConfiguration())
.withSystem(marketPlaceDefinition.getSystem());
.withSystem(marketPlaceDefinition.getSystem())
.withSupportsInterrupt(marketPlaceDefinition.getSupportsInterrupt());

// validate Bot if provided
validateAndAddBot(app, createAppRequest.getBot());
Expand All @@ -1048,7 +1075,23 @@ private void validateAndAddBot(App app, String botName) {
}
}

private void deleteApp(SecurityContext securityContext, App installedApp, boolean hardDelete) {
private IngestionPipeline getIngestionPipeline(
UriInfo uriInfo, SecurityContext securityContext, App app) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));

ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);

return ingestionPipeline;
}

private void deleteApp(SecurityContext securityContext, App installedApp) {
if (installedApp.getAppType().equals(AppType.Internal)) {
try {
AppScheduler.getInstance().deleteScheduledApplication(installedApp);
Expand All @@ -1058,13 +1101,8 @@ private void deleteApp(SecurityContext securityContext, App installedApp, boolea
}
} else {
if (!nullOrEmpty(installedApp.getPipelines())) {
EntityReference pipelineRef = installedApp.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
getIngestionPipeline(null, securityContext, installedApp);
try {
pipelineServiceClient.deletePipeline(ingestionPipeline);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@
"appSchedule": {
"scheduleTimeline": "Custom",
"cronExpression": "0 0 * * *"
}
},
"supportsInterrupt": true
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"runtime": {
"enabled": true
},
"supportsInterrupt": true,
"appConfiguration": {
"entities": [
"table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,11 @@ void delete_systemApp_400() throws IOException {
void post_trigger_app_200() throws HttpResponseException {
String appName = "SearchIndexingApplication";
postTriggerApp(appName, ADMIN_AUTH_HEADERS);
assertAppRanAfterTrigger(appName);
assertAppStatusAvailableAfterTrigger(appName);
assertAppRanAfterTriggerWithStatus(appName, AppRunRecord.Status.SUCCESS);
}

private void assertAppRanAfterTrigger(String appName) {
private void assertAppStatusAvailableAfterTrigger(String appName) {
assertEventually(
"appIsRunning",
() -> {
Expand All @@ -349,12 +350,13 @@ private void assertAppRanAfterTrigger(String appName) {
}
},
APP_TRIGGER_RETRY);
}

private void assertAppRanAfterTriggerWithStatus(String appName, AppRunRecord.Status status) {
assertEventually(
"appSuccess",
"appStatus",
() -> {
assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS)
.getStatus()
.equals(AppRunRecord.Status.SUCCESS);
assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS).getStatus().equals(status);
},
APP_TRIGGER_RETRY);
}
Expand Down Expand Up @@ -406,6 +408,13 @@ private void postTriggerApp(String appName, Map<String, String> authHeaders)
readResponse(response, OK.getStatusCode());
}

private void postAppStop(String appName, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("apps/stop").path(appName);
Response response = SecurityUtil.addHeaders(target, authHeaders).post(null);
readResponse(response, OK.getStatusCode());
}

private AppRunRecord getLatestAppRun(String appName, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@
"domain" : {
"description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.",
"$ref": "../../type/entityReference.json"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false
Expand Down
Loading
Loading