Skip to content

Commit

Permalink
Update snapshot management requests
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
cwperks committed Nov 26, 2024
1 parent b15acc6 commit 8a310d2
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,31 @@

package org.opensearch.indexmanagement.snapshotmanagement.api.transport.start

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.action.update.UpdateRequest
import org.opensearch.action.ValidateActions.addValidationError
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class StartSMRequest : UpdateRequest {
class StartSMRequest : ActionRequest {

val id: String
get() = field

@Throws(IOException::class)
constructor(sin: StreamInput) : super(sin)
constructor(sin: StreamInput) : super(sin) {
this.id = sin.readString()
}

constructor(id: String) {
super.id(id)
this.id = id
}

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (super.id().isEmpty()) {
validationException = ValidateActions.addValidationError("id is missing", validationException)
if (this.id.isEmpty()) {
validationException = addValidationError("id is missing", validationException)
}
return validationException
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.action.update.UpdateRequest
import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -57,7 +58,7 @@ constructor(
user: User?,
threadContext: ThreadContext.StoredContext,
): AcknowledgedResponse {
val smPolicy = client.getSMPolicy(request.id())
val smPolicy = client.getSMPolicy(request.id)

// Check if the requested user has permission on the resource, throwing an exception if the user does not
verifyUserHasPermissionForResource(user, smPolicy.user, filterByEnabled, "snapshot management policy", smPolicy.policyName)
Expand All @@ -71,7 +72,8 @@ constructor(

private suspend fun enableSMPolicy(updateRequest: StartSMRequest): Boolean {
val now = Instant.now().toEpochMilli()
updateRequest.index(INDEX_MANAGEMENT_INDEX).doc(
val updateReq = UpdateRequest(INDEX_MANAGEMENT_INDEX, updateRequest.id)
updateReq.doc(
mapOf(
SMPolicy.SM_TYPE to
mapOf(
Expand All @@ -83,12 +85,12 @@ constructor(
)
val updateResponse: UpdateResponse =
try {
client.suspendUntil { update(updateRequest, it) }
client.suspendUntil { update(updateReq, it) }
} catch (e: VersionConflictEngineException) {
log.error("VersionConflictEngineException while trying to enable snapshot management policy id [${updateRequest.id()}]: $e")
log.error("VersionConflictEngineException while trying to enable snapshot management policy id [${updateRequest.id}]: $e")
throw OpenSearchStatusException(conflictExceptionMessage, RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed trying to enable snapshot management policy id [${updateRequest.id()}]: $e")
log.error("Failed trying to enable snapshot management policy id [${updateRequest.id}]: $e")
throw OpenSearchStatusException("Failed while trying to enable SM Policy", RestStatus.INTERNAL_SERVER_ERROR)
}
return updateResponse.result == DocWriteResponse.Result.UPDATED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,31 @@

package org.opensearch.indexmanagement.snapshotmanagement.api.transport.stop

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.action.update.UpdateRequest
import org.opensearch.action.ValidateActions.addValidationError
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class StopSMRequest : UpdateRequest {
class StopSMRequest : ActionRequest {

val id: String
get() = field

@Throws(IOException::class)
constructor(sin: StreamInput) : super(sin)
constructor(sin: StreamInput) : super(sin) {
this.id = sin.readString()
}

constructor(id: String) {
super.id(id)
this.id = id
}

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (super.id().isEmpty()) {
validationException = ValidateActions.addValidationError("id is missing", validationException)
if (this.id.isEmpty()) {
validationException = addValidationError("id is missing", validationException)
}
return validationException
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.action.update.UpdateRequest
import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -57,7 +58,7 @@ constructor(
user: User?,
threadContext: ThreadContext.StoredContext,
): AcknowledgedResponse {
val smPolicy = client.getSMPolicy(request.id())
val smPolicy = client.getSMPolicy(request.id)

// Check if the requested user has permission on the resource, throwing an exception if the user does not
verifyUserHasPermissionForResource(user, smPolicy.user, filterByEnabled, "snapshot management policy", smPolicy.policyName)
Expand All @@ -71,7 +72,8 @@ constructor(

private suspend fun disableSMPolicy(updateRequest: StopSMRequest): Boolean {
val now = Instant.now().toEpochMilli()
updateRequest.index(INDEX_MANAGEMENT_INDEX).doc(
val updateReq = UpdateRequest(INDEX_MANAGEMENT_INDEX, updateRequest.id)
updateReq.doc(
mapOf(
SMPolicy.SM_TYPE to
mapOf(
Expand All @@ -83,12 +85,12 @@ constructor(
)
val updateResponse: UpdateResponse =
try {
client.suspendUntil { update(updateRequest, it) }
client.suspendUntil { update(updateReq, it) }
} catch (e: VersionConflictEngineException) {
log.error("VersionConflictEngineException while trying to disable snapshot management policy id [${updateRequest.id()}]: $e")
log.error("VersionConflictEngineException while trying to disable snapshot management policy id [${updateRequest.id}]: $e")
throw OpenSearchStatusException(conflictExceptionMessage, RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed trying to disable snapshot management policy id [${updateRequest.id()}]: $e")
log.error("Failed trying to disable snapshot management policy id [${updateRequest.id}]: $e")
throw OpenSearchStatusException("Failed while trying to disable SM Policy", RestStatus.INTERNAL_SERVER_ERROR)
}
// TODO update metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.action

import org.opensearch.action.DocWriteRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.update.UpdateRequest
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.index.seqno.SequenceNumbers
Expand Down Expand Up @@ -82,22 +83,22 @@ class RequestTests : OpenSearchTestCase() {

fun `test start sm policy request`() {
val id = "some_id"
val req = StartSMRequest(id).index(INDEX_MANAGEMENT_INDEX)
val req = UpdateRequest(INDEX_MANAGEMENT_INDEX, id)

val out = BytesStreamOutput().apply { req.writeTo(this) }
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val streamedReq = StartSMRequest(sin)
assertEquals(id, streamedReq.id())
assertEquals(id, streamedReq.id)
}

fun `test stop sm policy request`() {
val id = "some_id"
val req = StopSMRequest(id).index(INDEX_MANAGEMENT_INDEX)
val req = UpdateRequest(INDEX_MANAGEMENT_INDEX, id)

val out = BytesStreamOutput().apply { req.writeTo(this) }
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val streamedReq = StopSMRequest(sin)
assertEquals(id, streamedReq.id())
assertEquals(id, streamedReq.id)
}

fun `test explain sm policy request`() {
Expand Down

0 comments on commit 8a310d2

Please sign in to comment.