From d6da55cdcb9ac96163644f0bfa295d56db5fd915 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Wed, 21 Aug 2024 09:31:14 +0530 Subject: [PATCH] skipping execution based on cluster service (#1219) Signed-off-by: Sarthak Aggarwal --- .../indexmanagement/IndexManagementPlugin.kt | 2 +- .../PluginVersionSweepCoordinator.kt | 6 +- .../indexstatemanagement/SkipExecution.kt | 82 ++++++----------- .../coordinator/SkipExecutionTests.kt | 89 +++++++++++++++---- 4 files changed, 103 insertions(+), 76 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 7660d48e1..e59602205 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -411,7 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver) this.indexNameExpressionResolver = indexNameExpressionResolver - val skipFlag = SkipExecution(client) + val skipFlag = SkipExecution() RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService) val rollupRunner = RollupRunner diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt index 20dc3bcea..55c7511ba 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt @@ -25,7 +25,7 @@ class PluginVersionSweepCoordinator( private val skipExecution: SkipExecution, settings: Settings, private val threadPool: ThreadPool, - clusterService: ClusterService, + var clusterService: ClusterService, ) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")), LifecycleListener(), ClusterStateListener { @@ -58,7 +58,7 @@ class PluginVersionSweepCoordinator( override fun clusterChanged(event: ClusterChangedEvent) { if (event.nodesChanged() || event.isNewCluster) { - skipExecution.sweepISMPluginVersion() + skipExecution.sweepISMPluginVersion(clusterService) initBackgroundSweepISMPluginVersionExecution() } } @@ -77,7 +77,7 @@ class PluginVersionSweepCoordinator( logger.info("Canceling sweep ism plugin version job") scheduledSkipExecution?.cancel() } else { - skipExecution.sweepISMPluginVersion() + skipExecution.sweepISMPluginVersion(clusterService) } } catch (e: Exception) { logger.error("Failed to sweep ism plugin version", e) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt index c74f27c89..1d0d6c841 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt @@ -6,20 +6,14 @@ package org.opensearch.indexmanagement.indexstatemanagement import org.apache.logging.log4j.LogManager -import org.opensearch.action.admin.cluster.node.info.NodesInfoAction -import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest -import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse -import org.opensearch.action.admin.cluster.node.info.PluginsAndModules -import org.opensearch.client.Client -import org.opensearch.core.action.ActionListener +import org.opensearch.Version +import org.opensearch.cluster.service.ClusterService import org.opensearch.indexmanagement.util.OpenForTesting // TODO this can be moved to job scheduler, so that all extended plugin // can avoid running jobs in an upgrading cluster @OpenForTesting -class SkipExecution( - private val client: Client, -) { +class SkipExecution { private val logger = LogManager.getLogger(javaClass) @Volatile @@ -31,53 +25,27 @@ class SkipExecution( final var hasLegacyPlugin: Boolean = false private set - fun sweepISMPluginVersion() { - // if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true - val request = NodesInfoRequest().clear().addMetric("plugins") - client.execute( - NodesInfoAction.INSTANCE, request, - object : ActionListener { - override fun onResponse(response: NodesInfoResponse) { - val versionSet = mutableSetOf() - val legacyVersionSet = mutableSetOf() - - response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos } - .forEach { - it.forEach { nodePlugin -> - if (nodePlugin.name == "opensearch-index-management" || - nodePlugin.name == "opensearch_index_management" - ) { - versionSet.add(nodePlugin.version) - } - - if (nodePlugin.name == "opendistro-index-management" || - nodePlugin.name == "opendistro_index_management" - ) { - legacyVersionSet.add(nodePlugin.version) - } - } - } - - if ((versionSet.size + legacyVersionSet.size) > 1) { - flag = true - logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]") - } else { - flag = false - } - - if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) { - hasLegacyPlugin = true - logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster") - } else { - hasLegacyPlugin = false - } - } - - override fun onFailure(e: Exception) { - logger.error("Failed sweeping nodes for ISM plugin versions: $e") - flag = false - } - }, - ) + fun sweepISMPluginVersion(clusterService: ClusterService) { + try { + // if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true + val currentMinVersion = clusterService.state().nodes.minNodeVersion + val currentMaxVersion = clusterService.state().nodes.maxNodeVersion + + if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) { + flag = true + logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]") + } else { + flag = false + } + + if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) { + hasLegacyPlugin = true + logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster") + } else { + hasLegacyPlugin = false + } + } catch (e: Exception) { + logger.error("Unable to fetch node versions from cluster service", e) + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt index 2421ff5f1..d41de106a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt @@ -5,29 +5,88 @@ package org.opensearch.indexmanagement.indexstatemanagement.coordinator +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.junit.Before -import org.mockito.Mockito -import org.opensearch.action.admin.cluster.node.info.NodesInfoAction -import org.opensearch.client.Client -import org.opensearch.cluster.ClusterChangedEvent -import org.opensearch.cluster.OpenSearchAllocationTestCase +import org.opensearch.Version +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.node.DiscoveryNode +import org.opensearch.cluster.node.DiscoveryNodes +import org.opensearch.cluster.service.ClusterService +import org.opensearch.core.common.transport.TransportAddress import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution +import org.opensearch.test.OpenSearchTestCase -class SkipExecutionTests : OpenSearchAllocationTestCase() { - private lateinit var client: Client +class SkipExecutionTests : OpenSearchTestCase() { + private var clusterService: ClusterService = mock() + private lateinit var clusterState: ClusterState private lateinit var skip: SkipExecution @Before - @Throws(Exception::class) fun setup() { - client = Mockito.mock(Client::class.java) - skip = SkipExecution(client) + skip = SkipExecution() } - fun `test cluster change event`() { - val event = Mockito.mock(ClusterChangedEvent::class.java) - Mockito.`when`(event.nodesChanged()).thenReturn(true) - skip.sweepISMPluginVersion() - Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any()) + fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() { + val version = Version.CURRENT + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion(clusterService) + + assertFalse(skip.flag) + assertFalse(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() { + val version1 = Version.CURRENT + val version2 = Version.V_2_0_0 + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion(clusterService) + + assertTrue(skip.flag) + assertFalse(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() { + val minVersion = Version.fromString("7.10.0") + val maxVersion = Version.CURRENT + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion) + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion(clusterService) + + assertTrue(skip.flag) + assertTrue(skip.hasLegacyPlugin) + } + + fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() { + val minVersion = Version.fromString("7.10.0") + val maxVersion = Version.V_2_0_0 + val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion) + val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion) + val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion) + + val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build() + clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build() + whenever(clusterService.state()).thenReturn(clusterState) + + skip.sweepISMPluginVersion(clusterService) + + assertTrue(skip.flag) + assertTrue(skip.hasLegacyPlugin) } }