Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow config update for external app #14034

Merged
merged 20 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- DataInsightsApplication should not allow configuration
update apps_marketplace
set json = JSON_INSERT(
JSON_REMOVE(json, '$.allowConfiguration'),
'$.allowConfiguration',
false
)
where name = 'DataInsightsApplication';

update installed_apps
set json = JSON_INSERT(
JSON_REMOVE(json, '$.allowConfiguration'),
'$.allowConfiguration',
false
)
where name = 'DataInsightsApplication';
16 changes: 16 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- DataInsightsApplication should not allow configuration
UPDATE apps_marketplace
SET json = jsonb_set(
json::jsonb,
'{allowConfiguration}',
to_jsonb(false)
)
where name = 'DataInsightsApplication';

UPDATE installed_apps
SET json = jsonb_set(
json::jsonb,
'{allowConfiguration}',
to_jsonb(false)
)
where name = 'DataInsightsApplication';
3 changes: 2 additions & 1 deletion ingestion/src/metadata/applications/auto_tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class AutoTaggerApp(AppRunner):
with a YAML file like:

sourcePythonClass: metadata.applications.auto_tagger.AutoTaggerApp
config:
appConfig:
type: AutoTagger
confidenceLevel: 80
workflowConfig:
loggerLevel: INFO
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/workflow/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import List, Optional

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
build_workflow_config_property,
)

from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
build_usage_workflow_config,
)

from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import (
AutoTaggerAppConfig,
)
from metadata.generated.schema.entity.applications.configuration.externalApplicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR;

import com.cronutils.mapper.CronMapper;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import java.util.List;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,24 +19,16 @@
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.schema.entity.app.ExternalAppIngestionConfig;
import org.openmetadata.schema.entity.app.ScheduleType;
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.JobExecutionContext;
Expand Down Expand Up @@ -87,77 +77,7 @@ public void scheduleInternal() {

@Override
public void initializeExternalApp() {
if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ExternalAppIngestionConfig ingestionConfig =
JsonUtils.convertValue(app.getAppConfiguration(), ExternalAppIngestionConfig.class);

try {
// Check if the Pipeline Already Exists
String fqn = FullyQualifiedName.add(ingestionConfig.getService().getName(), ingestionConfig.getName());
IngestionPipeline storedPipeline =
ingestionPipelineRepository.getByName(null, fqn, ingestionPipelineRepository.getFields("id"));

// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.findTo(app.getId(), Entity.APPLICATION, Relationship.HAS.ordinal(), Entity.INGESTION_PIPELINE);

if (records.isEmpty()) {
// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
app.getId(),
storedPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} catch (EntityNotFoundException ex) {
// Pipeline needs to be created
EntityRepository<?> serviceRepository =
Entity.getServiceEntityRepository(ServiceType.fromValue(ingestionConfig.getService().getType()));
EntityReference service =
serviceRepository
.getByName(null, ingestionConfig.getService().getName(), serviceRepository.getFields("id"))
.getEntityReference();

Cron quartzCron = cronParser.parse(app.getAppSchedule().getCronExpression());

CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(ingestionConfig.getName())
.withDisplayName(ingestionConfig.getDisplayName())
.withDescription(ingestionConfig.getDescription())
.withPipelineType(ingestionConfig.getPipelineType())
.withSourceConfig(ingestionConfig.getSourceConfig())
.withAirflowConfig(
ingestionConfig.getAirflowConfig().withScheduleInterval(cronMapper.map(quartzCron).asString()))
.withService(service);

// Get Pipeline
IngestionPipeline dataInsightPipeline =
getIngestionPipeline(createPipelineRequest, String.format("%sBot", app.getName()), "admin")
.withProvider(ProviderType.USER);
ingestionPipelineRepository.setFullyQualifiedName(dataInsightPipeline);
ingestionPipelineRepository.initializeEntity(dataInsightPipeline);

// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
app.getId(),
dataInsightPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} else {
throw new IllegalArgumentException(INVALID_APP_TYPE);
}
// External apps should either use or extend `ExternalApplicationHandler`
}

protected void validateServerExecutableApp(AppRuntime context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.entity.applications.configuration.ExternalApplicationConfig;
import org.openmetadata.schema.entity.applications.configuration.ApplicationConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
Expand Down Expand Up @@ -34,8 +34,7 @@ public class ExternalApplicationHandler extends AbstractNativeApplication {
@Override
public void initializeExternalApp() {

ExternalApplicationConfig config =
JsonUtils.convertValue(this.getApp().getAppConfiguration(), ExternalApplicationConfig.class);
ApplicationConfig config = JsonUtils.convertValue(this.getApp().getAppConfiguration(), ApplicationConfig.class);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

Expand Down Expand Up @@ -85,7 +84,7 @@ public void initializeExternalApp() {
.withConfig(
new ApplicationPipeline()
.withSourcePythonClass(this.getApp().getSourcePythonClass())
.withAppConfig(config.getConfig())))
.withAppConfig(config)))
.withAirflowConfig(
new AirflowConfig().withScheduleInterval(this.getCronMapper().map(quartzCron).asString()))
.withService(service);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,112 @@
package org.openmetadata.service.apps.bundles.insights;

import static org.openmetadata.service.exception.CatalogExceptionMessage.INVALID_APP_TYPE;

import com.cronutils.model.Cron;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.schema.entity.app.ScheduleType;
import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.DataInsightPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.FullyQualifiedName;

@Slf4j
public class DataInsightsApp extends AbstractNativeApplication {

private static final String INGESTION_PIPELINE_NAME = "OpenMetadata_dataInsight";
private static final String SERVICE_NAME = "OpenMetadata";
private static final String SERVICE_TYPE = "Metadata";
private static final String PIPELINE_DESCRIPTION = "OpenMetadata DataInsight Pipeline";

@Override
public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
super.init(app, dao, searchRepository);
LOG.info("Data Insights App is initialized");
}

@Override
public void initializeExternalApp() {
if (getApp().getAppType() == AppType.External && getApp().getScheduleType().equals(ScheduleType.Scheduled)) {
pmbrull marked this conversation as resolved.
Show resolved Hide resolved
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

try {
// Check if the Pipeline Already Exists
String fqn = FullyQualifiedName.add(SERVICE_NAME, INGESTION_PIPELINE_NAME);
IngestionPipeline storedPipeline =
ingestionPipelineRepository.getByName(null, fqn, ingestionPipelineRepository.getFields("id"));

// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.findTo(getApp().getId(), Entity.APPLICATION, Relationship.HAS.ordinal(), Entity.INGESTION_PIPELINE);

if (records.isEmpty()) {
// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
getApp().getId(),
storedPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} catch (EntityNotFoundException ex) {
// Pipeline needs to be created
EntityRepository<?> serviceRepository = Entity.getServiceEntityRepository(ServiceType.fromValue(SERVICE_TYPE));
pmbrull marked this conversation as resolved.
Show resolved Hide resolved
EntityReference service =
serviceRepository.getByName(null, SERVICE_NAME, serviceRepository.getFields("id")).getEntityReference();

Cron quartzCron = getCronParser().parse(getApp().getAppSchedule().getCronExpression());

CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(INGESTION_PIPELINE_NAME)
.withDisplayName(INGESTION_PIPELINE_NAME)
.withDescription(PIPELINE_DESCRIPTION)
.withPipelineType(PipelineType.DATA_INSIGHT)
.withSourceConfig(new SourceConfig().withConfig(new DataInsightPipeline()))
.withAirflowConfig(new AirflowConfig().withScheduleInterval(getCronMapper().map(quartzCron).asString()))
.withService(service);

// Get Pipeline
IngestionPipeline dataInsightPipeline =
getIngestionPipeline(createPipelineRequest, String.format("%sBot", getApp().getName()), "admin")
.withProvider(ProviderType.USER);
ingestionPipelineRepository.setFullyQualifiedName(dataInsightPipeline);
ingestionPipelineRepository.initializeEntity(dataInsightPipeline);

// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
getApp().getId(),
dataInsightPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} else {
throw new IllegalArgumentException(INVALID_APP_TYPE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.openmetadata.schema.dataInsight.type.TotalEntitiesByType;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.DataInsightsReportAppConfig;
import org.openmetadata.schema.entity.applications.configuration.internal.DataInsightsReportAppConfig;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.EntityReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ private AppMarketPlaceDefinition getApplicationDefinition(
.withAppLogoUrl(create.getAppLogoUrl())
.withAppScreenshots(create.getAppScreenshots())
.withFeatures(create.getFeatures())
.withSourcePythonClass(create.getSourcePythonClass());
.withSourcePythonClass(create.getSourcePythonClass())
.withAllowConfiguration(create.getAllowConfiguration());

// Validate App
validateApplication(app);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,8 @@ private App getApplication(
.withAppLogoUrl(marketPlaceDefinition.getAppLogoUrl())
.withAppScreenshots(marketPlaceDefinition.getAppScreenshots())
.withFeatures(marketPlaceDefinition.getFeatures())
.withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass());
.withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass())
.withAllowConfiguration(marketPlaceDefinition.getAllowConfiguration());

// validate Bot if provided
validateAndAddBot(app, createAppRequest.getBot());
Expand Down
Loading
Loading