diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt index 8de821936..b6fb65187 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData @@ -27,12 +28,109 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { abstract suspend fun execute(): Step - fun postExecute(logger: Logger): Step { + fun postExecute( + logger: Logger, + indexManagementActionMetrics: IndexManagementActionsMetrics, + step: Step, + startingManagedIndexMetaData: ManagedIndexMetaData, + ): Step { logger.info("Finished executing $name for ${context?.metadata?.index}") + val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData) + emitTelemetry(indexManagementActionMetrics, updatedStepMetaData, logger) this.context = null return this } + private fun emitTelemetry( + indexManagementActionMetrics: IndexManagementActionsMetrics, + updatedStepMetaData: ManagedIndexMetaData, + logger: Logger, + ) { + when (context?.metadata?.actionMetaData?.name) { + IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ROLLOVER, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.FORCE_MERGE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.DELETE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.REPLICA_COUNT, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.TRANSITION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.NOTIFICATION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.CLOSE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SET_INDEX_PRIORITY, // problem in test + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.OPEN, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.MOVE_SHARD, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SET_READ_ONLY, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SHRINK, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SNAPSHOT, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ALIAS_ACTION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ALLOCATION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + else -> { + logger.info( + "Action Metrics is not supported for this action [%s]", + context?.metadata?.actionMetaData?.name, + ) + } + } + } + abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData abstract fun isIdempotent(): Boolean diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt new file mode 100644 index 000000000..725893a21 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags + +abstract class ActionMetrics { + abstract val actionName: String + + fun createTags(context: StepContext): Tags { + val tags = Tags.create() + .addTag("index_name", context.metadata.index) + .addTag("policy_id", context.metadata.policyID) + .addTag("node_id", context.clusterService.nodeName ?: "") + .addTag("index_uuid", context.metadata.indexUuid) + return tags + } + + abstract fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) +} + +class IndexManagementActionsMetrics private constructor() { + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var actionMetricsMap: Map + + companion object { + val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance } + + const val ROLLOVER = "rollover" + const val NOTIFICATION = "notification" + const val FORCE_MERGE = "force_merge" + const val DELETE = "delete" + const val REPLICA_COUNT = "replica_count" + const val TRANSITION = "transition" + const val CLOSE = "close" + const val SET_INDEX_PRIORITY = "set_index_priority" + const val OPEN = "open" + const val MOVE_SHARD = "move_shard" + const val SET_READ_ONLY = "set_read_only" + const val SHRINK = "shrink" + const val SNAPSHOT = "snapshot" + const val ALIAS_ACTION = "alias_action" + const val ALLOCATION = "allocation" + + private object HOLDER { + val instance = IndexManagementActionsMetrics() + } + } + + fun initialize(metricsRegistry: MetricsRegistry) { + this.metricsRegistry = metricsRegistry + + RolloverActionMetrics.instance.initializeCounters(metricsRegistry) + NotificationActionMetrics.instance.initializeCounters(metricsRegistry) + ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry) + DeleteActionMetrics.instance.initializeCounters(metricsRegistry) + ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry) + TransitionActionMetrics.instance.initializeCounters(metricsRegistry) + CloseActionMetrics.instance.initializeCounters(metricsRegistry) + SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry) + OpenActionMetrics.instance.initializeCounters(metricsRegistry) + MoveShardActionMetrics.instance.initializeCounters(metricsRegistry) + SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry) + ShrinkActionMetrics.instance.initializeCounters(metricsRegistry) + SnapshotActionMetrics.instance.initializeCounters(metricsRegistry) + AliasActionMetrics.instance.initializeCounters(metricsRegistry) + AllocationActionMetrics.instance.initializeCounters(metricsRegistry) + + actionMetricsMap = mapOf( + ROLLOVER to RolloverActionMetrics.instance, + NOTIFICATION to NotificationActionMetrics.instance, + FORCE_MERGE to ForceMergeActionMetrics.instance, + DELETE to DeleteActionMetrics.instance, + REPLICA_COUNT to ReplicaCountActionMetrics.instance, + TRANSITION to TransitionActionMetrics.instance, + CLOSE to CloseActionMetrics.instance, + SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance, + OPEN to OpenActionMetrics.instance, + MOVE_SHARD to MoveShardActionMetrics.instance, + SET_READ_ONLY to SetReadOnlyActionMetrics.instance, + SHRINK to ShrinkActionMetrics.instance, + SNAPSHOT to SnapshotActionMetrics.instance, + ALIAS_ACTION to AliasActionMetrics.instance, + ALLOCATION to AllocationActionMetrics.instance, + ) + } + + fun getActionMetrics(actionName: String): ActionMetrics? { + return actionMetricsMap[actionName] + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt new file mode 100644 index 000000000..5652a292e --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class AliasActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ALIAS_ACTION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Alias Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Alias Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Alias Actions", "milliseconds") + } + + companion object { + val instance: AliasActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = AliasActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt new file mode 100644 index 000000000..363fde91d --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class AllocationActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ALLOCATION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Allocation Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Allocation Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Allocation Actions", "milliseconds") + } + + companion object { + val instance: AllocationActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = AllocationActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt new file mode 100644 index 000000000..1f301872d --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class CloseActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.CLOSE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Close Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Close Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Close Actions", "milliseconds") + } + + companion object { + val instance: CloseActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = CloseActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt new file mode 100644 index 000000000..b8d236364 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class DeleteActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.DELETE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Delete Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Delete Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Delete Action", "milliseconds") + } + + companion object { + val instance: DeleteActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = DeleteActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val deleteActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.DELETE) as DeleteActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + deleteActionMetrics.successes.add(1.0, context.let { deleteActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + deleteActionMetrics.failures.add(1.0, context.let { deleteActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + deleteActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { deleteActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt new file mode 100644 index 000000000..5b7858ee1 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ForceMergeActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.FORCE_MERGE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Force Merge Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Force Merge Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Force Merge Action", "milliseconds") + } + + companion object { + val instance: ForceMergeActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ForceMergeActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val forceMergeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.FORCE_MERGE) as ForceMergeActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + forceMergeActionMetrics.successes.add(1.0, context.let { forceMergeActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + forceMergeActionMetrics.failures.add(1.0, context.let { forceMergeActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + forceMergeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { forceMergeActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt new file mode 100644 index 000000000..2f86100d9 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class MoveShardActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.MOVE_SHARD + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Move Shard Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Move Shard Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Move Shard Actions", "milliseconds") + } + + companion object { + val instance: MoveShardActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = MoveShardActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val moveShardActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.MOVE_SHARD) as MoveShardActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + moveShardActionMetrics.successes.add(1.0, context.let { moveShardActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + moveShardActionMetrics.failures.add(1.0, context.let { moveShardActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + moveShardActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { moveShardActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt new file mode 100644 index 000000000..0faa9b9fd --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class NotificationActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.NOTIFICATION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Notification Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Notification Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Notification Action", "milliseconds") + } + + companion object { + val instance: NotificationActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = NotificationActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val notificationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.NOTIFICATION) as NotificationActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + notificationActionMetrics.successes.add(1.0, context.let { notificationActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + notificationActionMetrics.failures.add(1.0, context.let { notificationActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + notificationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { notificationActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt new file mode 100644 index 000000000..ad29a5f8a --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class OpenActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.OPEN + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Open Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Open Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Open Actions", "milliseconds") + } + + companion object { + val instance: OpenActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = OpenActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val openActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.OPEN) as OpenActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + openActionMetrics.successes.add(1.0, context.let { openActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + openActionMetrics.failures.add(1.0, context.let { openActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + openActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { openActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt new file mode 100644 index 000000000..94e60b892 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ReplicaCountActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.REPLICA_COUNT + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Replica Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Replica Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Replica Count Action", "milliseconds") + } + + companion object { + val instance: ReplicaCountActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ReplicaCountActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val replicaCountActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.REPLICA_COUNT) as ReplicaCountActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + replicaCountActionMetrics.successes.add(1.0, context.let { replicaCountActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + replicaCountActionMetrics.failures.add(1.0, context.let { replicaCountActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + replicaCountActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { replicaCountActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt new file mode 100644 index 000000000..3d2965702 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class RolloverActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ROLLOVER + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Rollover Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Rollover Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Rollover Actions", "milliseconds") + } + + companion object { + val instance: RolloverActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = RolloverActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val rolloverActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ROLLOVER) as RolloverActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + rolloverActionMetrics.successes.add(1.0, context.let { rolloverActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + rolloverActionMetrics.failures.add(1.0, context.let { rolloverActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + rolloverActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { rolloverActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt new file mode 100644 index 000000000..023461829 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SetIndexPriorityActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SET_INDEX_PRIORITY + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Set Index Priority Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Set Index Priority Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Set Index Priority Actions", "milliseconds") + } + + companion object { + val instance: SetIndexPriorityActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SetIndexPriorityActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val setIndexPriorityActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SET_INDEX_PRIORITY) as SetIndexPriorityActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + setIndexPriorityActionMetrics.successes.add(1.0, context.let { setIndexPriorityActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + setIndexPriorityActionMetrics.failures.add(1.0, context.let { setIndexPriorityActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + setIndexPriorityActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { setIndexPriorityActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt new file mode 100644 index 000000000..a3a8b4737 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SetReadOnlyActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SET_READ_ONLY + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Set Read Only Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Set Read Only Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Set Read Only Actions", "milliseconds") + } + + companion object { + val instance: SetReadOnlyActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SetReadOnlyActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val setReadOnlyActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SET_READ_ONLY) as SetReadOnlyActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + setReadOnlyActionMetrics.successes.add(1.0, context.let { setReadOnlyActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + setReadOnlyActionMetrics.failures.add(1.0, context.let { setReadOnlyActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + setReadOnlyActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { setReadOnlyActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt new file mode 100644 index 000000000..704feca3c --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ShrinkActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SHRINK + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Shrink Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Shrink Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Shrink Actions", "milliseconds") + } + + companion object { + val instance: ShrinkActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ShrinkActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val shrinkActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SHRINK) as ShrinkActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + shrinkActionMetrics.successes.add(1.0, context.let { shrinkActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + shrinkActionMetrics.failures.add(1.0, context.let { shrinkActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + shrinkActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { shrinkActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt new file mode 100644 index 000000000..10d74e195 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SnapshotActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SNAPSHOT + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Snapshot Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Snapshot Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Snapshot Actions", "milliseconds") + } + + companion object { + val instance: SnapshotActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SnapshotActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val snapshotActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SNAPSHOT) as SnapshotActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + snapshotActionMetrics.successes.add(1.0, context.let { snapshotActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + snapshotActionMetrics.failures.add(1.0, context.let { snapshotActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + snapshotActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { snapshotActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt new file mode 100644 index 000000000..8049ab25e --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class TransitionActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.TRANSITION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Transition Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Transition Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Transition Actions", "milliseconds") + } + + companion object { + val instance: TransitionActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = TransitionActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val transitionActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.TRANSITION) as TransitionActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + transitionActionMetrics.successes.add(1.0, context.let { transitionActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + transitionActionMetrics.failures.add(1.0, context.let { transitionActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + transitionActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { transitionActionMetrics.createTags(it) }) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 3a67de075..7660d48e1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -145,6 +145,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManage import org.opensearch.indexmanagement.spi.IndexManagementExtension import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformRunner @@ -184,10 +185,13 @@ import org.opensearch.plugins.ExtensiblePlugin import org.opensearch.plugins.NetworkPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.SystemIndexPlugin +import org.opensearch.plugins.TelemetryAwarePlugin import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController import org.opensearch.rest.RestHandler import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.tracing.Tracer import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.RemoteClusterService import org.opensearch.transport.TransportInterceptor @@ -196,7 +200,8 @@ import org.opensearch.watcher.ResourceWatcherService import java.util.function.Supplier @Suppress("TooManyFunctions") -class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() { +class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, + TelemetryAwarePlugin, Plugin() { private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices lateinit var actionValidation: ActionValidation @@ -210,6 +215,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin private val extensions = mutableSetOf() private val extensionCheckerMap = mutableMapOf() lateinit var indexOperationActionFilter: IndexOperationActionFilter + private lateinit var metricsRegistry: MetricsRegistry companion object { const val PLUGINS_BASE_URI = "/_plugins" @@ -374,8 +380,11 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin namedWriteableRegistry: NamedWriteableRegistry, indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier, + tracer: Tracer, + metricsRegistry: MetricsRegistry, ): Collection { val settings = environment.settings() + this.metricsRegistry = metricsRegistry this.clusterService = clusterService QueryShardContextFactory.init( client, @@ -385,6 +394,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin namedWriteableRegistry, environment, ) + + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) val transformRunner = @@ -453,6 +464,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin .registerThreadPool(threadPool) .registerExtensionChecker(extensionChecker) .registerIndexMetadataProvider(indexMetadataProvider) + .registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) val managedIndexCoordinator = ManagedIndexCoordinator( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index ea7ed3765..003a612ce 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -86,6 +86,7 @@ import org.opensearch.indexmanagement.opensearchapi.withClosableContext import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData @@ -121,6 +122,7 @@ object ManagedIndexRunner : private lateinit var skipExecFlag: SkipExecution private lateinit var threadPool: ThreadPool private lateinit var extensionStatusChecker: ExtensionStatusChecker + lateinit var indexManagementActionMetrics: IndexManagementActionsMetrics private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED private var validationServiceEnabled: Boolean = DEFAULT_ACTION_VALIDATION_ENABLED @@ -221,6 +223,11 @@ object ManagedIndexRunner : return this } + fun registerIndexManagementActionMetrics(indexManagementActionsMetrics: IndexManagementActionsMetrics): Any { + this.indexManagementActionMetrics = indexManagementActionsMetrics + return this + } + override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ManagedIndexConfig) { throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}") @@ -446,7 +453,8 @@ object ManagedIndexRunner : managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user, ), ) { - step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) + step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute() + .postExecute(logger, indexManagementActionMetrics, step, startingManagedIndexMetaData) } var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt index a7e2851e6..09fc4af00 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.close.CloseIndexResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,15 +22,23 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.close.AttemptCloseStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant import kotlin.IllegalArgumentException class AttemptCloseStepTests : OpenSearchTestCase() { @@ -34,18 +46,32 @@ class AttemptCloseStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var closeActionMetrics: CloseActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + closeActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics + } fun `test close step sets step status to completed when successful`() { val closeIndexResponse = CloseIndexResponse(true, true, listOf()) val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.successes).add(eq(1.0), any()) } } @@ -54,12 +80,13 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } @@ -68,12 +95,13 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } @@ -85,7 +113,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -110,13 +138,14 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt index 4dcd6e2f1..2f99551ab 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -8,8 +8,12 @@ package org.opensearch.indexmanagement.indexstatemanagement.step import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito.doAnswer import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -18,32 +22,57 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.delete.AttemptDeleteStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase +import java.time.Instant class AttemptDeleteStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var deleteActionMetrics: DeleteActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + deleteActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.DELETE) as DeleteActionMetrics + } fun `test delete step sets step status to completed when successful`() { val acknowledgedResponse = AcknowledgedResponse(true) val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData( + "test", "indexUuid", "policy_id", null, null, null, null, null, null, null, + ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, + ) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.successes).add(eq(1.0), any()) } } @@ -52,12 +81,13 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.failures).add(eq(1.0), any()) } } @@ -66,13 +96,14 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.failures).add(eq(1.0), any()) } } @@ -81,10 +112,10 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt index c0797e3d9..b0986c7ad 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.open.OpenIndexResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,32 +22,53 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptOpenStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var openActionMetrics: OpenActionMetrics + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + openActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.OPEN) as OpenActionMetrics + } fun `test open step sets step status to failed when not acknowledged`() { val openIndexResponse = OpenIndexResponse(false, false) val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } @@ -52,12 +77,13 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } @@ -66,13 +92,14 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt index 69f674f12..b720614d6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt @@ -9,9 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -24,15 +27,23 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase +import java.time.Instant class AttemptRolloverStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() @@ -42,9 +53,20 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { private val oldIndexName = "old_index" private val newIndexName = "new_index" val alias = "alias" + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var rolloverActionMetrics: RolloverActionMetrics @Before fun setup() { + // Setup metrics + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + rolloverActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.ROLLOVER) as RolloverActionMetrics + // mock rollover target val clusterState: ClusterState = mock() val metadata: Metadata = mock() @@ -78,15 +100,16 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { oldIndexName, "indexUuid", "policy_id", null, null, null, null, null, null, - null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, rolledOverIndexName = newIndexName, ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasNotAckMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -103,15 +126,16 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { oldIndexName, "indexUuid", "policy_id", null, null, null, null, null, null, - null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, rolledOverIndexName = newIndexName, ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getFailedCopyAliasMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -128,15 +152,16 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { oldIndexName, "indexUuid", "policy_id", null, null, null, null, null, null, - null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, rolledOverIndexName = newIndexName, ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasIndexNotFoundMessage(newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -153,15 +178,16 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { oldIndexName, "indexUuid", "policy_id", null, null, null, null, null, null, - null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, rolledOverIndexName = null, ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasRolledOverIndexNotFoundMessage(oldIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.successes).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt index 6842b4dc4..a2437a67f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt @@ -11,6 +11,8 @@ import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,21 +20,41 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction import org.opensearch.indexmanagement.indexstatemanagement.step.indexpriority.AttemptSetIndexPriorityStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var setIndexPriorityActionMetrics: SetIndexPriorityActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + setIndexPriorityActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SET_INDEX_PRIORITY) as SetIndexPriorityActionMetrics + } fun `test set priority step sets step status to completed when successful`() { val acknowledgedResponse = AcknowledgedResponse(true) @@ -40,10 +62,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -55,10 +77,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -70,10 +92,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) @@ -86,10 +108,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt index 30f2e6639..825695fe3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -8,8 +8,12 @@ package org.opensearch.indexmanagement.indexstatemanagement.step import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito.doAnswer import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -18,34 +22,56 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction import org.opensearch.indexmanagement.indexstatemanagement.step.replicacount.AttemptReplicaCountStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var replicaCountActionMetrics: ReplicaCountActionMetrics + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + replicaCountActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.REPLICA_COUNT) as ReplicaCountActionMetrics + } fun `test replica step sets step status to failed when not acknowledged`() { val replicaCountResponse = AcknowledgedResponse(false) val client = getClient(getAdminClient(getIndicesAdminClient(replicaCountResponse, null))) runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } @@ -55,12 +81,13 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } @@ -70,13 +97,14 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt index 2cc7040db..34f9c4fa6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt @@ -10,9 +10,13 @@ import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -22,10 +26,13 @@ import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -35,6 +42,9 @@ import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript import org.opensearch.snapshots.ConcurrentSnapshotExecutionException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException @@ -43,13 +53,22 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val snapshotAction = randomSnapshotActionConfig("repo", "snapshot-name") - private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("snapshot", 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var snapshotActionMetrics: SnapshotActionMetrics @Before fun settings() { whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST))) whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(MockTemplateScript.Factory("snapshot-name")) + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + snapshotActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SNAPSHOT) as SnapshotActionMetrics } fun `test snapshot response when block`() { @@ -60,27 +79,30 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.successes).add(ArgumentMatchers.eq(1.0), any()) } whenever(response.status()).doReturn(RestStatus.OK) runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.successes, Mockito.times(2)).add(ArgumentMatchers.eq(1.0), any()) } whenever(response.status()).doReturn(RestStatus.INTERNAL_SERVER_ERROR) runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } @@ -90,10 +112,11 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } @@ -129,10 +152,11 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "some error", updatedManagedIndexMetaData.info!!["cause"]) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt index 496c4f3b4..6f6d2cc16 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -9,9 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.rollover.RolloverInfo import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsResponse @@ -28,17 +31,24 @@ import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.index.shard.DocsStats import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException import java.time.Instant @@ -46,6 +56,8 @@ import java.time.Instant class AttemptTransitionStepTests : OpenSearchTestCase() { private val indexName: String = "test" private val indexUUID: String = "indexUuid" + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var transitionActionMetrics: TransitionActionMetrics @Suppress("UNCHECKED_CAST") private val indexMetadata: IndexMetadata = @@ -70,6 +82,13 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { @Before fun `setup settings`() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + transitionActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.TRANSITION) as TransitionActionMetrics whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(ManagedIndexSettings.RESTRICTED_INDEX_PATTERN))) } @@ -83,14 +102,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage(indexName), updatedManagedIndexMetaData.info!!["message"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -101,14 +121,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -119,14 +140,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -134,7 +156,7 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, mock(), clusterService, mutableMapOf()) runBlocking { val completedStartTime = Instant.now() - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", null)), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) Thread.sleep(50) // Make sure we give enough time for the instants to be different diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt index a3a70ca6c..beed36762 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,32 +22,54 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.readonly.SetReadOnlyStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class SetReadOnlyStepTests : OpenSearchTestCase() { private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var setReadOnlyActionMetrics: SetReadOnlyActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + setReadOnlyActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SET_READ_ONLY) as SetReadOnlyActionMetrics + } fun `test read only step sets step status to failed when not acknowledged`() { val setReadOnlyResponse = AcknowledgedResponse(false) val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } } @@ -52,12 +78,13 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } } @@ -66,13 +93,14 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } }