Skip to content

Commit

Permalink
Merge branch 'release/2.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Birchall committed Oct 27, 2014
2 parents 0fbfc3d + c564564 commit 3ecbba1
Show file tree
Hide file tree
Showing 90 changed files with 1,294 additions and 678 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ before_script:
- psql -c "CREATE USER octoparts_app WITH PASSWORD '';" -U postgres
- psql -c "GRANT ALL PRIVILEGES ON DATABASE octoparts_test to octoparts_app;" -U postgres

script: "sbt coveralls"
script: "sbt coveralls test"
56 changes: 52 additions & 4 deletions app/com/m3/octoparts/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import _root_.controllers.ControllersModule
import com.kenshoo.play.metrics.MetricsFilter
import com.m3.octoparts.cache.CacheModule
import com.m3.octoparts.http.HttpModule
import com.m3.octoparts.hystrix.{ HystrixMetricsLogger, HystrixModule }
import com.m3.octoparts.hystrix.{ KeyAndBuilderValuesHystrixPropertiesStrategy, HystrixMetricsLogger, HystrixModule }
import com.m3.octoparts.logging.PartRequestLogger
import com.m3.octoparts.repository.RepositoriesModule
import com.beachape.logging.LTSVLogger
import com.m3.octoparts.repository.{ ConfigsRepository, RepositoriesModule }
import com.netflix.hystrix.strategy.HystrixPlugins
import com.typesafe.config.ConfigFactory
import com.wordnik.swagger.config.{ ConfigFactory => SwaggerConfigFactory }
import com.wordnik.swagger.model.ApiInfo
import org.apache.commons.lang3.StringUtils
import play.api._
import play.api.libs.concurrent.Akka
import play.api.mvc._
Expand All @@ -21,6 +24,7 @@ import scaldi.play.ScaldiSupport

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.util.control.NonFatal

object Global extends WithFilters(MetricsFilter) with ScaldiSupport {

Expand Down Expand Up @@ -73,27 +77,71 @@ object Global extends WithFilters(MetricsFilter) with ScaldiSupport {
case (_, env) => env
}
}
Logger.debug(s"Play environment = $playEnv (mode = $mode, application.env = ${config.getString("application.env")}). Loading extra config from application.$playEnv.conf, if it exists.")
LTSVLogger.debug("Play environment" -> playEnv, "mode" -> mode, "application.env" -> config.getString("application.env"), "message" -> "Loading extra config...")
val modeSpecificConfig = config ++ Configuration(ConfigFactory.load(s"application.$playEnv.conf"))
super.onLoadConfig(modeSpecificConfig, path, classloader, mode)
}

override def onStart(app: Application) = {
// Need to do this as early as possible, before Hystrix gets instantiated
setHystrixPropertiesStrategy(app)

super.onStart(app)

startPeriodicTasks(app)
checkForDodgyPartIds()
}

/**
* Register any tasks that should be run on the global Akka scheduler.
* These tasks will automatically stop running when the app shuts down.
*/
def startPeriodicTasks(implicit app: Application): Unit = {
private def startPeriodicTasks(implicit app: Application): Unit = {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

val hystrixLoggingInterval = app.configuration.underlying.getDuration("hystrix.logging.intervalMs", TimeUnit.MILLISECONDS).toInt.millis
Akka.system.scheduler.schedule(hystrixLoggingInterval, hystrixLoggingInterval) {
HystrixMetricsLogger.logHystrixMetrics()
}
}

/**
* Check if there are any registered parts with leading/trailing spaces in their partIds.
* Output warning logs if we find any, as they can be a nightmare to debug and are best avoided.
*/
private def checkForDodgyPartIds(): Unit = {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

val configsRepo = inject[ConfigsRepository]
for {
configs <- configsRepo.findAllConfigs()
config <- configs
} {
val trimmed = StringUtils.strip(config.partId)
if (trimmed != config.partId) {
LTSVLogger.warn("message" -> "This partId is suspicious - it has leading/trailing spaces", "partId" -> s"'${config.partId}'")
}
}
}

/**
* Tries to set the Hystrix properties strategy to [[KeyAndBuilderValuesHystrixPropertiesStrategy]]
*
* Resist the temptation to do a HystrixPlugins.getInstance().getPropertiesStrategy first to do
* checking, as that actually also sets the strategy if it isn't already set.
*/
private def setHystrixPropertiesStrategy(app: Application): Unit = {
// If it's defined, we don't need to set anything
if (sys.props.get("hystrix.plugin.HystrixPropertiesStrategy.implementation").isEmpty) {
LTSVLogger.info("-Dhystrix.plugin.HystrixPropertiesStrategy.implementation is not set. Defaulting to" -> "com.m3.octoparts.hystrix.KeyAndBuilderValuesHystrixPropertiesStrategy")
try {
HystrixPlugins.getInstance().registerPropertiesStrategy(new KeyAndBuilderValuesHystrixPropertiesStrategy)
} catch {
case NonFatal(e) =>
val currentStrategy = HystrixPlugins.getInstance().getPropertiesStrategy.getClass
LTSVLogger.info(e, "Current Hystrix Properties Strategy:" -> currentStrategy)
}
}
}

}
11 changes: 11 additions & 0 deletions app/com/m3/octoparts/OctopartsMetricsRegistry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.m3.octoparts

import com.codahale.metrics.SharedMetricRegistries
import play.api.Play

object OctopartsMetricsRegistry {
/**
* Same as [[com.kenshoo.play.metrics.MetricsRegistry.default]] when a Play app is running; uses the default name ("default") otherwise
*/
val default = SharedMetricRegistries.getOrCreate(Play.maybeApplication.flatMap(_.configuration.getString("metrics.name")).getOrElse("default"))
}
4 changes: 2 additions & 2 deletions app/com/m3/octoparts/aggregator/handler/Handler.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.m3.octoparts.aggregator.handler

import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config._
import com.m3.octoparts.model.config.ShortPartParam

import scala.concurrent.Future

Expand All @@ -16,7 +16,7 @@ import scala.concurrent.Future
*/
trait Handler {

type HandlerArguments = Map[ShortPartParam, String]
type HandlerArguments = Map[ShortPartParam, Seq[String]]

// Used primarily for creating a PartResponse, but also for logging purposes
def partId: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.m3.octoparts.aggregator.handler

import java.net.{ URI, URLEncoder }

import com.m3.octoparts.http.{ HttpResponse, _ }
import com.m3.octoparts.http._
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.{ HttpMethod, PartResponse }
import com.m3.octoparts.model.config._
Expand Down Expand Up @@ -67,10 +67,10 @@ trait HttpPartRequestHandler extends Handler {
def createBlockingHttpRetrieve(hArgs: HandlerArguments): BlockingHttpRetrieve = {
new BlockingHttpRetrieve {
val httpClient = handler.httpClient
val method = httpMethod
def method = httpMethod
val uri = new URI(buildUri(hArgs))
val maybeBody = hArgs.collectFirst {
case (p, v) if p.paramType == ParamType.Body => v
case (p, values) if p.paramType == ParamType.Body && values.nonEmpty => values.head
}
val headers = collectHeaders(hArgs)
}
Expand Down Expand Up @@ -106,10 +106,23 @@ trait HttpPartRequestHandler extends Handler {
* @return Map[String, String]
*/
def collectHeaders(hArgs: HandlerArguments): Seq[(String, String)] = {
hArgs.toSeq.collect {
case (p, v) if p.paramType == ParamType.Header => p.outputName -> v
case (p, v) if p.paramType == ParamType.Cookie => "Cookie" -> (escapeCookie(p.outputName) + "=" + escapeCookie(v))
// group Cookies. According to RFC 6265, at most one Cookie header may be sent.
val cookieHeadersElements = for {
(p, values) <- hArgs if p.paramType == ParamType.Cookie
cookieName = escapeCookie(p.outputName)
v <- values
} yield {
s"$cookieName=${escapeCookie(v)}"
}
val cookieHeaderValue = if (cookieHeadersElements.isEmpty) None else Some(cookieHeadersElements.mkString("; "))

// for other headers, no grouping is done. Note: duplicate headers are allowed!
cookieHeaderValue.map("Cookie" -> _).toSeq ++ (for {
(p, values) <- hArgs if p.paramType == ParamType.Header if p.outputName != "Cookie"
v <- values
} yield {
p.outputName -> v
})
}

/**
Expand All @@ -121,12 +134,17 @@ trait HttpPartRequestHandler extends Handler {
*/
private[handler] def buildUri(hArgs: HandlerArguments): Uri = {
val baseUri = interpolate(uriToInterpolate) { key =>
val ThePathParam = ShortPartParam(key, ParamType.Path)
val maybeParamsVal: Option[String] = hArgs.collectFirst {
case (p, v) if p.paramType == ParamType.Path && p.outputName == key => v
case (ThePathParam, v) if v.nonEmpty => v.head
}
maybeParamsVal.getOrElse("")
}
baseUri.addParams(hArgs.collect { case (p, v) if p.paramType == ParamType.Query => (p.outputName, v) }.toSeq)
val kvs = for {
(p, values) <- hArgs if p.paramType == ParamType.Query
v <- values
} yield p.outputName -> v
baseUri.addParams(kvs.toSeq)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,13 @@ trait PartRequestServiceBase extends RequestParamSupport with Logging {
* @return Future[PartResponse]
*/
def responseFor(pReq: PartRequestInfo): Future[PartResponse] = {
Option(pReq.partRequest.partId).map { partId =>
val fMaybeCi = repository.findConfigByPartId(partId)
fMaybeCi.flatMap {
_.fold {
unsupported(pReq)
} {
ci =>
val params = combineParams(ci.parameters, pReq)
processWithConfig(ci, pReq, params)
}
val fMaybeCi = repository.findConfigByPartId(pReq.partRequest.partId)
fMaybeCi.flatMap {
case Some(ci) => {
val params = combineParams(ci.parameters, pReq)
processWithConfig(ci, pReq, params)
}
}.getOrElse {
// sanity validation
Future.failed(new IllegalArgumentException("partId is missing"))
case None => unsupported(pReq)
}
}

Expand Down Expand Up @@ -87,7 +80,7 @@ trait PartRequestServiceBase extends RequestParamSupport with Logging {
* trait, but may be used for decorator purposes in Stackable traits.
* @return Future[PartResponse], which includes adding deprecation notices
*/
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, String]): Future[PartResponse] = {
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, Seq[String]]): Future[PartResponse] = {
val handler = handlerFactory.makeHandler(ci)
val fResp = handler.process(params)
fResp.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import java.util.concurrent.RejectedExecutionException

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.logging.{ LogUtil, PartRequestLogger }
import com.beachape.logging.LTSVLogger
import com.m3.octoparts.model.PartResponse
import com.netflix.hystrix.exception.HystrixRuntimeException
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType
import org.apache.http.conn.ConnectTimeoutException
import play.api.Logger
import skinny.util.LTSV

import scala.concurrent._
import scala.concurrent.duration._
Expand All @@ -26,47 +25,47 @@ trait PartServiceErrorHandler extends LogUtil {
private def logRejection(partRequestInfo: PartRequestInfo, aReqTimeout: Duration, message: String): PartResponse = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.warn(LTSV.dump("Part" -> partId, "Execution rejected" -> message))
LTSVLogger.warn("Part" -> partId, "Execution rejected" -> message)
partRequestLogger.logTimeout(partId, requestMeta.id, requestMeta.serviceId, aReqTimeout.toMillis)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(message))
}

private def logTimeout(partRequestInfo: PartRequestInfo, aReqTimeout: Duration, message: String): PartResponse = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.warn(LTSV.dump("Part" -> partId, "Timed out" -> aReqTimeout.toString))
LTSVLogger.warn("Part" -> partId, "Timed out" -> aReqTimeout.toString)
partRequestLogger.logTimeout(partId, requestMeta.id, requestMeta.serviceId, aReqTimeout.toMillis)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(message))
}

private def logInvalid(partRequestInfo: PartRequestInfo, duration: Duration, message: String): PartResponse = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.warn(LTSV.dump("Part" -> partId, "Invalid" -> message))
LTSVLogger.warn("Part" -> partId, "Invalid" -> message)
partRequestLogger.logFailure(partId, requestMeta.id, requestMeta.serviceId, statusCode = None)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(message))
}

private def logShortCircuit(partRequestInfo: PartRequestInfo, duration: Duration, message: String): PartResponse = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.warn(LTSV.dump("Part" -> partId, "Hystrix" -> message))
LTSVLogger.warn("Part" -> partId, "Hystrix" -> message)
partRequestLogger.logFailure(partId, requestMeta.id, requestMeta.serviceId, statusCode = None)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(message))
}

private def logIOException(partRequestInfo: PartRequestInfo, duration: Duration, io: Throwable) = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.warn(LTSV.dump("Part" -> partId), io)
LTSVLogger.warn(io, "Part" -> partId)
partRequestLogger.logFailure(partId, requestMeta.id, requestMeta.serviceId, statusCode = None)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(io.toString))
}

private def logOtherException(partRequestInfo: PartRequestInfo, duration: Duration, err: Throwable) = {
val partId = partRequestInfo.partRequest.partId
val requestMeta = partRequestInfo.requestMeta
Logger.error(LTSV.dump("Part" -> partId), err)
LTSVLogger.error(err, "Part" -> partId)
partRequestLogger.logFailure(partId, requestMeta.id, requestMeta.serviceId, statusCode = None)
PartResponse(partId, partRequestInfo.partRequestId, errors = Seq(err.toString))
}
Expand Down
21 changes: 9 additions & 12 deletions app/com/m3/octoparts/aggregator/service/PartsService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package com.m3.octoparts.aggregator.service
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.future.RichFutureWithTimeout._
import com.m3.octoparts.future.RichFutureWithTiming._
import com.m3.octoparts.future.{ RichFutureWithTimeout, RichFutureWithTiming }
import com.m3.octoparts.logging.{ LogUtil, PartRequestLogger }
import com.beachape.logging.LTSVLogger
import com.m3.octoparts.model.{ AggregateResponse, PartResponse, ResponseMeta, _ }
import play.api.Logger
import skinny.util.LTSV

import scala.concurrent._
import scala.concurrent.duration._
Expand All @@ -18,7 +16,7 @@ import scala.language.postfixOps
*/
class PartsService(partRequestService: PartRequestServiceBase,
val partRequestLogger: PartRequestLogger = PartRequestLogger,
maximumAggReqTimeout: Duration = 5 seconds)(implicit val executionContext: ExecutionContext)
maximumAggReqTimeout: Duration = 5.seconds)(implicit val executionContext: ExecutionContext)
extends PartServiceErrorHandler with LogUtil {

/**
Expand Down Expand Up @@ -51,7 +49,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
.timeoutIn(aReqTimeout)
.time {
(partResponse, duration) =>
Logger.debug(LTSV.dump("Part" -> pReq.partId, "Response time" -> duration.toString, "From cache" -> partResponse.retrievedFromCache.toString))
LTSVLogger.debug("Part" -> pReq.partId, "Response time" -> toRelevantUnit(duration).toString, "From cache" -> partResponse.retrievedFromCache.toString)
logPartResponse(requestMeta, partResponse, duration.toMillis)
})
}
Expand All @@ -60,13 +58,12 @@ class PartsService(partRequestService: PartRequestServiceBase,
val responseMeta = ResponseMeta(requestMeta.id, duration)
val aggregateResponse = AggregateResponse(responseMeta, partsResponses)

if (Logger.isDebugEnabled) {
Logger.debug(LTSV.dump(
"Request Id" -> responseMeta.id,
"Num parts" -> aggregateRequest.requests.size.toString,
"aggregateRequest" -> truncateValue(aggregateRequest),
"aggregateResponse" -> truncateValue(aggregateResponse)))
}
LTSVLogger.debug(
"Request Id" -> responseMeta.id,
"Num parts" -> aggregateRequest.requests.size.toString,
"aggregateRequest" -> truncateValue(aggregateRequest),
"aggregateResponse" -> truncateValue(aggregateResponse))

aggregateResponse
}
}
Expand Down
Loading

0 comments on commit 3ecbba1

Please sign in to comment.