Skip to content

Commit

Permalink
don't use deprecated influxdb methods (#10)
Browse files Browse the repository at this point in the history
* don't use deprecated influxdb methods

* minor import cleanup

* minor cleanup, use Either in influxdbclient to store error message
  • Loading branch information
tomnis authored Nov 8, 2018
1 parent 9ee73f2 commit 7b4f7ab
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ResponseTimeCollector(testId: String) extends AbstractMeasurementCollector
* Called prior to starting an individual test invocation.
*/
override def startMeasurement(): Unit = {
if (InfluxDBClient.client.isEmpty) {
if (InfluxDBClient.maybeClient.isLeft) {
this.isEnabled = false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.workday.warp.common.utils.StackTraceFilter
import com.workday.warp.persistence.CorePersistenceAware
import com.workday.warp.persistence.TablesLike._
import com.workday.warp.persistence.Tables._
import org.influxdb.dto.{BatchPoints, Point, Pong}
import org.influxdb.dto.{BatchPoints, Point, Pong, Query, QueryResult}
import org.influxdb.{InfluxDB, InfluxDBFactory}
import org.pmw.tinylog.Logger

Expand All @@ -34,12 +34,12 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
* @param seriesName the series (also referred to as a measurement) to use for persistence.
*/
def persistHeapHistogram(histo: HeapHistogram, dbName: String, seriesName: String, warpTestName: String): Try[Unit] = {
InfluxDBClient.client match {
case None => Failure(new RuntimeException("unable to connect to influxdb"))
case Some(client) =>
InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) =>
// create the database if necessary
if (!this.databaseExists(dbName).getOrElse(false)) {
client.createDatabase(dbName)
this.createDatabase(dbName)
}

// timestamp to use for all points in this batch
Expand Down Expand Up @@ -120,12 +120,12 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
testExecutions: Iterable[T],
threshold: Option[Duration] = None): Try[Unit] = {

InfluxDBClient.client match {
case None => Failure(new RuntimeException("unable to connect to influxdb"))
case Some(client) =>
InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) =>
// create the database if necessary
if (!this.databaseExists(dbName).getOrElse(false)) {
client.createDatabase(dbName)
this.createDatabase(dbName)
}

val points = addPoints(dbName, seriesName, testExecutions, threshold)
Expand Down Expand Up @@ -157,9 +157,20 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
* @return true iff databaseName exists as a database in InfluxDB and we have a successful connection.
*/
def databaseExists(databaseName: String): Try[Boolean] = {
InfluxDBClient.client match {
case None => Failure(new WarpConfigurationException(InfluxDBClient.error))
case Some(client) => Try(client.describeDatabases.asScala.exists(_.equals(databaseName)))
val showQuery: Query = new Query("SHOW DATABASES", databaseName)
InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) => Try {
val results: Seq[QueryResult.Result] = client.query(showQuery).getResults.asScala
val databaseNames: Seq[String] = for {
res <- results
serie <- res.getSeries.asScala
value <- serie.getValues.asScala
name <- value.asScala
} yield name.toString

databaseNames.exists(_.equals(databaseName))
}
}
}

Expand All @@ -169,10 +180,27 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
*
* @param database name of the database to delete
*/
def deleteDatabase(database: String): Try[Unit] = {
InfluxDBClient.client match {
case None => Failure(new WarpConfigurationException(InfluxDBClient.error))
case Some(client) => Try(client.deleteDatabase(database))
def dropDatabase(database: String): Try[Unit] = {
val dropQuery: Query = new Query(s"""DROP DATABASE "$database"""", database)

InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) => Try(client.query(dropQuery))
}
}


/**
*
* @param database
* @return
*/
def createDatabase(database: String): Try[Unit] = {
val createQuery: Query = new Query(s"""CREATE DATABASE "$database"""", database)

InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) => Try(client.query(createQuery))
}
}

Expand All @@ -181,9 +209,9 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
* @return a Pong object describing the deployed influxdb server
*/
def ping: Try[Pong] = {
InfluxDBClient.client match {
case None => Failure(new WarpConfigurationException(InfluxDBClient.error))
case Some(client) => Try(client.ping)
InfluxDBClient.maybeClient match {
case Left(error) => Failure(new WarpConfigurationException(error))
case Right(client) => Try(client.ping)
}
}
}
Expand All @@ -196,15 +224,8 @@ object InfluxDBClient {
private val user: String = WARP_INFLUXDB_USER.value
private val password: String = WARP_INFLUXDB_PASSWORD.value

/** [[Option]] containing an [[InfluxDB]]. Use this to write datapoints to influxdb. */
val client: Option[InfluxDB] = this.connect


/** a simple error message containing url, user, password we attempted to connect with. */
lazy val error: String = {
s"unable to connect to influxdb at ${this.url} using credentials (user = ${this.user}, password = ${this.password})"
}

/** [[Either]] containing an error message, or an [[InfluxDB]]. Use this to write datapoints to influxdb. */
val maybeClient: Either[String, InfluxDB] = this.connect(this.url, this.user, this.password)

/**
* Constructs a [[BatchPoints]].
Expand All @@ -222,14 +243,17 @@ object InfluxDBClient {


/** @return an InfluxDB connection based on the values set in WarpProperty. */
private[this] def connect: Option[InfluxDB] = {
val influx: InfluxDB = InfluxDBFactory.connect(this.url, this.user, this.password)
protected[influxdb] def connect(url: String, user: String, password: String): Either[String, InfluxDB] = {
val influx: InfluxDB = InfluxDBFactory.connect(url, user, password)

Try(influx.ping) match {
case Failure(exception) =>
Logger.warn(this.error, exception.getMessage)
None
case Success(_) => Option(influx)
val error: String =
s"unable to connect to influxdb at $url using credentials (user = $user, password = $password)"
Logger.warn(error, exception.getMessage)
Left(error)
case Success(_) =>
Right(influx)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.workday.warp.persistence.influxdb

import java.util.Date
import java.util.{Date, UUID}

import com.workday.warp.common.category.IntegrationTest
import com.workday.warp.common.heaphistogram.{HeapHistogram, HeapHistogramEntry}
import com.workday.warp.common.spec.WarpJUnitSpec
import com.workday.warp.persistence.{Connection, CorePersistenceAware}
import com.workday.warp.persistence.TablesLike.TestExecutionRowLike
import com.workday.warp.persistence.TablesLike.RowTypeClasses._
import org.influxdb.InfluxDB
import org.influxdb.dto.Pong
import org.junit.Test
import org.junit.experimental.categories.Category
Expand All @@ -20,15 +21,21 @@ import scala.util.Try
class InfluxDBClientSpec extends WarpJUnitSpec with CorePersistenceAware with InfluxDBClient {


@Test
@Category(Array(classOf[IntegrationTest]))
def failedConnection(): Unit = {
val maybeClient: Either[String, InfluxDB] = InfluxDBClient.connect("http://localhost:1234/bogus/", "dsjak", "sjk")
maybeClient.isLeft should be (true)
}


/** Checks that we can establish a connection to influxdb. */
@Test
@Category(Array(classOf[IntegrationTest]))
def testPing(): Unit = {
val ping: Try[Pong] = this.ping
ping.isSuccess should be (true)
ping.get.getVersion should not be "unknown"

InfluxDBClient.error should startWith ("unable to connect to influxdb at")
}


Expand All @@ -44,7 +51,7 @@ class InfluxDBClientSpec extends WarpJUnitSpec with CorePersistenceAware with In
val histo: HeapHistogram = new HeapHistogram(List(e1, e2, e3, e4))

this.persistHeapHistogram(histo, "testHeapHistograms", "testSeries", "com.workday.warp.test").get
this.deleteDatabase("testHeapHistograms").get
this.dropDatabase("testHeapHistograms").get
}


Expand All @@ -55,6 +62,23 @@ class InfluxDBClientSpec extends WarpJUnitSpec with CorePersistenceAware with In
Connection.refresh()
val testExecution: TestExecutionRowLike = this.persistenceUtils.createTestExecution(this.getTestId, new Date, 1.0, 1.5)
this.persistThreshold("testResponseTimes", "testResponseTimes", testExecution).get
this.deleteDatabase("testResponseTimes").get
this.dropDatabase("testResponseTimes").get
}


/** Checks that we can persist response times and thresholds. */
@Test
@Category(Array(classOf[IntegrationTest]))
def createDatabase(): Unit = {
val dbName: String = s"schema-${UUID.randomUUID().toString}"

val exists: Boolean = this.databaseExists(dbName).get
if (exists) {
this.dropDatabase(dbName).get
}

this.createDatabase(dbName).get
this.databaseExists(dbName).get should be (true)
this.dropDatabase(dbName).get
}
}

0 comments on commit 7b4f7ab

Please sign in to comment.