Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Indexing Fixes #18048

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/1.5.6/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Delete Search Indexing Application
DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Delete Search Indexing Application
DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,21 @@ public void startApp(JobExecutionContext jobExecutionContext) {
}
}

private void cleanUpStaleJobsFromRuns() {
try {
collectionDAO
.appExtensionTimeSeriesDao()
.markStaleEntriesStopped(getApp().getId().toString());
} catch (Exception ex) {
LOG.error("Failed in Marking Stale Entries Stopped.");
}
}

private void initializeJob() {
// Remove any Stale Jobs
cleanUpStaleJobsFromRuns();

// Initialize New Job
int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
this.jobData.setStats(
new Stats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4252,6 +4252,16 @@ interface AppExtensionTimeSeries {
connectionType = POSTGRES)
void insert(@Bind("json") String json);

@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running'",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'",
connectionType = POSTGRES)
void markStaleEntriesStopped(@Bind("appId") String appId);

@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ public RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
return new RestHighLevelClientBuilder(restClientBuilder.build())
.setApiCompatibilityMode(true)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;
private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;

public ElasticSearchIndexSink(
SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
SearchRepository searchRepository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = searchRepository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,8 @@ public RestHighLevelClient createOpenSearchClient(ElasticSearchConfiguration esC
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
restClientBuilder.setChunkedEnabled(true);
return new RestHighLevelClient(restClientBuilder);
} catch (Exception e) {
LOG.error("Failed to create open search client ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;

private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;

public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
public OpenSearchIndexSink(SearchRepository repository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = repository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ public Integer reIndex(
int batchSize,
@Option(
names = {"-p", "--payload-size"},
defaultValue = "10485760")
int payloadSize,
defaultValue = "104857600l")
long payloadSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true")
Expand All @@ -295,7 +295,7 @@ public Integer reIndex(
}

private int executeSearchReindexApp(
String appName, int batchSize, int payloadSize, boolean recreateIndexes) {
String appName, int batchSize, long payloadSize, boolean recreateIndexes) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalSearchIndexApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
},
"appSchedule": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"description": "Maximum number of events sent in a batch (Default 100).",
"type": "integer",
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@
"type": "integer"
},
"payLoadSize": {
"description": "Payload size in bytes depending on config",
"description": "Payload size in bytes depending on config.",
"type": "integer",
"default": 10485760
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"title": "Payload Size",
"description": "Maximum number of events entities in a batch (Default 100).",
"type": "integer",
"default": 104857600
},
"entities": {
"title": "Entities",
"description": "List of entities that you need to reindex",
Expand Down
Loading