Skip to content

Commit

Permalink
skipping execution based on cluster service (#1219)
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 authored Aug 21, 2024
1 parent b7ddbb6 commit d6da55c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
val skipFlag = SkipExecution()
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner =
RollupRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PluginVersionSweepCoordinator(
private val skipExecution: SkipExecution,
settings: Settings,
private val threadPool: ThreadPool,
clusterService: ClusterService,
var clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
LifecycleListener(),
ClusterStateListener {
Expand Down Expand Up @@ -58,7 +58,7 @@ class PluginVersionSweepCoordinator(

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
skipExecution.sweepISMPluginVersion()
skipExecution.sweepISMPluginVersion(clusterService)
initBackgroundSweepISMPluginVersionExecution()
}
}
Expand All @@ -77,7 +77,7 @@ class PluginVersionSweepCoordinator(
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
skipExecution.sweepISMPluginVersion()
skipExecution.sweepISMPluginVersion(clusterService)
}
} catch (e: Exception) {
logger.error("Failed to sweep ism plugin version", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@
package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.core.action.ActionListener
import org.opensearch.Version
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
) {
class SkipExecution {
private val logger = LogManager.getLogger(javaClass)

@Volatile
Expand All @@ -31,53 +25,27 @@ class SkipExecution(
final var hasLegacyPlugin: Boolean = false
private set

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
client.execute(
NodesInfoAction.INSTANCE, request,
object : ActionListener<NodesInfoResponse> {
override fun onResponse(response: NodesInfoResponse) {
val versionSet = mutableSetOf<String>()
val legacyVersionSet = mutableSetOf<String>()

response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos }
.forEach {
it.forEach { nodePlugin ->
if (nodePlugin.name == "opensearch-index-management" ||
nodePlugin.name == "opensearch_index_management"
) {
versionSet.add(nodePlugin.version)
}

if (nodePlugin.name == "opendistro-index-management" ||
nodePlugin.name == "opendistro_index_management"
) {
legacyVersionSet.add(nodePlugin.version)
}
}
}

if ((versionSet.size + legacyVersionSet.size) > 1) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]")
} else {
flag = false
}

if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster")
} else {
hasLegacyPlugin = false
}
}

override fun onFailure(e: Exception) {
logger.error("Failed sweeping nodes for ISM plugin versions: $e")
flag = false
}
},
)
fun sweepISMPluginVersion(clusterService: ClusterService) {
try {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val currentMinVersion = clusterService.state().nodes.minNodeVersion
val currentMaxVersion = clusterService.state().nodes.maxNodeVersion

if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]")
} else {
flag = false
}

if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster")
} else {
hasLegacyPlugin = false
}
} catch (e: Exception) {
logger.error("Unable to fetch node versions from cluster service", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,88 @@

package org.opensearch.indexmanagement.indexstatemanagement.coordinator

import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import org.junit.Before
import org.mockito.Mockito
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.common.transport.TransportAddress
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.test.OpenSearchTestCase

class SkipExecutionTests : OpenSearchAllocationTestCase() {
private lateinit var client: Client
class SkipExecutionTests : OpenSearchTestCase() {
private var clusterService: ClusterService = mock()
private lateinit var clusterState: ClusterState
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
skip = SkipExecution(client)
skip = SkipExecution()
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() {
val version = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertFalse(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() {
val version1 = Version.CURRENT
val version2 = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)

val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}
}

0 comments on commit d6da55c

Please sign in to comment.