From 196f311544b32d6fce40f278977e5bfc253d76df Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 31 Aug 2023 09:54:08 -0700 Subject: [PATCH] Add primary first preference to all search requests Signed-off-by: bowenlan-amzn --- .../indexstatemanagement/ManagedIndexCoordinator.kt | 2 ++ .../transport/action/explain/TransportExplainAction.kt | 2 ++ .../transport/action/getpolicy/TransportGetPoliciesAction.kt | 2 ++ .../transport/action/indexpolicy/TransportIndexPolicyAction.kt | 2 ++ .../indexstatemanagement/util/ManagedIndexUtils.kt | 2 ++ 5 files changed, 10 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 8e955e034..3583122e2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -35,6 +35,7 @@ import org.opensearch.cluster.ClusterState import org.opensearch.cluster.ClusterStateListener import org.opensearch.cluster.block.ClusterBlockException import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.lifecycle.LifecycleListener import org.opensearch.common.regex.Regex @@ -427,6 +428,7 @@ class ManagedIndexCoordinator( ).size(MAX_HITS) ) .indices(INDEX_MANAGEMENT_INDEX) + .preference(Preference.PRIMARY_FIRST.type()) return try { val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index f31d8e3d1..9e67742b3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -24,6 +24,7 @@ import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.util.concurrent.ThreadContext @@ -177,6 +178,7 @@ class TransportExplainAction @Inject constructor( return SearchRequest() .indices(INDEX_MANAGEMENT_INDEX) .source(searchSourceBuilder) + .preference(Preference.PRIMARY_FIRST.type()) } private fun searchForMetadata(searchRequest: SearchRequest) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt index 607f13a49..9aaa9c91c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.client.Client +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings @@ -90,6 +91,7 @@ class TransportGetPoliciesAction @Inject constructor( val searchRequest = SearchRequest() .source(searchSourceBuilder) .indices(INDEX_MANAGEMENT_INDEX) + .preference(Preference.PRIMARY_FIRST.type()) client.threadPool().threadContext.stashContext().use { client.search( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index 7240c4f77..a2f483cbb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -20,6 +20,7 @@ import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.AutoExpandReplicas +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance import org.opensearch.cluster.service.ClusterService import org.opensearch.common.ValidationException @@ -180,6 +181,7 @@ class TransportIndexPolicyAction @Inject constructor( ).size(MAX_HITS) ) .indices(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + .preference(Preference.PRIMARY_FIRST.type()) client.search( searchRequest, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index c4f31bc87..dc2e50f97 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -21,6 +21,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.action.update.UpdateRequest // import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.client.Client +import org.opensearch.cluster.routing.Preference import org.opensearch.core.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler @@ -199,6 +200,7 @@ fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = Manag .fetchSource(emptyArray(), emptyArray()) .query(boolQueryBuilder) ) + .preference(Preference.PRIMARY_FIRST.type()) if (scroll) req.scroll(TimeValue.timeValueMinutes(1)) return req }