Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index Management Action Metrics #1195

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -27,12 +28,109 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
fun postExecute(
logger: Logger,
indexManagementActionMetrics: IndexManagementActionsMetrics,
step: Step,
startingManagedIndexMetaData: ManagedIndexMetaData,
): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData)
emitTelemetry(indexManagementActionMetrics, updatedStepMetaData, logger)
this.context = null
return this
}

private fun emitTelemetry(
indexManagementActionMetrics: IndexManagementActionsMetrics,
updatedStepMetaData: ManagedIndexMetaData,
logger: Logger,
) {
when (context?.metadata?.actionMetaData?.name) {
r1walz marked this conversation as resolved.
Show resolved Hide resolved
IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ROLLOVER,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.FORCE_MERGE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.DELETE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.REPLICA_COUNT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.TRANSITION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.NOTIFICATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.CLOSE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_INDEX_PRIORITY, // problem in test
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.OPEN,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.MOVE_SHARD,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_READ_ONLY,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SHRINK,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SNAPSHOT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALIAS_ACTION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALLOCATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

else -> {
logger.info(
"Action Metrics is not supported for this action [%s]",
context?.metadata?.actionMetaData?.name,
)
}
}
}

abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData

abstract fun isIdempotent(): Boolean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.metrics.tags.Tags

abstract class ActionMetrics {
abstract val actionName: String

fun createTags(context: StepContext): Tags {
val tags = Tags.create()
.addTag("index_name", context.metadata.index)
r1walz marked this conversation as resolved.
Show resolved Hide resolved
.addTag("policy_id", context.metadata.policyID)
.addTag("node_id", context.clusterService.nodeName ?: "")
.addTag("index_uuid", context.metadata.indexUuid)
return tags
}

abstract fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
)
}

class IndexManagementActionsMetrics private constructor() {
private lateinit var metricsRegistry: MetricsRegistry
private lateinit var actionMetricsMap: Map<String, ActionMetrics>

companion object {
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }

const val ROLLOVER = "rollover"
const val NOTIFICATION = "notification"
const val FORCE_MERGE = "force_merge"
const val DELETE = "delete"
const val REPLICA_COUNT = "replica_count"
const val TRANSITION = "transition"
const val CLOSE = "close"
const val SET_INDEX_PRIORITY = "set_index_priority"
const val OPEN = "open"
const val MOVE_SHARD = "move_shard"
const val SET_READ_ONLY = "set_read_only"
const val SHRINK = "shrink"
const val SNAPSHOT = "snapshot"
const val ALIAS_ACTION = "alias_action"
const val ALLOCATION = "allocation"

private object HOLDER {
val instance = IndexManagementActionsMetrics()
}
}

fun initialize(metricsRegistry: MetricsRegistry) {
this.metricsRegistry = metricsRegistry

RolloverActionMetrics.instance.initializeCounters(metricsRegistry)
NotificationActionMetrics.instance.initializeCounters(metricsRegistry)
ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry)
DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
TransitionActionMetrics.instance.initializeCounters(metricsRegistry)
CloseActionMetrics.instance.initializeCounters(metricsRegistry)
SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry)
OpenActionMetrics.instance.initializeCounters(metricsRegistry)
MoveShardActionMetrics.instance.initializeCounters(metricsRegistry)
SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry)
ShrinkActionMetrics.instance.initializeCounters(metricsRegistry)
SnapshotActionMetrics.instance.initializeCounters(metricsRegistry)
AliasActionMetrics.instance.initializeCounters(metricsRegistry)
AllocationActionMetrics.instance.initializeCounters(metricsRegistry)

actionMetricsMap = mapOf(
ROLLOVER to RolloverActionMetrics.instance,
NOTIFICATION to NotificationActionMetrics.instance,
FORCE_MERGE to ForceMergeActionMetrics.instance,
DELETE to DeleteActionMetrics.instance,
REPLICA_COUNT to ReplicaCountActionMetrics.instance,
TRANSITION to TransitionActionMetrics.instance,
CLOSE to CloseActionMetrics.instance,
SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance,
OPEN to OpenActionMetrics.instance,
MOVE_SHARD to MoveShardActionMetrics.instance,
SET_READ_ONLY to SetReadOnlyActionMetrics.instance,
SHRINK to ShrinkActionMetrics.instance,
SNAPSHOT to SnapshotActionMetrics.instance,
ALIAS_ACTION to AliasActionMetrics.instance,
ALLOCATION to AllocationActionMetrics.instance,
)
}

fun getActionMetrics(actionName: String): ActionMetrics? {
return actionMetricsMap[actionName]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class AliasActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.ALIAS_ACTION
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Alias Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Alias Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Alias Actions", "milliseconds")
}

companion object {
val instance: AliasActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = AliasActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class AllocationActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.ALLOCATION
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Allocation Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Allocation Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Allocation Actions", "milliseconds")
}

companion object {
val instance: AllocationActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = AllocationActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class CloseActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.CLOSE
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Close Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Close Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Close Actions", "milliseconds")
}

companion object {
val instance: CloseActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = CloseActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) })
}
}
Loading
Loading