Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing non-rollup and rollup indices to be searched together #1268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ class RollupInterceptor(

@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
searchEnabled = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) {
searchAllJobs = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) {
searchRawRollupIndices = it
}
}

@Suppress("SpreadOperator")
Expand Down Expand Up @@ -144,15 +149,16 @@ class RollupInterceptor(
private fun validateIndicies(concreteIndices: Array<String>, fieldMappings: Set<RollupFieldMapping>): Map<Rollup, Set<RollupFieldMapping>> {
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
for (concreteIndex in concreteIndices) {
val rollupJobs =
clusterService.state().metadata.index(concreteIndex).getRollupJobs()
?: throw IllegalArgumentException("Not all indices have rollup job")

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
if (rollupJobs != null) {
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}
allMatchingRollupJobs += matchingRollupJobs
} else if (!searchRawRollupIndices) {
throw IllegalArgumentException("Not all indices have rollup job")
}
allMatchingRollupJobs += matchingRollupJobs
}
return allMatchingRollupJobs
}
Expand Down Expand Up @@ -347,6 +353,9 @@ class RollupInterceptor(
if (searchAllJobs) {
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
} else {
if (matchingRollupJobs.keys.size > 1) {
logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window")
}
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RollupSettings {
companion object {
const val DEFAULT_ROLLUP_ENABLED = true
const val DEFAULT_SEARCH_ALL_JOBS = false
const val DEFAULT_SEARCH_SOURCE_INDICES = false
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
Expand Down Expand Up @@ -85,6 +86,14 @@ class RollupSettings {
Setting.Property.Dynamic,
)

val ROLLUP_SEARCH_SOURCE_INDICES: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.search.search_source_indices",
DEFAULT_SEARCH_SOURCE_INDICES,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
)

val ROLLUP_DASHBOARDS: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.dashboards.enabled",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
RollupSettings.ROLLUP_DASHBOARDS,
SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES,
),
Expand Down Expand Up @@ -176,6 +177,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false)
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateSearchRawRollupClusterSetting(value: Boolean) {
val formattedValue = "\"${value}\""
val request =
"""
{
"persistent": {
"${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue
}
}
""".trimIndent()
val res =
client().makeRequest(
"PUT", "_cluster/settings", emptyMap(),
StringEntity(request, ContentType.APPLICATION_JSON),
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun createSampleIndexForQSQTest(index: String) {
val mapping =
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,12 +1092,12 @@ class RollupInterceptorIT : RollupRestTestCase() {
},
"aggs": {
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
"max_passenger_count": { "max": { "field": "passenger_count" } },
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing value_count?
It's better to check all this values are right in rollup search

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was initially also not being used in the test earlier since we were testing the validation (where we used to throw the exception) on rollup and source index searches together
I was actually trying with that too but was getting an Internal Server Error for the same
{"error":{"root_cause":[],"type":"search_phase_execution_exception","reason":"","phase":"fetch","grouped":true,"failed_shards":[],"caused_by":{"type":"class_cast_exception","reason":"class org.opensearch.search.aggregations.metrics.InternalScriptedMetric cannot be cast to class org.opensearch.search.aggregations.metrics.InternalValueCount (org.opensearch.search.aggregations.metrics.InternalScriptedMetric and org.opensearch.search.aggregations.metrics.InternalValueCount are in unnamed module of loader 'app')"}},"status":500}

"max_passenger_count": { "max": { "field": "passenger_count" } }
}
}
""".trimIndent()
// Search 1 non-rollup index and 1 rollup
// Search 1 non-rollup index and 1 rollup
updateSearchRawRollupClusterSetting(false)
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult1.restStatus() == RestStatus.OK)
val failures = extractFailuresFromSearchResponse(searchResult1)
Expand All @@ -1112,6 +1112,29 @@ class RollupInterceptorIT : RollupRestTestCase() {
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response",
)

// Updating to allow searching on non-rollup and rolled-up index together
updateSearchRawRollupClusterSetting(true)
val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes1.restStatus() == RestStatus.OK)
val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes2.restStatus() == RestStatus.OK)
val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult.restStatus() == RestStatus.OK)
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map<String, Map<String, Any>>

val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double

assertEquals(
"Searching single raw source index and rollup target index did not return the same sum results",
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"],
)
assertEquals(
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"],
)

// Search 2 rollups with different mappings
try {
client().makeRequest(
Expand Down
Loading