Skip to content

Commit

Permalink
Allowing non-rollup and rollup indices to be searched together
Browse files Browse the repository at this point in the history
Signed-off-by: Kshitij Tandon <tandonks@amazon.com>
  • Loading branch information
tandonks committed Sep 27, 2024
1 parent 316df70 commit 4ea022d
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ class RollupInterceptor(

@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
searchEnabled = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) {
searchAllJobs = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) {
searchRawRollupIndices = it
}
}

@Suppress("SpreadOperator")
Expand Down Expand Up @@ -144,15 +149,16 @@ class RollupInterceptor(
private fun validateIndicies(concreteIndices: Array<String>, fieldMappings: Set<RollupFieldMapping>): Map<Rollup, Set<RollupFieldMapping>> {
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
for (concreteIndex in concreteIndices) {
val rollupJobs =
clusterService.state().metadata.index(concreteIndex).getRollupJobs()
?: throw IllegalArgumentException("Not all indices have rollup job")

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
if (rollupJobs != null) {
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}
allMatchingRollupJobs += matchingRollupJobs
} else if (!searchRawRollupIndices) {
throw IllegalArgumentException("Not all indices have rollup job")
}
allMatchingRollupJobs += matchingRollupJobs
}
return allMatchingRollupJobs
}
Expand Down Expand Up @@ -347,6 +353,9 @@ class RollupInterceptor(
if (searchAllJobs) {
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
} else {
if (matchingRollupJobs.keys.size > 1) {
logger.warn("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window")
}
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RollupSettings {
companion object {
const val DEFAULT_ROLLUP_ENABLED = true
const val DEFAULT_SEARCH_ALL_JOBS = false
const val DEFAULT_SEARCH_SOURCE_INDICES = false
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
Expand Down Expand Up @@ -85,6 +86,14 @@ class RollupSettings {
Setting.Property.Dynamic,
)

val ROLLUP_SEARCH_SOURCE_INDICES: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.search.search_source_indices",
DEFAULT_SEARCH_SOURCE_INDICES,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
)

val ROLLUP_DASHBOARDS: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.dashboards.enabled",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
RollupSettings.ROLLUP_DASHBOARDS,
SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES,
),
Expand Down Expand Up @@ -176,6 +177,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false)
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateSearchRawRollupClusterSetting(value: Boolean) {
val formattedValue = "\"${value}\""
val request =
"""
{
"persistent": {
"${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue
}
}
""".trimIndent()
val res =
client().makeRequest(
"PUT", "_cluster/settings", emptyMap(),
StringEntity(request, ContentType.APPLICATION_JSON),
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun createSampleIndexForQSQTest(index: String) {
val mapping =
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,34 @@ class RollupInterceptorIT : RollupRestTestCase() {
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response",
)

// Updating to allow searching on non-rollup and rolled-up index together
updateSearchRawRollupClusterSetting(true)
val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes1.restStatus() == RestStatus.OK)
val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes2.restStatus() == RestStatus.OK)
val searchResult2 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult2.restStatus() == RestStatus.OK)
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rollupAggResMulti = searchResult2.asMap()["aggregations"] as Map<String, Map<String, Any>>

val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int
val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double

assertEquals(
"Searching single raw source index and rollup target index did not return the same sum results",
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"],
)
assertEquals(
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"],
)
assertEquals(
"Searching rollup target index did not return the value count for all of the rollup jobs on the index",
trueAggCount, rollupAggResMulti.getValue("value_count_passenger_count")["value"],
)

// Search 2 rollups with different mappings
try {
client().makeRequest(
Expand Down

0 comments on commit 4ea022d

Please sign in to comment.