Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow config update for external app #14034

Merged
merged 20 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- DataInsightsApplication should not allow configuration
update apps_marketplace
set json = JSON_INSERT(
JSON_REMOVE(json, '$.allowConfiguration'),
'$.allowConfiguration',
false
)
where name = 'DataInsightsApplication';

update installed_apps
set json = JSON_INSERT(
JSON_REMOVE(json, '$.allowConfiguration'),
'$.allowConfiguration',
false
)
where name = 'DataInsightsApplication';
16 changes: 16 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- DataInsightsApplication should not allow configuration
UPDATE apps_marketplace
SET json = jsonb_set(
json::jsonb,
'{allowConfiguration}',
to_jsonb(false)
)
where name = 'DataInsightsApplication';

UPDATE installed_apps
SET json = jsonb_set(
json::jsonb,
'{allowConfiguration}',
to_jsonb(false)
)
where name = 'DataInsightsApplication';
3 changes: 2 additions & 1 deletion ingestion/src/metadata/applications/auto_tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class AutoTaggerApp(AppRunner):
with a YAML file like:

sourcePythonClass: metadata.applications.auto_tagger.AutoTaggerApp
config:
appConfig:
type: AutoTagger
confidenceLevel: 80
workflowConfig:
loggerLevel: INFO
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/workflow/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import List, Optional

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
build_workflow_config_property,
)

from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
build_usage_workflow_config,
)

from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import (
AutoTaggerAppConfig,
)
from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY;
import static org.openmetadata.service.exception.CatalogExceptionMessage.INVALID_APP_TYPE;
import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR;

import com.cronutils.mapper.CronMapper;
Expand All @@ -21,11 +20,14 @@
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.schema.entity.app.ExternalAppIngestionConfig;
import org.openmetadata.schema.entity.app.ScheduleType;
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.schema.entity.applications.configuration.ApplicationConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.ApplicationPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.ProviderType;
Expand All @@ -35,8 +37,8 @@
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.MetadataServiceRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
Expand All @@ -52,13 +54,25 @@ public class AbstractNativeApplication implements NativeApplication {
private final @Getter CronMapper cronMapper = CronMapper.fromQuartzToUnix();
private final @Getter CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));

// Default service that contains external apps' Ingestion Pipelines
private static final String SERVICE_NAME = "OpenMetadata";

@Override
public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
this.collectionDAO = dao;
this.searchRepository = searchRepository;
this.app = app;
}

@Override
public void install() {
if (app.getAppType() == AppType.Internal && app.getScheduleType().equals(ScheduleType.Scheduled)) {
scheduleInternal();
} else if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) {
scheduleExternal();
}
}

@Override
public void triggerOnDemand() {
// Validate Native Application
Expand All @@ -72,94 +86,95 @@ public void triggerOnDemand() {
}
}

@Override
public void scheduleInternal() {
// Validate Native Application
if (app.getAppType() == AppType.Internal && app.getScheduleType().equals(ScheduleType.Scheduled)) {
AppRuntime runtime = JsonUtils.convertValue(app.getRuntime(), ScheduledExecutionContext.class);
validateServerExecutableApp(runtime);
// Schedule New Application Run
AppScheduler.getInstance().addApplicationSchedule(app);
return;
AppRuntime runtime = JsonUtils.convertValue(app.getRuntime(), ScheduledExecutionContext.class);
validateServerExecutableApp(runtime);
// Schedule New Application Run
AppScheduler.getInstance().addApplicationSchedule(app);
}

public void scheduleExternal() {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

try {
bindExistingIngestionToApplication(ingestionPipelineRepository);
} catch (EntityNotFoundException ex) {
ApplicationConfig config = JsonUtils.convertValue(this.getApp().getAppConfiguration(), ApplicationConfig.class);
createAndBindIngestionPipeline(ingestionPipelineRepository, config);
}
throw new IllegalArgumentException(INVALID_APP_TYPE);
}

@Override
public void initializeExternalApp() {
if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ExternalAppIngestionConfig ingestionConfig =
JsonUtils.convertValue(app.getAppConfiguration(), ExternalAppIngestionConfig.class);

try {
// Check if the Pipeline Already Exists
String fqn = FullyQualifiedName.add(ingestionConfig.getService().getName(), ingestionConfig.getName());
IngestionPipeline storedPipeline =
ingestionPipelineRepository.getByName(null, fqn, ingestionPipelineRepository.getFields("id"));

// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.findTo(app.getId(), Entity.APPLICATION, Relationship.HAS.ordinal(), Entity.INGESTION_PIPELINE);

if (records.isEmpty()) {
// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
app.getId(),
storedPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} catch (EntityNotFoundException ex) {
// Pipeline needs to be created
EntityRepository<?> serviceRepository =
Entity.getServiceEntityRepository(ServiceType.fromValue(ingestionConfig.getService().getType()));
EntityReference service =
serviceRepository
.getByName(null, ingestionConfig.getService().getName(), serviceRepository.getFields("id"))
.getEntityReference();

Cron quartzCron = cronParser.parse(app.getAppSchedule().getCronExpression());

CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(ingestionConfig.getName())
.withDisplayName(ingestionConfig.getDisplayName())
.withDescription(ingestionConfig.getDescription())
.withPipelineType(ingestionConfig.getPipelineType())
.withSourceConfig(ingestionConfig.getSourceConfig())
.withAirflowConfig(
ingestionConfig.getAirflowConfig().withScheduleInterval(cronMapper.map(quartzCron).asString()))
.withService(service);

// Get Pipeline
IngestionPipeline dataInsightPipeline =
getIngestionPipeline(createPipelineRequest, String.format("%sBot", app.getName()), "admin")
.withProvider(ProviderType.USER);
ingestionPipelineRepository.setFullyQualifiedName(dataInsightPipeline);
ingestionPipelineRepository.initializeEntity(dataInsightPipeline);

// Add Ingestion Pipeline to Application
private void bindExistingIngestionToApplication(IngestionPipelineRepository ingestionPipelineRepository) {
String fqn = FullyQualifiedName.add(SERVICE_NAME, this.getApp().getName());
IngestionPipeline storedPipeline =
ingestionPipelineRepository.getByName(null, fqn, ingestionPipelineRepository.getFields("id"));

// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.insert(
app.getId(),
dataInsightPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} else {
throw new IllegalArgumentException(INVALID_APP_TYPE);
.findTo(this.getApp().getId(), Entity.APPLICATION, Relationship.HAS.ordinal(), Entity.INGESTION_PIPELINE);

if (records.isEmpty()) {
// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
this.getApp().getId(),
storedPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
}

private void createAndBindIngestionPipeline(
IngestionPipelineRepository ingestionPipelineRepository, ApplicationConfig config) {
MetadataServiceRepository serviceEntityRepository =
(MetadataServiceRepository) Entity.getEntityRepository(Entity.METADATA_SERVICE);
EntityReference service =
serviceEntityRepository
.getByName(null, SERVICE_NAME, serviceEntityRepository.getFields("id"))
.getEntityReference();

Cron quartzCron = this.getCronParser().parse(this.getApp().getAppSchedule().getCronExpression());

CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(this.getApp().getName())
.withDisplayName(this.getApp().getDisplayName())
.withDescription(this.getApp().getDescription())
.withPipelineType(PipelineType.APPLICATION)
.withSourceConfig(
new SourceConfig()
.withConfig(
new ApplicationPipeline()
.withSourcePythonClass(this.getApp().getSourcePythonClass())
.withAppConfig(config)))
.withAirflowConfig(
new AirflowConfig().withScheduleInterval(this.getCronMapper().map(quartzCron).asString()))
.withService(service);

// Get Pipeline
IngestionPipeline ingestionPipeline =
getIngestionPipeline(createPipelineRequest, String.format("%sBot", this.getApp().getName()), "admin")
.withProvider(ProviderType.USER);
ingestionPipelineRepository.setFullyQualifiedName(ingestionPipeline);
ingestionPipelineRepository.initializeEntity(ingestionPipeline);

// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
this.getApp().getId(),
ingestionPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}

protected void validateServerExecutableApp(AppRuntime context) {
// Server apps are native
if (!app.getAppType().equals(AppType.Internal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;

Expand All @@ -17,24 +16,20 @@ private ApplicationHandler() {

public static void triggerApplicationOnDemand(
App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "triggerOnDemand", "triggerOnDemand");
runMethodFromApplication(app, daoCollection, searchRepository, "triggerOnDemand");
}

public static void scheduleApplication(App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "scheduleInternal", "initializeExternalApp");
public static void installApplication(App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "install");
}

public static void configureApplication(App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "configure", "configure");
runMethodFromApplication(app, daoCollection, searchRepository, "configure");
}

/** Load an App from its className and call its methods dynamically */
public static void runMethodFromApplication(
App app,
CollectionDAO daoCollection,
SearchRepository searchRepository,
String internalMethodName,
String externalMethodName) {
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
// Native Application
try {
Class<?> clz = Class.forName(app.getClassName());
Expand All @@ -44,14 +39,10 @@ public static void runMethodFromApplication(
Method initMethod = resource.getClass().getMethod("init", App.class, CollectionDAO.class, SearchRepository.class);
initMethod.invoke(resource, app, daoCollection, searchRepository);

// Call Trigger On Demand Method
if (app.getAppType() == AppType.Internal) {
Method scheduleMethod = resource.getClass().getMethod(internalMethodName);
scheduleMethod.invoke(resource);
} else if (app.getAppType() == AppType.External) {
Method scheduleMethod = resource.getClass().getMethod(externalMethodName);
scheduleMethod.invoke(resource);
}
// Call method on demand
Method scheduleMethod = resource.getClass().getMethod(methodName);
scheduleMethod.invoke(resource);

} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
LOG.error("Exception encountered", e);
throw new RuntimeException(e);
Expand Down
Loading
Loading