Skip to content

Commit

Permalink
refactor(activity): Simplify intent activity, squash history into sin…
Browse files Browse the repository at this point in the history
…gle log (#83)
  • Loading branch information
robzienert committed Apr 6, 2018
1 parent 384aed4 commit f98f7b7
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ open class KeelConfiguration {

@Bean
@ConditionalOnMissingBean(IntentActivityRepository::class)
open fun memoryIntentActivityRepository(): IntentActivityRepository = MemoryIntentActivityRepository(properties)
open fun memoryIntentActivityRepository(): IntentActivityRepository = MemoryIntentActivityRepository()

@Bean open fun clock(): Clock = Clock.systemDefaultZone()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import org.springframework.boot.context.properties.ConfigurationProperties
class KeelProperties {
var prettyPrintJson: Boolean = false
var immediatelyRunIntents: Boolean = true
var maxConvergenceLogEntriesPerIntent: Int = 720 // one entry every 30 s, this will keep 6 hours of logs
var maxConvergenceLogEntriesPerIntent: Int = 1000

var intentPackages: List<String> = listOf("com.netflix.spinnaker.keel.intent")
var intentSpecPackages: List<String> = listOf("com.netflix.spinnaker.keel.intent")
var policyPackages: List<String> = listOf("com.netflix.spinnaker.keel.policy")
var attributePackages: List<String> = listOf("com.netflix.spinnaker.keel.attribute")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,70 @@
*/
package com.netflix.spinnaker.keel

import com.netflix.spinnaker.keel.dryrun.ChangeType
import com.netflix.spinnaker.keel.state.FieldState
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id.NAME
import com.fasterxml.jackson.annotation.JsonTypeName
import com.netflix.spinnaker.keel.model.ListCriteria
import java.time.Instant

/**
* @param intentId The ID of the intent that is being evaluated.
* @param changeType The type of change that took place this cycle.
* @param orchestrations A resultant (if any) list of orchestration IDs from the intent.
* @param messages Human-friendly messages about the change.
* @param diff The diff between current and desired state.
* @param actor Who (or what) initiated the operation.
* @param timestampMillis The timestamp in millis the record was created.
*/
data class IntentConvergenceRecord(
val intentId: String,
val changeType: ChangeType,
val orchestrations: List<String>?,
val messages: List<String>?,
val diff: Set<FieldState>,
val actor: String,
val timestampMillis: Long
@JsonTypeInfo(use = NAME, include = PROPERTY, property = "kind")
@JsonSubTypes(
Type(IntentChangeRecord::class),
Type(IntentConvergenceRecord::class)
)
sealed class ActivityRecord {
abstract val intentId: String
abstract val actor: String
val timestamp: Instant = Instant.now()
}

interface IntentActivityRepository {
enum class IntentChangeAction { UPSERT, DELETE }

fun addOrchestration(intentId: String, orchestrationId: String)
/**
* An activity record for whenever a change is made to [intentId]
*/
@JsonTypeName("IntentChange")
data class IntentChangeRecord(
override val intentId: String,
override val actor: String,
val action: IntentChangeAction,
val value: Intent<IntentSpec>
) : ActivityRecord()

fun addOrchestrations(intentId: String, orchestrations: List<String>)
/**
* Activity record whenever an intent is converged.
*/
@JsonTypeName("IntentConvergence")
data class IntentConvergenceRecord(
override val intentId: String,
override val actor: String,
val result: ConvergeResult
) : ActivityRecord()

fun getHistory(intentId: String): List<String>
/**
* Find the [ActivityRecord] class for [name]. Everyone loves reflection.
*/
fun activityRecordClassForName(name: String): Class<ActivityRecord>? {
return ActivityRecord::class.annotations
.filterIsInstance<JsonSubTypes>()
.firstOrNull()
?.let { subTypes ->
@Suppress("UNCHECKED_CAST")
subTypes.value.find { it.name == name }?.value as Class<ActivityRecord>?
}
}

fun logConvergence(intentConvergenceRecord: IntentConvergenceRecord)
/**
* Responsible for recording all activity related to intents. This is primarily meant for diagnostics and auditing.
*/
interface IntentActivityRepository {

fun getLog(intentId: String): List<IntentConvergenceRecord>
fun record(activity: ActivityRecord)

/**
* Permalink to a specific log message, identified by timestampMillis
* @param intentId The ID of the intent that is being evaluated.
* @param timestampMillis The timestamp of the log message,
* used as the unique identifier of the message, in milliseconds.
*/
fun getLogEntry(intentId: String, timestampMillis: Long): IntentConvergenceRecord?
fun getHistory(intentId: String, criteria: ListCriteria): List<ActivityRecord>

/**
* If orchestrationId is passed in as a link to the task in orca, strip the leading path
* off so that we're storing the actual orchestrationId
*/
fun parseOrchestrationId(orchestrationId: String) = orchestrationId.removePrefix("/tasks/")
fun <T : ActivityRecord> getHistory(intentId: String, kind: Class<T>, criteria: ListCriteria): List<T>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.keel.event

import com.netflix.spinnaker.keel.IntentActivityRepository
import com.netflix.spinnaker.keel.IntentChangeAction.DELETE
import com.netflix.spinnaker.keel.IntentChangeAction.UPSERT
import com.netflix.spinnaker.keel.IntentChangeRecord
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component

@Component
class IntentActivityListener(
private val intentActivityRepository: IntentActivityRepository
) {

@EventListener(AfterIntentUpsertEvent::class, AfterIntentDeleteEvent::class)
fun recordUpsert(event: IntentAwareEvent) {
// TODO rz - actor needs to be pulled out of AuthenticatedRequest
intentActivityRepository.record(IntentChangeRecord(
event.intent.id(),
action = if (event is AfterIntentUpsertEvent) UPSERT else DELETE,
actor = "TODO: unknown",
value = event.intent
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,73 +15,58 @@
*/
package com.netflix.spinnaker.keel.memory

import com.netflix.spinnaker.config.KeelProperties
import com.netflix.spinnaker.keel.ActivityRecord
import com.netflix.spinnaker.keel.IntentActivityRepository
import com.netflix.spinnaker.keel.IntentConvergenceRecord
import com.netflix.spinnaker.keel.model.ListCriteria
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.PostConstruct

class MemoryIntentActivityRepository
@Autowired constructor(
private val keelProperties: KeelProperties
) : IntentActivityRepository {
class MemoryIntentActivityRepository : IntentActivityRepository {

private val log = LoggerFactory.getLogger(javaClass)

private val orchestrations: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()

private val convergenceLog: ConcurrentHashMap<String, MutableList<IntentConvergenceRecord>> = ConcurrentHashMap()
private val activities: ConcurrentHashMap<String, MutableList<ActivityRecord>> = ConcurrentHashMap()

@PostConstruct
fun init() {
log.info("Using ${javaClass.simpleName}")
}

override fun addOrchestration(intentId: String, orchestrationId: String) {
val orchestrationUUID = parseOrchestrationId(orchestrationId)
if (!orchestrations.containsKey(intentId)) {
orchestrations[intentId] = mutableSetOf()
}
if (orchestrations[intentId]?.contains(orchestrationUUID) == false) {
orchestrations[intentId]?.add(orchestrationUUID)
}
override fun record(activity: ActivityRecord) {
ensureIntentLog(activity.intentId)
activities[activity.intentId]!!.add(activity)
}

override fun getHistory(intentId: String, criteria: ListCriteria): List<ActivityRecord> {
ensureIntentLog(intentId)
return activities[intentId]!!.let { limitOffset(it, criteria) }
}

override fun addOrchestrations(intentId: String, orchestrations: List<String>) {
orchestrations.forEach { addOrchestration(intentId, it) }
override fun <T : ActivityRecord> getHistory(intentId: String, kind: Class<T>, criteria: ListCriteria): List<T> {
ensureIntentLog(intentId)
return activities[intentId]!!
.filterIsInstance(kind)
.let { limitOffset(it, criteria) }
}

override fun getHistory(intentId: String) = orchestrations.getOrDefault(intentId, mutableSetOf()).toList()
private fun ensureIntentLog(intentId: String) {
if (!activities.containsKey(intentId)) {
activities[intentId] = mutableListOf()
}
}

override fun logConvergence(intentConvergenceRecord: IntentConvergenceRecord) {
val intentId = intentConvergenceRecord.intentId
if (!convergenceLog.containsKey(intentId)){
convergenceLog[intentId] = mutableListOf(intentConvergenceRecord)
} else {
if (convergenceLog[intentId] == null) {
convergenceLog[intentId] = mutableListOf(intentConvergenceRecord)
private fun <T : ActivityRecord> limitOffset(list: List<T>, criteria: ListCriteria): List<T> =
list.let {
val size = it.size
if (size <= criteria.offset) {
listOf()
} else {
convergenceLog[intentId]?.let { l ->
l.add(intentConvergenceRecord)
// Drop oldest entries if we're over the message limit
val numMsgsLeft = keelProperties.maxConvergenceLogEntriesPerIntent - l.count()
if (numMsgsLeft < 0){
convergenceLog[intentId] = l.drop(-1*numMsgsLeft).toMutableList()
}
var lastIndex = criteria.offset + criteria.limit
if (lastIndex >= size) {
lastIndex = size
}
it.subList(criteria.offset, lastIndex).toList()
}
}
}

override fun getLog(intentId: String): List<IntentConvergenceRecord>
= convergenceLog[intentId] ?: emptyList()

// if there are multiple messages with the same timestamp, return the first
override fun getLogEntry(intentId: String, timestampMillis: Long)
= convergenceLog[intentId]?.filter { it.timestampMillis == timestampMillis }?.toList()?.also {
// The same intent shouldn't be processed more than once at the exact same time.
if (it.size > 1) log.warn("Two messages with the same timestampMillis. This shouldn't happen.")
}?.first()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.keel.model

// TODO validation
interface ListCriteria {
val limit: Int
val offset: Int
}

data class PagingListCriteria(
override val limit: Int = 10,
override val offset: Int = 0
) : ListCriteria
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ import com.netflix.spinnaker.keel.*
import com.netflix.spinnaker.keel.dryrun.ChangeSummary
import net.logstash.logback.argument.StructuredArguments.value
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
import java.time.Clock
import java.util.concurrent.TimeUnit

@Component(value = "orcaIntentLauncher")
open class OrcaIntentLauncher
@Autowired constructor(
open class OrcaIntentLauncher(
private val intentProcessors: List<IntentProcessor<*>>,
private val orcaService: OrcaService,
private val registry: Registry,
private val applicationEventPublisher: ApplicationEventPublisher,
private val intentActivityRepository: IntentActivityRepository,
private val clock: Clock
private val intentActivityRepository: IntentActivityRepository
) : IntentLauncher<OrcaLaunchedIntentResult> {

private val log = LoggerFactory.getLogger(javaClass)
Expand All @@ -61,14 +57,10 @@ open class OrcaIntentLauncher
orcaService.orchestrate(it).ref
}

intentActivityRepository.logConvergence(IntentConvergenceRecord(
intentActivityRepository.record(IntentConvergenceRecord(
intentId = intent.id(),
changeType = result.changeSummary.type,
orchestrations = orchestrationIds,
messages = result.changeSummary.message,
diff = result.changeSummary.diff,
actor = "keel:scheduledConvergence",
timestampMillis = clock.millis()
actor = "keel:scheduledConverge",
result = result
))

log.info(
Expand Down
Loading

0 comments on commit f98f7b7

Please sign in to comment.