Skip to content

Commit

Permalink
Set the rollover action to idempotent (#986)
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored Oct 5, 2023
1 parent b741c4b commit f543d93
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
info = mutableInfo.toMap()
}

override fun isIdempotent(): Boolean = false
override fun isIdempotent(): Boolean = true

@Suppress("TooManyFunctions")
companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.waitFor
import org.opensearch.rest.RestRequest
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -733,4 +734,65 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
Assert.assertEquals(alias.containsKey("test_alias2"), true)
Assert.assertEquals(alias.containsKey("test_alias3"), true)
}

fun `test rollover detects transient failure and continues executing`() {
val aliasName = "${testIndexName}_alias"
val indexNameBase = "${testIndexName}_index"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = RolloverAction(null, 1, null, null, false, 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(firstIndex, policyID, aliasName)

val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }

// Insert data to trigger rollover
insertSampleData(index = firstIndex, docCount = 5, delay = 0)
// Need to speed up to second execution where it will trigger the attempt rollover step
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"])
}
// Manually produce transaction failure
val response = client().makeRequest(
"POST", "$INDEX_MANAGEMENT_INDEX/_update/${managedIndexConfig.id}%23metadata",
StringEntity(
"{\n" +
" \"script\": {\n" +
" \"source\": \"ctx._source.managed_index_metadata.step.step_status = params.step_status\",\n" +
" \"lang\": \"painless\",\n" +
" \"params\": {\n" +
" \"step_status\": \"starting\"\n" +
" }\n" +
" }\n" +
"}",
ContentType.APPLICATION_JSON
)
)
assertEquals("Request failed", RestStatus.OK, response.restStatus())

// Execute again to see the transaction failure
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val metadata = getExplainManagedIndexMetaData(firstIndex)
assertEquals("Executing the wrong step", "attempt_rollover", metadata.stepMetaData?.name)
assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, metadata.stepMetaData?.stepStatus)
}
}
}

0 comments on commit f543d93

Please sign in to comment.