Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index Management Action Metrics #1195

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -25,7 +26,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
return this
}

abstract suspend fun execute(): Step
abstract suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step
r1walz marked this conversation as resolved.
Show resolved Hide resolved

fun postExecute(logger: Logger): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.metrics.tags.Tags

/**
 * Common base for the per-action metric holders.
 *
 * Each concrete holder identifies itself through [actionName] and can use
 * [createTags] to build the tag set attached to its measurements.
 */
abstract class ActionMetrics {
    /** Name of the action this holder reports on. */
    abstract val actionName: String

    /**
     * Builds the tags describing where a step ran.
     *
     * @param context step context supplying the managed-index metadata and the cluster service
     * @return tags with `index_name` and `policy_id` taken from the context metadata, and
     *   `node_id` taken from the cluster service's node name (empty string when that is null)
     */
    fun createTags(context: StepContext): Tags =
        Tags.create()
            .addTag("index_name", context.metadata.index)
            .addTag("policy_id", context.metadata.policyID)
            .addTag("node_id", context.clusterService.nodeName ?: "")
}

/**
 * Lookup table from action name to that action's [ActionMetrics] holder.
 *
 * The constructor is private; the single shared object is exposed as [instance].
 * [initialize] must be called with a [MetricsRegistry] before any counters exist.
 */
class IndexManagementActionsMetrics private constructor() {
    private lateinit var metricsRegistry: MetricsRegistry

    // Starts out empty instead of `lateinit`, so a lookup made before initialize()
    // returns null (as the nullable return type of getActionMetrics promises)
    // rather than throwing UninitializedPropertyAccessException.
    private var actionMetricsMap: Map<String, ActionMetrics> = emptyMap()

    companion object {
        /** The shared instance. */
        val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }

        // Action names; these are the keys accepted by getActionMetrics().
        const val ROLLOVER = "rollover"
        const val NOTIFICATION = "notification"
        const val FORCE_MERGE = "force_merge"
        const val DELETE = "delete"
        const val REPLICA_COUNT = "replica_count"
        const val TRANSITION = "transition"

        private object HOLDER {
            val instance = IndexManagementActionsMetrics()
        }
    }

    /**
     * Creates the counters of every known action on [metricsRegistry] and
     * publishes the name-to-holder map used by [getActionMetrics].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initialize(metricsRegistry: MetricsRegistry) {
        this.metricsRegistry = metricsRegistry

        RolloverActionMetrics.instance.initializeCounters(metricsRegistry)
        NotificationActionMetrics.instance.initializeCounters(metricsRegistry)
        ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry)
        DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
        ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
        TransitionActionMetrics.instance.initializeCounters(metricsRegistry)

        // The map is assigned only after all counters have been created.
        actionMetricsMap = mapOf(
            ROLLOVER to RolloverActionMetrics.instance,
            NOTIFICATION to NotificationActionMetrics.instance,
            FORCE_MERGE to ForceMergeActionMetrics.instance,
            DELETE to DeleteActionMetrics.instance,
            REPLICA_COUNT to ReplicaCountActionMetrics.instance,
            TRANSITION to TransitionActionMetrics.instance,
        )
    }

    /**
     * Looks up the metrics holder registered for [actionName].
     *
     * @param actionName one of the action-name constants declared on the companion
     * @return the matching holder, or null when the name is unknown or
     *   [initialize] has not been called yet
     */
    fun getActionMetrics(actionName: String): ActionMetrics? = actionMetricsMap[actionName]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the delete action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class DeleteActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.DELETE

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Delete Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Delete Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Delete Action", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = DeleteActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: DeleteActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the force-merge action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class ForceMergeActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.FORCE_MERGE

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Force Merge Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Force Merge Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Force Merge Action", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = ForceMergeActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: ForceMergeActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the notification action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class NotificationActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.NOTIFICATION

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Notification Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Notification Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Notification Action", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = NotificationActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: NotificationActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the replica-count action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class ReplicaCountActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.REPLICA_COUNT

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Replica Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Replica Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Replica Count Action", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = ReplicaCountActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: ReplicaCountActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the rollover action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class RolloverActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.ROLLOVER

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Rollover Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Rollover Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Rollover Actions", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = RolloverActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: RolloverActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

/**
 * Counters for the transition action.
 *
 * The constructor is private; use [instance]. The counters are `lateinit` and
 * are only assigned by [initializeCounters].
 */
class TransitionActionMetrics private constructor() : ActionMetrics() {
    override val actionName: String = IndexManagementActionsMetrics.TRANSITION

    /** Counter created as `<actionName>_successes`, unit "count". */
    lateinit var successes: Counter

    /** Counter created as `<actionName>_failures`, unit "count". */
    lateinit var failures: Counter

    /** Counter created as `<actionName>_cumulative_latency`, unit "milliseconds". */
    lateinit var cumulativeLatency: Counter

    /**
     * Creates the three counters of this action on [metricsRegistry].
     *
     * @param metricsRegistry registry on which the counters are created
     */
    fun initializeCounters(metricsRegistry: MetricsRegistry) {
        with(metricsRegistry) {
            successes = createCounter("${actionName}_successes", "Transition Action Successes", "count")
            failures = createCounter("${actionName}_failures", "Transition Action Failures", "count")
            cumulativeLatency = createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Transition Actions", "milliseconds")
        }
    }

    private object HOLDER {
        val instance = TransitionActionMetrics()
    }

    companion object {
        /** The shared instance. */
        val instance: TransitionActionMetrics by lazy { HOLDER.instance }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManage
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.transform.TargetIndexMappingService
import org.opensearch.indexmanagement.transform.TransformRunner
Expand Down Expand Up @@ -184,10 +185,13 @@ import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.NetworkPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
import org.opensearch.plugins.TelemetryAwarePlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.script.ScriptService
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.tracing.Tracer
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.RemoteClusterService
import org.opensearch.transport.TransportInterceptor
Expand All @@ -196,7 +200,8 @@ import org.opensearch.watcher.ResourceWatcherService
import java.util.function.Supplier

@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() {
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin,
TelemetryAwarePlugin, Plugin() {
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var actionValidation: ActionValidation
Expand All @@ -210,6 +215,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
private val extensions = mutableSetOf<String>()
private val extensionCheckerMap = mutableMapOf<String, StatusChecker>()
lateinit var indexOperationActionFilter: IndexOperationActionFilter
private lateinit var metricsRegistry: MetricsRegistry

companion object {
const val PLUGINS_BASE_URI = "/_plugins"
Expand Down Expand Up @@ -374,8 +380,11 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
namedWriteableRegistry: NamedWriteableRegistry,
indexNameExpressionResolver: IndexNameExpressionResolver,
repositoriesServiceSupplier: Supplier<RepositoriesService>,
tracer: Tracer,
metricsRegistry: MetricsRegistry,
): Collection<Any> {
val settings = environment.settings()
this.metricsRegistry = metricsRegistry
this.clusterService = clusterService
QueryShardContextFactory.init(
client,
Expand All @@ -385,6 +394,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
namedWriteableRegistry,
environment,
)

IndexManagementActionsMetrics.instance.initialize(metricsRegistry)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
val jvmService = JvmService(environment.settings())
val transformRunner =
Expand Down Expand Up @@ -453,6 +464,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)
.registerIndexMetadataProvider(indexMetadataProvider)
.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance)

val managedIndexCoordinator =
ManagedIndexCoordinator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import org.opensearch.indexmanagement.opensearchapi.withClosableContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
Expand Down Expand Up @@ -121,6 +122,7 @@ object ManagedIndexRunner :
private lateinit var skipExecFlag: SkipExecution
private lateinit var threadPool: ThreadPool
private lateinit var extensionStatusChecker: ExtensionStatusChecker
lateinit var indexManagementActionMetrics: IndexManagementActionsMetrics
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED
private var validationServiceEnabled: Boolean = DEFAULT_ACTION_VALIDATION_ENABLED
Expand Down Expand Up @@ -221,6 +223,11 @@ object ManagedIndexRunner :
return this
}

/**
 * Stores the metrics holder that the runner passes to each step's `execute`.
 *
 * The return type is the runner itself rather than `Any`, so further
 * `register*` calls can be chained after this one.
 *
 * @param indexManagementActionsMetrics metrics holder to use for step execution
 * @return this runner
 */
fun registerIndexManagementActionMetrics(indexManagementActionsMetrics: IndexManagementActionsMetrics): ManagedIndexRunner {
    this.indexManagementActionMetrics = indexManagementActionsMetrics
    return this
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ManagedIndexConfig) {
throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
Expand Down Expand Up @@ -446,7 +453,7 @@ object ManagedIndexRunner :
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user,
),
) {
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger)
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute(indexManagementActionMetrics).postExecute(logger)
}
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

Expand Down
Loading
Loading