From f1ae49e1a0fc560b3a611acb855ec5d465deff5c Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Mon, 30 Sep 2024 23:39:27 +0530 Subject: [PATCH] [Search] Indexing Fixes (#18048) * Fix OpenSearch Content Size Issue entity content is too long [204857600] for the configured buffer limit [104857600] * Change Type to Long * Add Payload Size to take dynamic entry * Migrations for 1.5.6 * Mark Stale Entries Stopped * Format checkstyle * Fix failure --------- Co-authored-by: Sriharsha Chintalapani --- .../1.5.6/mysql/postDataMigrationSQLScript.sql | 0 .../native/1.5.6/mysql/schemaChanges.sql | 5 +++++ .../1.5.6/postgres/postDataMigrationSQLScript.sql | 0 .../native/1.5.6/postgres/schemaChanges.sql | 5 +++++ .../apps/bundles/searchIndex/SearchIndexApp.java | 14 ++++++++++++++ .../openmetadata/service/jdbi3/CollectionDAO.java | 10 ++++++++++ .../search/elasticsearch/ElasticSearchClient.java | 1 + .../elasticsearch/ElasticSearchIndexSink.java | 4 ++-- .../search/opensearch/OpenSearchClient.java | 2 ++ .../search/opensearch/OpenSearchIndexSink.java | 4 ++-- .../service/util/OpenMetadataOperations.java | 6 +++--- .../json/data/app/SearchIndexingApplication.json | 1 + .../SearchIndexingApplication.json | 1 + .../internal/searchIndexingAppConfig.json | 6 ++++++ .../json/schema/system/eventPublisherJob.json | 5 +++-- .../SearchIndexingApplication.json | 6 ++++++ 16 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 bootstrap/sql/migrations/native/1.5.6/mysql/postDataMigrationSQLScript.sql create mode 100644 bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql create mode 100644 bootstrap/sql/migrations/native/1.5.6/postgres/postDataMigrationSQLScript.sql create mode 100644 bootstrap/sql/migrations/native/1.5.6/postgres/schemaChanges.sql diff --git a/bootstrap/sql/migrations/native/1.5.6/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.5.6/mysql/postDataMigrationSQLScript.sql new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql new file mode 100644 index 000000000000..e6c09213ef7a --- /dev/null +++ b/bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql @@ -0,0 +1,5 @@ +-- Delete Search Indexing Application +DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication'; +DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication'; +DELETE FROM installed_apps where name = 'SearchIndexingApplication'; +DELETE FROM apps_marketplace where name = 'SearchIndexingApplication'; \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.5.6/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.5.6/postgres/postDataMigrationSQLScript.sql new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/bootstrap/sql/migrations/native/1.5.6/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.5.6/postgres/schemaChanges.sql new file mode 100644 index 000000000000..4a37bbc9d39c --- /dev/null +++ b/bootstrap/sql/migrations/native/1.5.6/postgres/schemaChanges.sql @@ -0,0 +1,5 @@ +-- Delete Search Indexing Application +DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication'; +DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication'; +DELETE FROM installed_apps where name = 'SearchIndexingApplication'; +DELETE FROM apps_marketplace where name = 'SearchIndexingApplication'; \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 977f82cd5e0f..adb45d380433 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -163,7 +163,21 @@ public void startApp(JobExecutionContext jobExecutionContext) { } } + private void cleanUpStaleJobsFromRuns() { + try { + collectionDAO + .appExtensionTimeSeriesDao() + .markStaleEntriesStopped(getApp().getId().toString()); + } catch (Exception ex) { + LOG.error("Failed in Marking Stale Entries Stopped."); + } + } + private void initializeJob() { + // Remove any Stale Jobs + cleanUpStaleJobsFromRuns(); + + // Initialize New Job int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO); this.jobData.setStats( new Stats() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index ae06d02bbc10..88772268aa7f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -4252,6 +4252,16 @@ interface AppExtensionTimeSeries { connectionType = POSTGRES) void insert(@Bind("json") String json); + @ConnectionAwareSqlUpdate( + value = + "UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running'", + connectionType = MYSQL) + @ConnectionAwareSqlUpdate( + value = + "UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'", + connectionType = POSTGRES) + void markStaleEntriesStopped(@Bind("appId") String appId); + @ConnectionAwareSqlUpdate( value = "UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp", diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index c65cab8de183..7acefe878022 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -2228,6 +2228,7 @@ public RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration requestConfigBuilder .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000) .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000)); + restClientBuilder.setCompressionEnabled(true); return new RestHighLevelClientBuilder(restClientBuilder.build()) .setApiCompatibilityMode(true) .build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java index 553788aa1581..dedd9ac74cff 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexSink.java @@ -28,10 +28,10 @@ public class ElasticSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; - private final int maxPayLoadSizeInBytes; + private final long maxPayLoadSizeInBytes; public ElasticSearchIndexSink( - SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) { + SearchRepository searchRepository, int total, long maxPayLoadSizeInBytes) { this.searchRepository = searchRepository; this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes; this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 8a1c5b93b461..6e1b3551b93c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -2193,6 +2193,8 @@ public RestHighLevelClient createOpenSearchClient(ElasticSearchConfiguration esC requestConfigBuilder .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000) .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000)); + restClientBuilder.setCompressionEnabled(true); + restClientBuilder.setChunkedEnabled(true); return new RestHighLevelClient(restClientBuilder); } catch (Exception e) { LOG.error("Failed to create open search client ", e); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java index 7dc0766b5987..205b65cf3eac 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchIndexSink.java @@ -29,9 +29,9 @@ public class OpenSearchIndexSink implements Sink { private final StepStats stats = new StepStats(); private final SearchRepository searchRepository; - private final int maxPayLoadSizeInBytes; + private final long maxPayLoadSizeInBytes; - public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) { + public OpenSearchIndexSink(SearchRepository repository, int total, long maxPayLoadSizeInBytes) { this.searchRepository = repository; this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes; this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 754867160730..903f40eaa687 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -270,8 +270,8 @@ public Integer reIndex( int batchSize, @Option( names = {"-p", "--payload-size"}, - defaultValue = "10485760") - int payloadSize, + defaultValue = "104857600l") + long payloadSize, @Option( names = {"--recreate-indexes"}, defaultValue = "true") @@ -295,7 +295,7 @@ public Integer reIndex( } private int executeSearchReindexApp( - String appName, int batchSize, int payloadSize, boolean recreateIndexes) { + String appName, int batchSize, long payloadSize, boolean recreateIndexes) { AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); App originalSearchIndexApp = appRepository.getByName(null, appName, appRepository.getFields("id")); diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json index 285124f81f5a..3169c3634a42 100644 --- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json @@ -45,6 +45,7 @@ ], "recreateIndex": false, "batchSize": "100", + "payLoadSize": 104857600, "searchIndexMappingLanguage": "EN" }, "appSchedule": { diff --git a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json index c8bb323a9142..8690cd5864d5 100644 --- a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json @@ -60,6 +60,7 @@ ], "recreateIndex": false, "batchSize": "100", + "payLoadSize": 104857600, "searchIndexMappingLanguage": "EN" } } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json index e432d259f5e4..f05af337987c 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/internal/searchIndexingAppConfig.json @@ -38,6 +38,12 @@ "type": "integer", "default": 100 }, + "payLoadSize": { + "description": "Maximum number of events sent in a batch (Default 100).", + "type": "integer", + "existingJavaType": "java.lang.Long", + "default": 104857600 + }, "searchIndexMappingLanguage": { "description": "Recreate Indexes with updated Language", "$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage" diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index e6f6b9922975..4a4f8b123f76 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -102,9 +102,10 @@ "type": "integer" }, "payLoadSize": { - "description": "Payload size in bytes depending on config", + "description": "Payload size in bytes depending on config.", "type": "integer", - "default": 10485760 + "existingJavaType": "java.lang.Long", + "default": 104857600 }, "searchIndexMappingLanguage": { "description": "Recreate Indexes with updated Language", diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json index f3332d92c856..69e218615f5a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ApplicationSchemas/SearchIndexingApplication.json @@ -10,6 +10,12 @@ "type": "integer", "default": 100 }, + "payLoadSize": { + "title": "Payload Size", + "description": "Maximum number of events entities in a batch (Default 100).", + "type": "integer", + "default": 104857600 + }, "entities": { "title": "Entities", "description": "List of entities that you need to reindex",