Skip to content

Commit

Permalink
[Search] Indexing Fixes (#18048)
Browse files Browse the repository at this point in the history
* Fix OpenSearch Content Size Issue
entity content is too long [204857600] for the configured buffer limit [104857600]

* Change Type to Long

* Add Payload Size to take dynamic entry

* Migrations for 1.5.6

* Mark Stale Entries Stopped

* Format checkstyle

* Fix failure

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
  • Loading branch information
mohityadav766 and harshach committed Sep 30, 2024
1 parent 9552886 commit f1ae49e
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 9 deletions.
Empty file.
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Delete Search Indexing Application
DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Delete Search Indexing Application
DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,21 @@ public void startApp(JobExecutionContext jobExecutionContext) {
}
}

private void cleanUpStaleJobsFromRuns() {
try {
collectionDAO
.appExtensionTimeSeriesDao()
.markStaleEntriesStopped(getApp().getId().toString());
} catch (Exception ex) {
LOG.error("Failed in Marking Stale Entries Stopped.");
}
}

private void initializeJob() {
// Remove any Stale Jobs
cleanUpStaleJobsFromRuns();

// Initialize New Job
int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
this.jobData.setStats(
new Stats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4252,6 +4252,16 @@ interface AppExtensionTimeSeries {
connectionType = POSTGRES)
void insert(@Bind("json") String json);

@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running'",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'",
connectionType = POSTGRES)
void markStaleEntriesStopped(@Bind("appId") String appId);

@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ public RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
return new RestHighLevelClientBuilder(restClientBuilder.build())
.setApiCompatibilityMode(true)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;
private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;

public ElasticSearchIndexSink(
SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
SearchRepository searchRepository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = searchRepository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,8 @@ public RestHighLevelClient createOpenSearchClient(ElasticSearchConfiguration esC
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
restClientBuilder.setChunkedEnabled(true);
return new RestHighLevelClient(restClientBuilder);
} catch (Exception e) {
LOG.error("Failed to create open search client ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;

private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;

public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
public OpenSearchIndexSink(SearchRepository repository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = repository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ public Integer reIndex(
int batchSize,
@Option(
names = {"-p", "--payload-size"},
defaultValue = "10485760")
int payloadSize,
defaultValue = "104857600l")
long payloadSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true")
Expand All @@ -295,7 +295,7 @@ public Integer reIndex(
}

private int executeSearchReindexApp(
String appName, int batchSize, int payloadSize, boolean recreateIndexes) {
String appName, int batchSize, long payloadSize, boolean recreateIndexes) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalSearchIndexApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
},
"appSchedule": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"description": "Maximum number of events sent in a batch (Default 100).",
"type": "integer",
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@
"type": "integer"
},
"payLoadSize": {
"description": "Payload size in bytes depending on config",
"description": "Payload size in bytes depending on config.",
"type": "integer",
"default": 10485760
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"title": "Payload Size",
"description": "Maximum number of events entities in a batch (Default 100).",
"type": "integer",
"default": 104857600
},
"entities": {
"title": "Entities",
"description": "List of entities that you need to reindex",
Expand Down

0 comments on commit f1ae49e

Please sign in to comment.