From bce2eb9e77487b8d8d2f79b5e1665705c358e1d8 Mon Sep 17 00:00:00 2001 From: Harshita Kaushik <112249538+harshitakaushik-dev@users.noreply.github.com> Date: Thu, 4 Jul 2024 16:04:27 +0530 Subject: [PATCH] Index Management Action Metrics (#1195) * Initial integration of TelemetryAwarePlugin to ISM Signed-off-by: harycash * Initial integration of TelemetryAwarePlugin to ISM Signed-off-by: harycash * Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration Signed-off-by: harycash * Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration Signed-off-by: harycash * Additional actions metrics with requested changes from previous commit Signed-off-by: harycash * Fixed Build Issues Signed-off-by: harycash * Fixed Build Issues Signed-off-by: harycash * Fixed Build Issues, Added new metric : Cumulative Latency Signed-off-by: harycash * Fixed Build Issues, Added new metric : Cumulative Latency Signed-off-by: harycash * Requested Changes and Addition of Metrics to all the remaining Actions Signed-off-by: harycash * Updates on Action Metrics Signed-off-by: harycash * Updates on Action Metrics Signed-off-by: harycash * Build issues fixed Signed-off-by: harycash * Build issues fixed Signed-off-by: harycash --------- Signed-off-by: harycash Co-authored-by: harycash (cherry picked from commit c5425a52e6e7d5fd0f87bf770436527c8911ee08) Signed-off-by: harycash --- .../indexstatemanagement/Step.kt | 100 ++++++++++++++- .../metrics/IndexManagementActionsMetrics.kt | 116 ++++++++++++++++++ .../actionmetrics/AliasActionMetrics.kt | 53 ++++++++ .../actionmetrics/AllocationActionMetrics.kt | 53 ++++++++ .../actionmetrics/CloseActionMetrics.kt | 53 ++++++++ .../actionmetrics/DeleteActionMetrics.kt | 53 ++++++++ .../actionmetrics/ForceMergeActionMetrics.kt | 53 ++++++++ .../actionmetrics/MoveShardActionMetrics.kt | 53 ++++++++ .../NotificationActionMetrics.kt | 53 ++++++++ .../actionmetrics/OpenActionMetrics.kt | 53 ++++++++ .../ReplicaCountActionMetrics.kt | 53 ++++++++ .../actionmetrics/RolloverActionMetrics.kt | 53 ++++++++ .../SetIndexPriorityActionMetrics.kt | 53 ++++++++ .../actionmetrics/SetReadOnlyActionMetrics.kt | 53 ++++++++ .../actionmetrics/ShrinkActionMetrics.kt | 53 ++++++++ .../actionmetrics/SnapshotActionMetrics.kt | 53 ++++++++ .../actionmetrics/TransitionActionMetrics.kt | 53 ++++++++ .../indexmanagement/IndexManagementPlugin.kt | 44 ++++--- .../ManagedIndexRunner.kt | 10 +- .../step/AttemptCloseStepTests.kt | 47 +++++-- .../step/AttemptDeleteStepTests.kt | 47 +++++-- .../step/AttemptOpenStepTests.kt | 39 +++++- .../step/AttemptRolloverStepTests.kt | 106 ++++++++++------ .../step/AttemptSetIndexPriorityStepTests.kt | 38 ++++-- .../step/AttemptSetReplicaCountStepTests.kt | 52 ++++++-- .../step/AttemptSnapshotStepTests.kt | 36 +++++- .../step/AttemptTransitionStepTests.kt | 36 ++++-- .../step/SetReadOnlyStepTests.kt | 40 +++++- 28 files changed, 1389 insertions(+), 117 deletions(-) create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt index 9ff5c9fcb..ce84bb195 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData @@ -28,12 +29,109 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { abstract suspend fun execute(): Step - fun postExecute(logger: Logger): Step { + fun postExecute( + logger: Logger, + indexManagementActionMetrics: IndexManagementActionsMetrics, + step: Step, + startingManagedIndexMetaData: ManagedIndexMetaData, + ): Step { logger.info("Finished executing $name for ${context?.metadata?.index}") + val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData) + emitTelemetry(indexManagementActionMetrics, updatedStepMetaData, logger) this.context = null return this } + private fun emitTelemetry( + indexManagementActionMetrics: IndexManagementActionsMetrics, + updatedStepMetaData: ManagedIndexMetaData, + logger: Logger, + ) { + when (context?.metadata?.actionMetaData?.name) { + IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ROLLOVER, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.FORCE_MERGE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.DELETE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.REPLICA_COUNT, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.TRANSITION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.NOTIFICATION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.CLOSE, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SET_INDEX_PRIORITY, // problem in test + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.OPEN, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.MOVE_SHARD, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SET_READ_ONLY, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SHRINK, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.SNAPSHOT, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ALIAS_ACTION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics( + IndexManagementActionsMetrics.ALLOCATION, + ) + ?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData) + + else -> { + logger.info( + "Action Metrics is not supported for this action [%s]", + context?.metadata?.actionMetaData?.name, + ) + } + } + } + abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData abstract fun isIdempotent(): Boolean diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt new file mode 100644 index 000000000..725893a21 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/IndexManagementActionsMetrics.kt @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags + +abstract class ActionMetrics { + abstract val actionName: String + + fun createTags(context: StepContext): Tags { + val tags = Tags.create() + .addTag("index_name", context.metadata.index) + .addTag("policy_id", context.metadata.policyID) + .addTag("node_id", context.clusterService.nodeName ?: "") + .addTag("index_uuid", context.metadata.indexUuid) + return tags + } + + abstract fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) +} + +class IndexManagementActionsMetrics private constructor() { + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var actionMetricsMap: Map + + companion object { + val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance } + + const val ROLLOVER = "rollover" + const val NOTIFICATION = "notification" + const val FORCE_MERGE = "force_merge" + const val DELETE = "delete" + const val REPLICA_COUNT = "replica_count" + const val TRANSITION = "transition" + const val CLOSE = "close" + const val SET_INDEX_PRIORITY = "set_index_priority" + const val OPEN = "open" + const val MOVE_SHARD = "move_shard" + const val SET_READ_ONLY = "set_read_only" + const val SHRINK = "shrink" + const val SNAPSHOT = "snapshot" + const val ALIAS_ACTION = "alias_action" + const val ALLOCATION = "allocation" + + private object HOLDER { + val instance = IndexManagementActionsMetrics() + } + } + + fun initialize(metricsRegistry: MetricsRegistry) { + this.metricsRegistry = metricsRegistry + + RolloverActionMetrics.instance.initializeCounters(metricsRegistry) + NotificationActionMetrics.instance.initializeCounters(metricsRegistry) + ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry) + DeleteActionMetrics.instance.initializeCounters(metricsRegistry) + ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry) + TransitionActionMetrics.instance.initializeCounters(metricsRegistry) + CloseActionMetrics.instance.initializeCounters(metricsRegistry) + SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry) + OpenActionMetrics.instance.initializeCounters(metricsRegistry) + MoveShardActionMetrics.instance.initializeCounters(metricsRegistry) + SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry) + ShrinkActionMetrics.instance.initializeCounters(metricsRegistry) + SnapshotActionMetrics.instance.initializeCounters(metricsRegistry) + AliasActionMetrics.instance.initializeCounters(metricsRegistry) + AllocationActionMetrics.instance.initializeCounters(metricsRegistry) + + actionMetricsMap = mapOf( + ROLLOVER to RolloverActionMetrics.instance, + NOTIFICATION to NotificationActionMetrics.instance, + FORCE_MERGE to ForceMergeActionMetrics.instance, + DELETE to DeleteActionMetrics.instance, + REPLICA_COUNT to ReplicaCountActionMetrics.instance, + TRANSITION to TransitionActionMetrics.instance, + CLOSE to CloseActionMetrics.instance, + SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance, + OPEN to OpenActionMetrics.instance, + MOVE_SHARD to MoveShardActionMetrics.instance, + SET_READ_ONLY to SetReadOnlyActionMetrics.instance, + SHRINK to ShrinkActionMetrics.instance, + SNAPSHOT to SnapshotActionMetrics.instance, + ALIAS_ACTION to AliasActionMetrics.instance, + ALLOCATION to AllocationActionMetrics.instance, + ) + } + + fun getActionMetrics(actionName: String): ActionMetrics? { + return actionMetricsMap[actionName] + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt new file mode 100644 index 000000000..5652a292e --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AliasActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class AliasActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ALIAS_ACTION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Alias Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Alias Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Alias Actions", "milliseconds") + } + + companion object { + val instance: AliasActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = AliasActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt new file mode 100644 index 000000000..363fde91d --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/AllocationActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class AllocationActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ALLOCATION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Allocation Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Allocation Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Allocation Actions", "milliseconds") + } + + companion object { + val instance: AllocationActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = AllocationActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt new file mode 100644 index 000000000..1f301872d --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/CloseActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class CloseActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.CLOSE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Close Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Close Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Close Actions", "milliseconds") + } + + companion object { + val instance: CloseActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = CloseActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt new file mode 100644 index 000000000..b8d236364 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/DeleteActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class DeleteActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.DELETE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Delete Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Delete Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Delete Action", "milliseconds") + } + + companion object { + val instance: DeleteActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = DeleteActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val deleteActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.DELETE) as DeleteActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + deleteActionMetrics.successes.add(1.0, context.let { deleteActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + deleteActionMetrics.failures.add(1.0, context.let { deleteActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + deleteActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { deleteActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt new file mode 100644 index 000000000..5b7858ee1 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ForceMergeActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ForceMergeActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.FORCE_MERGE + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Force Merge Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Force Merge Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Force Merge Action", "milliseconds") + } + + companion object { + val instance: ForceMergeActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ForceMergeActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val forceMergeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.FORCE_MERGE) as ForceMergeActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + forceMergeActionMetrics.successes.add(1.0, context.let { forceMergeActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + forceMergeActionMetrics.failures.add(1.0, context.let { forceMergeActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + forceMergeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { forceMergeActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt new file mode 100644 index 000000000..2f86100d9 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/MoveShardActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class MoveShardActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.MOVE_SHARD + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Move Shard Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Move Shard Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Move Shard Actions", "milliseconds") + } + + companion object { + val instance: MoveShardActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = MoveShardActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val moveShardActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.MOVE_SHARD) as MoveShardActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + moveShardActionMetrics.successes.add(1.0, context.let { moveShardActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + moveShardActionMetrics.failures.add(1.0, context.let { moveShardActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + moveShardActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { moveShardActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt new file mode 100644 index 000000000..0faa9b9fd --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/NotificationActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class NotificationActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.NOTIFICATION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Notification Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Notification Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Notification Action", "milliseconds") + } + + companion object { + val instance: NotificationActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = NotificationActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val notificationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.NOTIFICATION) as NotificationActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + notificationActionMetrics.successes.add(1.0, context.let { notificationActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + notificationActionMetrics.failures.add(1.0, context.let { notificationActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + notificationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { notificationActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt new file mode 100644 index 000000000..ad29a5f8a --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/OpenActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class OpenActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.OPEN + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Open Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Open Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Open Actions", "milliseconds") + } + + companion object { + val instance: OpenActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = OpenActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val openActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.OPEN) as OpenActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + openActionMetrics.successes.add(1.0, context.let { openActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + openActionMetrics.failures.add(1.0, context.let { openActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + openActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { openActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt new file mode 100644 index 000000000..94e60b892 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ReplicaCountActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ReplicaCountActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.REPLICA_COUNT + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Replica Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Replica Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Replica Count Action", "milliseconds") + } + + companion object { + val instance: ReplicaCountActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ReplicaCountActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val replicaCountActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.REPLICA_COUNT) as ReplicaCountActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + replicaCountActionMetrics.successes.add(1.0, context.let { replicaCountActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + replicaCountActionMetrics.failures.add(1.0, context.let { replicaCountActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + replicaCountActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { replicaCountActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt new file mode 100644 index 000000000..3d2965702 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/RolloverActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class RolloverActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.ROLLOVER + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Rollover Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Rollover Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Rollover Actions", "milliseconds") + } + + companion object { + val instance: RolloverActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = RolloverActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val rolloverActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ROLLOVER) as RolloverActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + rolloverActionMetrics.successes.add(1.0, context.let { rolloverActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + rolloverActionMetrics.failures.add(1.0, context.let { rolloverActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + rolloverActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { rolloverActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt new file mode 100644 index 000000000..023461829 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetIndexPriorityActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SetIndexPriorityActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SET_INDEX_PRIORITY + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Set Index Priority Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Set Index Priority Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Set Index Priority Actions", "milliseconds") + } + + companion object { + val instance: SetIndexPriorityActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SetIndexPriorityActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val setIndexPriorityActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SET_INDEX_PRIORITY) as SetIndexPriorityActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + setIndexPriorityActionMetrics.successes.add(1.0, context.let { setIndexPriorityActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + setIndexPriorityActionMetrics.failures.add(1.0, context.let { setIndexPriorityActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + setIndexPriorityActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { setIndexPriorityActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt new file mode 100644 index 000000000..a3a8b4737 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SetReadOnlyActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SetReadOnlyActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SET_READ_ONLY + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Set Read Only Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Set Read Only Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Set Read Only Actions", "milliseconds") + } + + companion object { + val instance: SetReadOnlyActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SetReadOnlyActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val setReadOnlyActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SET_READ_ONLY) as SetReadOnlyActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + setReadOnlyActionMetrics.successes.add(1.0, context.let { setReadOnlyActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + setReadOnlyActionMetrics.failures.add(1.0, context.let { setReadOnlyActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + setReadOnlyActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { setReadOnlyActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt new file mode 100644 index 000000000..704feca3c --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/ShrinkActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class ShrinkActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SHRINK + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Shrink Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Shrink Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Shrink Actions", "milliseconds") + } + + companion object { + val instance: ShrinkActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = ShrinkActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val shrinkActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SHRINK) as ShrinkActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + shrinkActionMetrics.successes.add(1.0, context.let { shrinkActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + shrinkActionMetrics.failures.add(1.0, context.let { shrinkActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + shrinkActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { shrinkActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt new file mode 100644 index 000000000..10d74e195 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/SnapshotActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class SnapshotActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.SNAPSHOT + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Snapshot Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Snapshot Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Snapshot Actions", "milliseconds") + } + + companion object { + val instance: SnapshotActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = SnapshotActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val snapshotActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.SNAPSHOT) as SnapshotActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + snapshotActionMetrics.successes.add(1.0, context.let { snapshotActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + snapshotActionMetrics.failures.add(1.0, context.let { snapshotActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + snapshotActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { snapshotActionMetrics.createTags(it) }) + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt new file mode 100644 index 000000000..8049ab25e --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/metrics/actionmetrics/TransitionActionMetrics.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics + +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry + +class TransitionActionMetrics private constructor() : ActionMetrics() { + override val actionName: String = IndexManagementActionsMetrics.TRANSITION + lateinit var successes: Counter + lateinit var failures: Counter + lateinit var cumulativeLatency: Counter + + fun initializeCounters(metricsRegistry: MetricsRegistry) { + successes = metricsRegistry.createCounter("${actionName}_successes", "Transition Action Successes", "count") + failures = metricsRegistry.createCounter("${actionName}_failures", "Transition Action Failures", "count") + cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Transition Actions", "milliseconds") + } + + companion object { + val instance: TransitionActionMetrics by lazy { HOLDER.instance } + } + + private object HOLDER { + val instance = TransitionActionMetrics() + } + + override fun emitMetrics( + context: StepContext, + indexManagementActionsMetrics: IndexManagementActionsMetrics, + stepMetaData: StepMetaData?, + ) { + val transitionActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.TRANSITION) as TransitionActionMetrics + val stepStatus = stepMetaData?.stepStatus + if (stepStatus == StepStatus.COMPLETED) { + transitionActionMetrics.successes.add(1.0, context.let { transitionActionMetrics.createTags(it) }) + } + if (stepStatus == StepStatus.FAILED) { + transitionActionMetrics.failures.add(1.0, context.let { transitionActionMetrics.createTags(it) }) + } + val endTime = System.currentTimeMillis() + val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime) + transitionActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { transitionActionMetrics.createTags(it) }) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index b6b631573..93e42aa29 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -149,6 +149,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManage import org.opensearch.indexmanagement.spi.IndexManagementExtension import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformRunner @@ -188,10 +189,13 @@ import org.opensearch.plugins.ExtensiblePlugin import org.opensearch.plugins.NetworkPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.SystemIndexPlugin +import org.opensearch.plugins.TelemetryAwarePlugin import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController import org.opensearch.rest.RestHandler import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.tracing.Tracer import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.RemoteClusterService import org.opensearch.transport.TransportInterceptor @@ -200,8 +204,8 @@ import org.opensearch.watcher.ResourceWatcherService import java.util.function.Supplier @Suppress("TooManyFunctions") -class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() { - +class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, + TelemetryAwarePlugin, Plugin() { private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices lateinit var actionValidation: ActionValidation @@ -215,6 +219,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin private val extensions = mutableSetOf() private val extensionCheckerMap = mutableMapOf() lateinit var indexOperationActionFilter: IndexOperationActionFilter + private lateinit var metricsRegistry: MetricsRegistry companion object { const val PLUGINS_BASE_URI = "/_plugins" @@ -379,8 +384,11 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin namedWriteableRegistry: NamedWriteableRegistry, indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier, + tracer: Tracer, + metricsRegistry: MetricsRegistry, ): Collection { val settings = environment.settings() + this.metricsRegistry = metricsRegistry this.clusterService = clusterService QueryShardContextFactory.init( client, @@ -390,6 +398,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin namedWriteableRegistry, environment, ) + + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) val transformRunner = TransformRunner.initialize( @@ -440,20 +450,22 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin indexMetadataServices.forEach { indexMetadataProvider.addMetadataServices(it) } val extensionChecker = ExtensionStatusChecker(extensionCheckerMap, clusterService) - val managedIndexRunner = ManagedIndexRunner - .registerClient(client) - .registerClusterService(clusterService) - .registerValidationService(actionValidation) - .registerNamedXContentRegistry(xContentRegistry) - .registerScriptService(scriptService) - .registerSettings(settings) - .registerConsumers() // registerConsumers must happen after registerSettings/clusterService - .registerIMIndex(indexManagementIndices) - .registerHistoryIndex(indexStateManagementHistory) - .registerSkipFlag(skipFlag) - .registerThreadPool(threadPool) - .registerExtensionChecker(extensionChecker) - .registerIndexMetadataProvider(indexMetadataProvider) + val managedIndexRunner = + ManagedIndexRunner + .registerClient(client) + .registerClusterService(clusterService) + .registerValidationService(actionValidation) + .registerNamedXContentRegistry(xContentRegistry) + .registerScriptService(scriptService) + .registerSettings(settings) + .registerConsumers() // registerConsumers must happen after registerSettings/clusterService + .registerIMIndex(indexManagementIndices) + .registerHistoryIndex(indexStateManagementHistory) + .registerSkipFlag(skipFlag) + .registerThreadPool(threadPool) + .registerExtensionChecker(extensionChecker) + .registerIndexMetadataProvider(indexMetadataProvider) + .registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices) val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index d0a3e0269..b83fa43c3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -88,6 +88,7 @@ import org.opensearch.indexmanagement.opensearchapi.withClosableContext import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData @@ -124,6 +125,7 @@ object ManagedIndexRunner : private lateinit var skipExecFlag: SkipExecution private lateinit var threadPool: ThreadPool private lateinit var extensionStatusChecker: ExtensionStatusChecker + lateinit var indexManagementActionMetrics: IndexManagementActionsMetrics private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED private var validationServiceEnabled: Boolean = DEFAULT_ACTION_VALIDATION_ENABLED @@ -224,6 +226,11 @@ object ManagedIndexRunner : return this } + fun registerIndexManagementActionMetrics(indexManagementActionsMetrics: IndexManagementActionsMetrics): Any { + this.indexManagementActionMetrics = indexManagementActionsMetrics + return this + } + override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ManagedIndexConfig) { throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}") @@ -451,7 +458,8 @@ object ManagedIndexRunner : managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user, ), ) { - step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) + step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute() + .postExecute(logger, indexManagementActionMetrics, step, startingManagedIndexMetaData) } var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt index 657284e61..2ca96bdee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.close.CloseIndexResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,15 +22,23 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.close.AttemptCloseStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant import kotlin.IllegalArgumentException class AttemptCloseStepTests : OpenSearchTestCase() { @@ -35,18 +47,32 @@ class AttemptCloseStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var closeActionMetrics: CloseActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + closeActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics + } fun `test close step sets step status to completed when successful`() { val closeIndexResponse = CloseIndexResponse(true, true, listOf()) val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.successes).add(eq(1.0), any()) } } @@ -55,12 +81,13 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } @@ -69,12 +96,13 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } @@ -86,7 +114,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -111,13 +139,14 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("close", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptCloseStep.preExecute(logger, context).execute() + attemptCloseStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptCloseStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(closeActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt index 7574d89fa..15934e243 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -8,8 +8,12 @@ package org.opensearch.indexmanagement.indexstatemanagement.step import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito.doAnswer import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -18,14 +22,22 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.delete.AttemptDeleteStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase +import java.time.Instant class AttemptDeleteStepTests : OpenSearchTestCase() { @@ -33,18 +45,35 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var deleteActionMetrics: DeleteActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + deleteActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.DELETE) as DeleteActionMetrics + } fun `test delete step sets step status to completed when successful`() { val acknowledgedResponse = AcknowledgedResponse(true) val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData( + "test", "indexUuid", "policy_id", null, null, null, null, null, null, null, + ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null, + ) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.successes).add(eq(1.0), any()) } } @@ -53,12 +82,13 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.failures).add(eq(1.0), any()) } } @@ -67,13 +97,14 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(deleteActionMetrics.failures).add(eq(1.0), any()) } } @@ -82,10 +113,10 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("delete", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptDeleteStep.preExecute(logger, context).execute() + attemptDeleteStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptDeleteStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt index c421e3be6..abfe4ff16 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.open.OpenIndexResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,14 +22,22 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptOpenStepTests : OpenSearchTestCase() { @@ -33,18 +45,31 @@ class AttemptOpenStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var openActionMetrics: OpenActionMetrics + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + openActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.OPEN) as OpenActionMetrics + } fun `test open step sets step status to failed when not acknowledged`() { val openIndexResponse = OpenIndexResponse(false, false) val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } @@ -53,12 +78,13 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } @@ -67,13 +93,14 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("open", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptOpenStep.preExecute(logger, context).execute() + attemptOpenStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptOpenStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(openActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt index 056030162..b720614d6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt @@ -9,9 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -24,18 +27,25 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase +import java.time.Instant class AttemptRolloverStepTests : OpenSearchTestCase() { - private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY @@ -43,17 +53,29 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { private val oldIndexName = "old_index" private val newIndexName = "new_index" val alias = "alias" + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var rolloverActionMetrics: RolloverActionMetrics @Before fun setup() { + // Setup metrics + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + rolloverActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.ROLLOVER) as RolloverActionMetrics + // mock rollover target val clusterState: ClusterState = mock() val metadata: Metadata = mock() val indexMetadata: IndexMetadata = mock() - val settings = Settings.builder() - .put(ManagedIndexSettings.ROLLOVER_ALIAS.key, alias) - .put(ManagedIndexSettings.ROLLOVER_SKIP.key, false) - .build() + val settings = + Settings.builder() + .put(ManagedIndexSettings.ROLLOVER_ALIAS.key, alias) + .put(ManagedIndexSettings.ROLLOVER_SKIP.key, false) + .build() whenever(clusterService.state()).thenReturn(clusterState) whenever(clusterState.metadata()).thenReturn(metadata) whenever(clusterState.metadata).thenReturn(metadata) @@ -73,19 +95,21 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { runBlocking { val rolloverAction = RolloverAction(null, null, null, null, true, 0) - val managedIndexMetaData = ManagedIndexMetaData( - oldIndexName, "indexUuid", "policy_id", - null, null, null, - null, null, null, - null, null, null, - null, null, rolledOverIndexName = newIndexName, - ) + val managedIndexMetaData = + ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, + null, null, rolledOverIndexName = newIndexName, + ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasNotAckMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -97,19 +121,21 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { runBlocking { val rolloverAction = RolloverAction(null, null, null, null, true, 0) - val managedIndexMetaData = ManagedIndexMetaData( - oldIndexName, "indexUuid", "policy_id", - null, null, null, - null, null, null, - null, null, null, - null, null, rolledOverIndexName = newIndexName, - ) + val managedIndexMetaData = + ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, + null, null, rolledOverIndexName = newIndexName, + ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getFailedCopyAliasMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -121,19 +147,21 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { runBlocking { val rolloverAction = RolloverAction(null, null, null, null, true, 0) - val managedIndexMetaData = ManagedIndexMetaData( - oldIndexName, "indexUuid", "policy_id", - null, null, null, - null, null, null, - null, null, null, - null, null, rolledOverIndexName = newIndexName, - ) + val managedIndexMetaData = + ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, + null, null, rolledOverIndexName = newIndexName, + ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasIndexNotFoundMessage(newIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.failures).add(eq(1.0), any()) } } @@ -145,24 +173,28 @@ class AttemptRolloverStepTests : OpenSearchTestCase() { runBlocking { val rolloverAction = RolloverAction(null, null, null, null, true, 0) - val managedIndexMetaData = ManagedIndexMetaData( - oldIndexName, "indexUuid", "policy_id", - null, null, null, - null, null, null, - null, null, null, - null, null, rolledOverIndexName = null, - ) + val managedIndexMetaData = + ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, ActionMetaData("rollover", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, + null, null, rolledOverIndexName = null, + ) val attemptRolloverStep = AttemptRolloverStep(rolloverAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptRolloverStep.preExecute(logger, context).execute() + attemptRolloverStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptRolloverStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasRolledOverIndexNotFoundMessage(oldIndexName), updatedManagedIndexMetaData.info?.get("message")) + verify(rolloverActionMetrics.successes).add(eq(1.0), any()) } } private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient( rolloverResponse: RolloverResponse?, aliasResponse: AcknowledgedResponse?, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt index 4397b685f..64fc4e7ec 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt @@ -11,6 +11,8 @@ import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,15 +20,22 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction import org.opensearch.indexmanagement.indexstatemanagement.step.indexpriority.AttemptSetIndexPriorityStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { @@ -34,6 +43,19 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var setIndexPriorityActionMetrics: SetIndexPriorityActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + setIndexPriorityActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SET_INDEX_PRIORITY) as SetIndexPriorityActionMetrics + } fun `test set priority step sets step status to completed when successful`() { val acknowledgedResponse = AcknowledgedResponse(true) @@ -41,10 +63,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -56,10 +78,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } @@ -71,10 +93,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) @@ -87,10 +109,10 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_index_priority", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - attemptSetPriorityStep.preExecute(logger, context).execute() + attemptSetPriorityStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptSetPriorityStep, managedIndexMetaData) val updatedManagedIndexMetaData = attemptSetPriorityStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt index 093500a54..d7c7b98d4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -8,8 +8,12 @@ package org.opensearch.indexmanagement.indexstatemanagement.step import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito.doAnswer import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient @@ -18,15 +22,23 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction import org.opensearch.indexmanagement.indexstatemanagement.step.replicacount.AttemptReplicaCountStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { @@ -34,19 +46,33 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var replicaCountActionMetrics: ReplicaCountActionMetrics + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + replicaCountActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.REPLICA_COUNT) as ReplicaCountActionMetrics + } fun `test replica step sets step status to failed when not acknowledged`() { val replicaCountResponse = AcknowledgedResponse(false) val client = getClient(getAdminClient(getIndicesAdminClient(replicaCountResponse, null))) runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } @@ -56,12 +82,13 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } @@ -71,13 +98,14 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) - val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("replica_count", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) + val attemptReplicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - replicaCountStep.preExecute(logger, context).execute() - val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + attemptReplicaCountStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptReplicaCountStep, managedIndexMetaData) + val updatedManagedIndexMetaData = attemptReplicaCountStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(replicaCountActionMetrics.failures).add(eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt index 27a17032c..e89bc8fc4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt @@ -10,9 +10,13 @@ import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -22,10 +26,13 @@ import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -35,6 +42,9 @@ import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript import org.opensearch.snapshots.ConcurrentSnapshotExecutionException +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException @@ -44,13 +54,22 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val snapshotAction = randomSnapshotActionConfig("repo", "snapshot-name") - private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("snapshot", 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var snapshotActionMetrics: SnapshotActionMetrics @Before fun settings() { whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST))) whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(MockTemplateScript.Factory("snapshot-name")) + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + snapshotActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SNAPSHOT) as SnapshotActionMetrics } fun `test snapshot response when block`() { @@ -61,27 +80,30 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.successes).add(ArgumentMatchers.eq(1.0), any()) } whenever(response.status()).doReturn(RestStatus.OK) runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.successes, Mockito.times(2)).add(ArgumentMatchers.eq(1.0), any()) } whenever(response.status()).doReturn(RestStatus.INTERNAL_SERVER_ERROR) runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } @@ -91,10 +113,11 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } @@ -130,10 +153,11 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { runBlocking { val step = AttemptSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - step.preExecute(logger, context).execute() + step.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, step, metadata) val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "some error", updatedManagedIndexMetaData.info!!["cause"]) + verify(snapshotActionMetrics.failures).add(ArgumentMatchers.eq(1.0), any()) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt index 135f09faa..9db0035f3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -9,9 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.admin.indices.rollover.RolloverInfo import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsResponse @@ -28,17 +31,24 @@ import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.index.shard.DocsStats import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException import java.time.Instant @@ -47,6 +57,8 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { private val indexName: String = "test" private val indexUUID: String = "indexUuid" + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var transitionActionMetrics: TransitionActionMetrics @Suppress("UNCHECKED_CAST") private val indexMetadata: IndexMetadata = mock { @@ -69,6 +81,13 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { @Before fun `setup settings`() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + transitionActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.TRANSITION) as TransitionActionMetrics whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(ManagedIndexSettings.RESTRICTED_INDEX_PATTERN))) } @@ -82,14 +101,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage(indexName), updatedManagedIndexMetaData.info!!["message"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -100,14 +120,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -118,14 +139,15 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings, lockService) - attemptTransitionStep.preExecute(logger, context).execute() + attemptTransitionStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, attemptTransitionStep, managedIndexMetadata) val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(transitionActionMetrics.failures).add(eq(1.0), any()) } } @@ -133,7 +155,7 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { val indexMetadataProvider = IndexMetadataProvider(settings, mock(), clusterService, mutableMapOf()) runBlocking { val completedStartTime = Instant.now() - val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, ActionMetaData("transition", Instant.now().toEpochMilli(), 0, false, 1, null, null), StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) val transitionsAction = TransitionsAction(listOf(Transition("some_state", null)), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) Thread.sleep(50) // Make sure we give enough time for the instants to be different diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt index cbfafc407..8ab9939cf 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -9,8 +9,12 @@ import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.mockito.ArgumentMatchers.eq import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client @@ -18,14 +22,22 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.step.readonly.SetReadOnlyStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService +import org.opensearch.telemetry.metrics.Counter +import org.opensearch.telemetry.metrics.MetricsRegistry +import org.opensearch.telemetry.metrics.tags.Tags import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException +import java.time.Instant class SetReadOnlyStepTests : OpenSearchTestCase() { @@ -33,18 +45,32 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val lockService: LockService = LockService(mock(), clusterService) + private lateinit var metricsRegistry: MetricsRegistry + private lateinit var setReadOnlyActionMetrics: SetReadOnlyActionMetrics + + @Before + fun setup() { + metricsRegistry = mock() + whenever(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer { + mock() + } + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) + ManagedIndexRunner.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance) + setReadOnlyActionMetrics = IndexManagementActionsMetrics.instance.getActionMetrics(IndexManagementActionsMetrics.SET_READ_ONLY) as SetReadOnlyActionMetrics + } fun `test read only step sets step status to failed when not acknowledged`() { val setReadOnlyResponse = AcknowledgedResponse(false) val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } } @@ -53,12 +79,13 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } } @@ -67,13 +94,14 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData("set_read_only", Instant.now().toEpochMilli(), 0, false, 1, null, null), null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) - setReadOnlyStep.preExecute(logger, context).execute() + setReadOnlyStep.preExecute(logger, context).execute().postExecute(logger, IndexManagementActionsMetrics.instance, setReadOnlyStep, managedIndexMetaData) val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + verify(setReadOnlyActionMetrics.failures).add(eq(1.0), any()) } }