Skip to content

Commit

Permalink
SpikeFilter settings (#99)
Browse files Browse the repository at this point in the history
* WIP arbiter history, try jdk 14

* add-exports jvmArg, jdk 17

* WIP

* WIP flapping

* arbiter changes to support flapping detection and notification settings

* notification settings schema and case class

* notification setings doc

* scalastyle fix

* use spike filtering nomenclature

* minor naming change

* spike filter tidy up

* minor refactoring

* scalafix

---------

Co-authored-by: tomas mccandless <tomas.mccandless@workday.com>
  • Loading branch information
tomnis and tomas mccandless authored Jul 26, 2023
1 parent 8236bc4 commit cfeaef7
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait Tables {
import slick.jdbc.{GetResult => GR}

/** DDL for all tables. Call .create to execute. */
lazy val schema = Array(Build.schema, Measurement.schema, MeasurementName.schema, TagName.schema, TestDefinition.schema, TestDefinitionMetaTag.schema, TestDefinitionTag.schema, TestExecution.schema, TestExecutionMetaTag.schema, TestExecutionTag.schema).reduceLeft(_ ++ _)
lazy val schema = Array(Build.schema, Measurement.schema, MeasurementName.schema, SpikeFilterSettings.schema, TagName.schema, TestDefinition.schema, TestDefinitionMetaTag.schema, TestDefinitionTag.schema, TestExecution.schema, TestExecutionMetaTag.schema, TestExecutionTag.schema).reduceLeft(_ ++ _)
@deprecated("Use .schema instead of .ddl", "3.0")
def ddl = schema

Expand Down Expand Up @@ -54,6 +54,19 @@ trait Tables {
def idMeasurementName(row: MeasurementNameRowWrapper): Int = row.idMeasurementName
def name(row: MeasurementNameRowWrapper): String = row.name
}
implicit object SpikeFilterSettingsRowTypeClassObject extends SpikeFilterSettingsRowLikeType[SpikeFilterSettingsRow] {
def idTestDefinition(row: SpikeFilterSettingsRow): Int = row.idTestDefinition
def spikeFilterEnabled(row: SpikeFilterSettingsRow): Boolean = row.spikeFilterEnabled
def responseTimeRequirement(row: SpikeFilterSettingsRow): Double = row.responseTimeRequirement
def alertOnNth(row: SpikeFilterSettingsRow): Int = row.alertOnNth
}

implicit object SpikeFilterSettingsRowWrapperTypeClassObject extends SpikeFilterSettingsRowLikeType[SpikeFilterSettingsRowWrapper] {
def idTestDefinition(row: SpikeFilterSettingsRowWrapper): Int = row.idTestDefinition
def spikeFilterEnabled(row: SpikeFilterSettingsRowWrapper): Boolean = row.spikeFilterEnabled
def responseTimeRequirement(row: SpikeFilterSettingsRowWrapper): Double = row.responseTimeRequirement
def alertOnNth(row: SpikeFilterSettingsRowWrapper): Int = row.alertOnNth
}
implicit object TagNameRowTypeClassObject extends TagNameRowLikeType[TagNameRow] {
def idTagName(row: TagNameRow): Int = row.idTagName
def name(row: TagNameRow): String = row.name
Expand Down Expand Up @@ -269,6 +282,42 @@ trait Tables {
/** Collection-like TableQuery object for table MeasurementName */
lazy val MeasurementName = new TableQuery(tag => new MeasurementName(tag))

/** Entity class storing rows of table SpikeFilterSettings
* @param idTestDefinition Database column idTestDefinition SqlType(INT), PrimaryKey
* @param spikeFilterEnabled Database column spikeFilterEnabled SqlType(BIT), Default(false)
* @param responseTimeRequirement Database column responseTimeRequirement SqlType(DOUBLE)
* @param alertOnNth Database column alertOnNth SqlType(INT), Default(1) */
class SpikeFilterSettingsRowWrapper(val idTestDefinition: Int, val spikeFilterEnabled: Boolean = false, val responseTimeRequirement: Double, val alertOnNth: Int = 1) extends SpikeFilterSettingsRowLike
case class SpikeFilterSettingsRow(override val idTestDefinition: Int, override val spikeFilterEnabled: Boolean = false, override val responseTimeRequirement: Double, override val alertOnNth: Int = 1) extends SpikeFilterSettingsRowWrapper(idTestDefinition, spikeFilterEnabled, responseTimeRequirement, alertOnNth)
implicit def SpikeFilterSettingsRowWrapper2SpikeFilterSettingsRow(x: SpikeFilterSettingsRowWrapper): SpikeFilterSettingsRow = SpikeFilterSettingsRow(x.idTestDefinition, x.spikeFilterEnabled, x.responseTimeRequirement, x.alertOnNth)
implicit def SpikeFilterSettingsRow2SpikeFilterSettingsRowWrapper(x: SpikeFilterSettingsRow): SpikeFilterSettingsRowWrapper = new SpikeFilterSettingsRowWrapper(x.idTestDefinition, x.spikeFilterEnabled, x.responseTimeRequirement, x.alertOnNth)
implicit def SpikeFilterSettingsRowFromTypeClass[T: SpikeFilterSettingsRowLikeType](x: T): SpikeFilterSettingsRow = SpikeFilterSettingsRow(implicitly[SpikeFilterSettingsRowLikeType[T]].idTestDefinition(x), implicitly[SpikeFilterSettingsRowLikeType[T]].spikeFilterEnabled(x), implicitly[SpikeFilterSettingsRowLikeType[T]].responseTimeRequirement(x), implicitly[SpikeFilterSettingsRowLikeType[T]].alertOnNth(x))
/** GetResult implicit for fetching SpikeFilterSettingsRow objects using plain SQL queries */
implicit def GetResultSpikeFilterSettingsRow(implicit e0: GR[Int], e1: GR[Boolean], e2: GR[Double]): GR[SpikeFilterSettingsRow] = GR{
prs => import prs._
SpikeFilterSettingsRow.tupled((<<[Int], <<[Boolean], <<[Double], <<[Int]))
}
/** Table description of table SpikeFilterSettings. Objects of this class serve as prototypes for rows in queries. */
class SpikeFilterSettings(_tableTag: Tag) extends profile.api.Table[SpikeFilterSettingsRow](_tableTag, None, "SpikeFilterSettings") with SpikeFilterSettingsLike {
def * = (idTestDefinition, spikeFilterEnabled, responseTimeRequirement, alertOnNth).<>(SpikeFilterSettingsRow.tupled, SpikeFilterSettingsRow.unapply)
/** Maps whole row to an option. Useful for outer joins. */
def ? = ((Rep.Some(idTestDefinition), Rep.Some(spikeFilterEnabled), Rep.Some(responseTimeRequirement), Rep.Some(alertOnNth))).shaped.<>({r=>import r._; _1.map(_=> SpikeFilterSettingsRow.tupled((_1.get, _2.get, _3.get, _4.get)))}, (_:Any) => throw new Exception("Inserting into ? projection not supported."))

/** Database column idTestDefinition SqlType(INT), PrimaryKey */
val idTestDefinition: Rep[Int] = column[Int]("idTestDefinition", O.PrimaryKey)
/** Database column spikeFilterEnabled SqlType(BIT), Default(false) */
val spikeFilterEnabled: Rep[Boolean] = column[Boolean]("spikeFilterEnabled", O.Default(false))
/** Database column responseTimeRequirement SqlType(DOUBLE) */
val responseTimeRequirement: Rep[Double] = column[Double]("responseTimeRequirement")
/** Database column alertOnNth SqlType(INT), Default(1) */
val alertOnNth: Rep[Int] = column[Int]("alertOnNth", O.Default(1))

/** Foreign key referencing TestDefinition (database name definition_SpikeFilterSettings) */
lazy val testDefinitionFk = foreignKey("definition_SpikeFilterSettings", idTestDefinition, TestDefinition)(r => r.idTestDefinition, onUpdate=ForeignKeyAction.Restrict, onDelete=ForeignKeyAction.Cascade)
}
/** Collection-like TableQuery object for table SpikeFilterSettings */
lazy val SpikeFilterSettings = new TableQuery(tag => new SpikeFilterSettings(tag))

/** Entity class storing rows of table TagName
* @param idTagName Database column idTagName SqlType(INT), AutoInc, PrimaryKey
* @param name Database column name SqlType(VARCHAR), Length(255,true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import slick.lifted.Rep
import annotation.implicitNotFound

trait TablesLike {
val CORE_TABLES: Map[String, String] = Map(("BuildRow","BuildRowWrapper(idBuild,major,minor,patch,firstTested,lastTested)"),("MeasurementRow","MeasurementRowWrapper(idTestExecution,idMeasurementName,result)"),("MeasurementNameRow","MeasurementNameRowWrapper(idMeasurementName,name)"),("TagNameRow","TagNameRowWrapper(idTagName,name,nameType,isUserGenerated)"),("TestDefinitionRow","TestDefinitionRowWrapper(idTestDefinition,methodSignature,active,productName,subProductName,className,methodName,documentation)"),("TestDefinitionMetaTagRow","TestDefinitionMetaTagRowWrapper(idTestDefinitionTag,idTagName,value)"),("TestDefinitionTagRow","TestDefinitionTagRowWrapper(idTestDefinitionTag,idTestDefinition,idTagName,value)"),("TestExecutionRow","TestExecutionRowWrapper(idTestExecution,idTestDefinition,idBuild,passed,responseTime,responseTimeRequirement,startTime,endTime)"),("TestExecutionMetaTagRow","TestExecutionMetaTagRowWrapper(idTestExecutionTag,idTagName,value)"),("TestExecutionTagRow","TestExecutionTagRowWrapper(idTestExecutionTag,idTestExecution,idTagName,value)"))
val CORE_TABLES: Map[String, String] = Map(("BuildRow","BuildRowWrapper(idBuild,major,minor,patch,firstTested,lastTested)"),("MeasurementRow","MeasurementRowWrapper(idTestExecution,idMeasurementName,result)"),("MeasurementNameRow","MeasurementNameRowWrapper(idMeasurementName,name)"),("SpikeFilterSettingsRow","SpikeFilterSettingsRowWrapper(idTestDefinition,spikeFilterEnabled,responseTimeRequirement,alertOnNth)"),("TagNameRow","TagNameRowWrapper(idTagName,name,nameType,isUserGenerated)"),("TestDefinitionRow","TestDefinitionRowWrapper(idTestDefinition,methodSignature,active,productName,subProductName,className,methodName,documentation)"),("TestDefinitionMetaTagRow","TestDefinitionMetaTagRowWrapper(idTestDefinitionTag,idTagName,value)"),("TestDefinitionTagRow","TestDefinitionTagRowWrapper(idTestDefinitionTag,idTestDefinition,idTagName,value)"),("TestExecutionRow","TestExecutionRowWrapper(idTestExecution,idTestDefinition,idBuild,passed,responseTime,responseTimeRequirement,startTime,endTime)"),("TestExecutionMetaTagRow","TestExecutionMetaTagRowWrapper(idTestExecutionTag,idTagName,value)"),("TestExecutionTagRow","TestExecutionTagRowWrapper(idTestExecutionTag,idTestExecution,idTagName,value)"))
/** Supertrait for entity classes storing rows of table BuildLike
*
* idBuild: Database column idBuild SqlType(INT), AutoInc, PrimaryKey
Expand Down Expand Up @@ -86,6 +86,34 @@ trait TablesLike {
val name: Rep[String]
}

/** Supertrait for entity classes storing rows of table SpikeFilterSettingsLike
*
* idTestDefinition: Database column idTestDefinition SqlType(INT), PrimaryKey
* spikeFilterEnabled: Database column spikeFilterEnabled SqlType(BIT), Default(false)
* responseTimeRequirement: Database column responseTimeRequirement SqlType(DOUBLE)
* alertOnNth: Database column alertOnNth SqlType(INT), Default(1) */
trait SpikeFilterSettingsRowLike {
val idTestDefinition: Int
val spikeFilterEnabled: Boolean
val responseTimeRequirement: Double
val alertOnNth: Int
}
/** Type Class for SpikeFilterSettingsRowLike **/
@implicitNotFound("Could not find an implicit value for evidence of type class SpikeFilterSettingsRowLikeType[${T}]. You might pass an (implicit ev: SpikeFilterSettingsRowLikeType[${T}]) parameter to your method or import Tables.RowTypeClasses._")
trait SpikeFilterSettingsRowLikeType[T] {
def idTestDefinition(row: T): Int
def spikeFilterEnabled(row: T): Boolean
def responseTimeRequirement(row: T): Double
def alertOnNth(row: T): Int
}
/** Supertrait for Table descriptions of table SpikeFilterSettingsLike */
trait SpikeFilterSettingsLike {
val idTestDefinition: Rep[Int]
val spikeFilterEnabled: Rep[Boolean]
val responseTimeRequirement: Rep[Double]
val alertOnNth: Rep[Int]
}

/** Supertrait for entity classes storing rows of table TagNameLike
*
* idTagName: Database column idTagName SqlType(INT), AutoInc, PrimaryKey
Expand Down Expand Up @@ -323,6 +351,12 @@ trait TablesLike {
def idMeasurementName(row: MeasurementNameRowLike): Int = row.idMeasurementName
def name(row: MeasurementNameRowLike): String = row.name
}
implicit object SpikeFilterSettingsRowLikeTypeClassObject extends SpikeFilterSettingsRowLikeType[SpikeFilterSettingsRowLike] {
def idTestDefinition(row: SpikeFilterSettingsRowLike): Int = row.idTestDefinition
def spikeFilterEnabled(row: SpikeFilterSettingsRowLike): Boolean = row.spikeFilterEnabled
def responseTimeRequirement(row: SpikeFilterSettingsRowLike): Double = row.responseTimeRequirement
def alertOnNth(row: SpikeFilterSettingsRowLike): Int = row.alertOnNth
}
implicit object TagNameRowLikeTypeClassObject extends TagNameRowLikeType[TagNameRowLike] {
def idTagName(row: TagNameRowLike): Int = row.idTagName
def name(row: TagNameRowLike): String = row.name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL,ALLOW_INVALID_DATES';


CREATE TABLE IF NOT EXISTS `SpikeFilterSettings` (
`idTestDefinition` INT(11) NOT NULL COMMENT 'Foreign key pointing to the test definition.',
`spikeFilterEnabled` TINYINT(1) NOT NULL DEFAULT FALSE COMMENT 'Whether arbiter spike filter is enabled.',
`responseTimeRequirement` DOUBLE NOT NULL,
`alertOnNth` INT(11) NOT NULL DEFAULT 1,
PRIMARY KEY (`idTestDefinition`),
CONSTRAINT `definition_SpikeFilterSettings`
FOREIGN KEY (`idTestDefinition`)
REFERENCES `TestDefinition` (`idTestDefinition`)
ON DELETE CASCADE
ON UPDATE NO ACTION
)
ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8;


SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,29 @@ trait ArbiterLike extends PersistenceAware with CanReadHistory {
* @return a wrapped error with a useful message, or None if the measured test passed its requirement.
*/
final def voteWithSpikeFilter[T: TestExecutionRowLikeType](ballot: Ballot, testExecution: T): Option[Throwable] = {
val (spikeFilterEnabled, alertOnNth) = spikeFilterSettings
val (spikeFilterEnabled, alertOnNth) = spikeFilterSettings(testExecution)
voteWithSpikeFilter(ballot, testExecution, spikeFilterEnabled, alertOnNth)
}


/**
* Whether spike filtering is enabled.
* TODO should consult notification settings db table in addition to properties.
* Whether spike filtering is enabled (notification settings).
*
* @return notification settings.
* Order of precedence:
* - WarpProperties
* - DB
* - defaults to off
*
* @return spike filtering settings.
*/
def spikeFilterSettings: (Boolean, Int) = {
(WARP_ARBITER_SPIKE_FILTER_ENABLED.value.toBoolean, WARP_ARBITER_SPIKE_FILTER_ALERT_ON_NTH.value.toInt)
def spikeFilterSettings[T: TestExecutionRowLikeType](testExecution: T): (Boolean, Int) = {
var settings: (Boolean, Int) = this.persistenceUtils.getSpikeFilterSettings(testExecution)
.map(setting => (setting.spikeFilterEnabled, setting.alertOnNth))
.getOrElse((false, 1))
// allow individual overrides from properties if they are present
Option(WARP_ARBITER_SPIKE_FILTER_ENABLED.value).foreach(f => settings = settings.copy(_1 = f.toBoolean))
Option(WARP_ARBITER_SPIKE_FILTER_ALERT_ON_NTH.value).foreach(f => settings = settings.copy(_2 = f.toInt))
settings
}


Expand All @@ -95,7 +105,9 @@ trait ArbiterLike extends PersistenceAware with CanReadHistory {
* @param testExecution [[TestExecutionRowLikeType]] we are voting on.
* @return true iff the test passed.
*/
def passed[T: TestExecutionRowLikeType](ballot: Ballot, testExecution: T): Boolean = this.vote(ballot, testExecution).isEmpty
def passed[T: TestExecutionRowLikeType](ballot: Ballot, testExecution: T): Boolean = {
this.voteWithSpikeFilter(ballot, testExecution).isEmpty
}


/**
Expand All @@ -105,7 +117,9 @@ trait ArbiterLike extends PersistenceAware with CanReadHistory {
* @param testExecution [[TestExecutionRowLikeType]] we are voting on.
*/
def collectVote[T: TestExecutionRowLikeType](ballot: Ballot,
testExecution: T): Unit = ballot.registerVote(this.vote(ballot, testExecution))
testExecution: T): Unit = {
ballot.registerVote(this.voteWithSpikeFilter(ballot, testExecution))
}


/**
Expand All @@ -115,16 +129,13 @@ trait ArbiterLike extends PersistenceAware with CanReadHistory {
* @param testExecution [[TestExecutionRowLikeType]] we are voting on.
*/
def voteAndThrow[T: TestExecutionRowLikeType](ballot: Ballot,
testExecution: T): Unit = this.maybeThrow(this.vote(ballot, testExecution))
testExecution: T): Unit = {
this.maybeThrow(this.voteWithSpikeFilter(ballot, testExecution))
}


/** Throws an exception iff the measured test did not pass its requirement. */
def maybeThrow(maybeError: Option[Throwable]): Unit = {
maybeError match {
case Some(error) => throw error
case None =>
}
}
def maybeThrow(maybeError: Option[Throwable]): Unit = maybeError.foreach(throw _)


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,23 @@ trait HasCoreWarpProperties extends WarpPropertyLike {
*/
val WARP_PERSISTENCE_RETRIES: PropertyEntry = PropertyEntry("wd.warp.persistence.retries", isRequired = false, "4")

/**
* Whether arbiter flapping detection is enabled.
*
* Required: No
*/
val WARP_ARBITER_SPIKE_FILTER_ENABLED: PropertyEntry = PropertyEntry(
"wd.warp.arbiter.spike.filter.enabled",
isRequired = false,
"true"
)

/**
* Arbiter flapping number of consecutive failures (depending on arbiter implementation votes)
* required before failing a build.
*
* Required: No
*/
val WARP_ARBITER_SPIKE_FILTER_ALERT_ON_NTH: PropertyEntry = PropertyEntry(
"wd.warp.arbiter.spike.filter.alert.on.nth", isRequired = false, "1"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,5 +344,30 @@ trait AbstractQueries {
def updateTestExecutionThreshold[T: TestExecutionRowLikeType](testExecution: T, newThreshold: Double): DBIO[Int]


/**
* Creates a [[DBIO]] for reading prior test executions.
*
* @param testExecution text execution to read history.
* @param limit history length.
* @return a [[DBIO]] (not yet executed) for reading test execution history.
*/
def getPriorTestExecutionsQuery[T: TestExecutionRowLikeType](testExecution: T, limit: Int): DBIO[Seq[TestExecutionRowLike]]


/**
* Creates a [[DBIO]] for reading spike filter settings.
*
* @param testExecution test execution to read spike filter settings for.
* @return a [[DBIO]] (not yet executed) for reading spike filter settings for the given test execution.
*/
def getSpikeFilterSettingsQuery[T: TestExecutionRowLikeType](testExecution: T): DBIO[Option[SpikeFilterSettingsRowLike]]


/**
* Creates a [[DBIO]] for bulk writing spike filter settings.
*
* @param settings collection of settings to write.
* @return a [[DBIO]] (not yet executed) for writing a collection of spike filter settings.
*/
def writeSpikeFilterSettingsQuery[T: SpikeFilterSettingsRowLikeType](settings: Seq[T]): DBIO[Int]
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,32 @@ trait CorePersistenceAware extends PersistenceAware with WarpLogging {
* @tparam T
* @return a collection of prior test executions.
*/
override def getPriorTestExecutions[T: TestExecutionRowLikeType](testExecution: T, limit: Int): Seq[TablesLike.TestExecutionRowLike] = {
override def getPriorTestExecutions[T: TestExecutionRowLikeType](testExecution: T, limit: Int): Seq[TestExecutionRowLike] = {
this.synchronously(getPriorTestExecutionsQuery(testExecution, limit))
}

/**
* Reads spike filter settings for the given test execution.
*
* @param testExecution execution to look up spike filter settings for.
* @tparam T
* @return spike filter settings for the given test execution.
*/
override def getSpikeFilterSettings[T: TestExecutionRowLikeType](testExecution: T): Option[SpikeFilterSettingsRowLike] = {
this.synchronously(getSpikeFilterSettingsQuery(testExecution))
}


/**
* Writes a collection of spike filter settings.
*
* @param settings
* @tparam T
* @return number of rows affected.
*/
override def writeSpikeFilterSettings[T: SpikeFilterSettingsRowLikeType](settings: Seq[T]): Int = {
this.synchronously(writeSpikeFilterSettingsQuery(settings))
}
}
}

Expand Down
Loading

0 comments on commit cfeaef7

Please sign in to comment.