Skip to content

Commit

Permalink
Merge branch 'release/2.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
seratch committed Mar 2, 2015
2 parents 59d12a9 + 71550c9 commit 5f1da64
Show file tree
Hide file tree
Showing 115 changed files with 1,775 additions and 775 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ before_script:
- psql -c "CREATE USER octoparts_app WITH PASSWORD '';" -U postgres
- psql -c "GRANT ALL PRIVILEGES ON DATABASE octoparts_test to octoparts_app;" -U postgres

script: "sbt clean coverage test"
script: sbt clean coverage test && find $HOME/.sbt -name "*.lock" -type f -delete && find $HOME/.ivy2/cache -name "*[\[\]\(\)]*.properties" -type f -delete

after_success: "sbt coverageAggregate coveralls"
after_success: sbt coverageAggregate coveralls

sudo: false

Expand Down
8 changes: 6 additions & 2 deletions app/com/m3/octoparts/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import java.io.File
import java.util.concurrent.TimeUnit

import _root_.controllers.ControllersModule
import com.beachape.zipkin.ZipkinHeaderFilter
import com.beachape.zipkin.services.ZipkinServiceLike
import com.kenshoo.play.metrics.MetricsFilter
import com.m3.octoparts.cache.CacheModule
import com.m3.octoparts.http.HttpModule
Expand All @@ -12,6 +14,7 @@ import com.m3.octoparts.logging.PartRequestLogger
import com.beachape.logging.LTSVLogger
import com.m3.octoparts.repository.{ ConfigsRepository, RepositoriesModule }
import com.netflix.hystrix.strategy.HystrixPlugins
import com.twitter.zipkin.gen.Span
import com.typesafe.config.ConfigFactory
import com.wordnik.swagger.config.{ ConfigFactory => SwaggerConfigFactory }
import com.wordnik.swagger.model.ApiInfo
Expand All @@ -26,7 +29,7 @@ import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.util.control.NonFatal

object Global extends WithFilters(MetricsFilter) with ScaldiSupport {
object Global extends WithFilters(ZipkinHeaderFilter(ZipkinServiceHolder.ZipkinService, r => s"${r.method} - ${r.path}"), MetricsFilter) with ScaldiSupport {

val info = ApiInfo(
title = "Octoparts",
Expand All @@ -48,6 +51,7 @@ object Global extends WithFilters(MetricsFilter) with ScaldiSupport {
new Module {
// Random stuff that doesn't belong in other modules
bind[PartRequestLogger] to PartRequestLogger
bind[ZipkinServiceLike] to ZipkinServiceHolder.ZipkinService
}

/**
Expand Down Expand Up @@ -111,7 +115,7 @@ object Global extends WithFilters(MetricsFilter) with ScaldiSupport {
*/
private def checkForDodgyPartIds(): Unit = {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

implicit val emptySpan = new Span() // empty span -> doesn't trace
val configsRepo = inject[ConfigsRepository]
for {
configs <- configsRepo.findAllConfigs()
Expand Down
42 changes: 42 additions & 0 deletions app/com/m3/octoparts/ZipkinServiceHolder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.m3.octoparts

import java.net.InetAddress

import com.beachape.zipkin.services.{ BraveZipkinService, NoopZipkinService, ZipkinServiceLike }
import com.github.kristofa.brave.zipkin.ZipkinSpanCollector
import play.api.{ Logger, Play }

object ZipkinServiceHolder {

private implicit val app = play.api.Play.current
private implicit val ex = play.api.libs.concurrent.Execution.Implicits.defaultContext

val ZipkinService: ZipkinServiceLike = if (Play.isTest) {
NoopZipkinService
} else {
val maybeService = for {
zipkinHost <- Play.configuration.getString("zipkin.host")
zipkinPort <- Play.configuration.getInt("zipkin.port")
env <- Play.configuration.getString("application.env")
} yield {
val zipkinSpanCollector = new ZipkinSpanCollector(zipkinHost, zipkinPort)
sys.addShutdownHook(zipkinSpanCollector.close())
val currentHostName = InetAddress.getLocalHost.getHostName
val currentRunningPort = Play.configuration.getInt("http.port").getOrElse(9000)
new BraveZipkinService(
currentHostName,
currentRunningPort, s"Octoparts - $env",
zipkinSpanCollector,
Seq(
!_.startsWith("OPTION"),
!_.startsWith("GET - /assets")
)
)
}
maybeService.getOrElse {
Logger.warn("Zipkin configs are missing in the current environment, falling back to NoopZipkinService")
NoopZipkinService
}
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.m3.octoparts.aggregator.handler

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.http.HttpClientPool
import scaldi.Module

class AggregatorHandlersModule extends Module {

implicit val glueContext = play.api.libs.concurrent.Execution.Implicits.defaultContext

bind[HttpHandlerFactory] to new SimpleHttpHandlerFactory(inject[HttpClientPool])
bind[HttpHandlerFactory] to new SimpleHttpHandlerFactory(inject[HttpClientPool], inject[ZipkinServiceLike])

}
3 changes: 2 additions & 1 deletion app/com/m3/octoparts/aggregator/handler/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.m3.octoparts.aggregator.handler
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.ShortPartParam
import com.twitter.zipkin.gen.Span

import scala.concurrent.Future

Expand All @@ -22,6 +23,6 @@ trait Handler {
// Used primarily for creating a PartResponse, but also for logging purposes
def partId: String

def process(partRequestInfo: PartRequestInfo, arguments: HandlerArguments): Future[PartResponse]
def process(partRequestInfo: PartRequestInfo, arguments: HandlerArguments)(implicit parentSpan: Span): Future[PartResponse]

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.m3.octoparts.aggregator.handler

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.model.config.HttpPartConfig

trait HttpHandlerFactory {

/**
* For tracing Http request times
*/
implicit def zipkinService: ZipkinServiceLike

/**
* @param config a HttpCommandConfig entry
* @return a handler ready to be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package com.m3.octoparts.aggregator.handler

import java.net.{ URI, URLEncoder }

import com.beachape.zipkin.TracedFuture
import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.http._
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.{ HttpMethod, PartResponse }
import com.m3.octoparts.model.config._
import com.netaporter.uri.Uri
import com.netaporter.uri.dsl._
import com.twitter.zipkin.gen.Span

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.matching.Regex
Expand All @@ -22,6 +25,8 @@ trait HttpPartRequestHandler extends Handler {

implicit def executionContext: ExecutionContext

implicit def zipkinService: ZipkinServiceLike

def httpClient: HttpClientLike

def uriToInterpolate: String
Expand Down Expand Up @@ -52,29 +57,43 @@ trait HttpPartRequestHandler extends Handler {
* @param hArgs Preparsed HystrixArguments
* @return Future[PartResponse]
*/
def process(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments): Future[PartResponse] = {
hystrixExecutor.future {
createBlockingHttpRetrieve(partRequestInfo, hArgs).retrieve()
}.map {
createPartResponse
def process(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments)(implicit parentSpan: Span): Future[PartResponse] = {
TracedFuture(s"Http request for - ${partRequestInfo.partRequest.partId}") { maybeSpan =>
hystrixExecutor.future(
createBlockingHttpRetrieve(partRequestInfo, hArgs, maybeSpan).retrieve(),
maybeContents => HttpResponse(
status = 203, // 203 -> Non-authoritative info
body = maybeContents,
fromFallback = true,
message = "From fallback"
)
).map {
createPartResponse
}
}
}

/**
* Returns a BlockingHttpRetrieve command
*
* @param partRequestInfo the part request that spawned this HttpRetrieve
* @param hArgs Handler arguments
* @param tracingSpan [[Span]] generated for tracing; the details of this will be forwarded as headers
* @return a command object that will perform an HTTP request on demand
*/
def createBlockingHttpRetrieve(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments): BlockingHttpRetrieve = {
def createBlockingHttpRetrieve(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments, tracingSpan: Option[Span]): BlockingHttpRetrieve = {
new BlockingHttpRetrieve {
val httpClient = handler.httpClient
def method = httpMethod
val uri = new URI(buildUri(hArgs))
val maybeBody = hArgs.collectFirst {
case (p, values) if p.paramType == ParamType.Body && values.nonEmpty => values.head
}
val headers = collectHeaders(hArgs) ++ buildTracingHeaders(partRequestInfo)
val headers = {
collectHeaders(hArgs) ++
buildTracingHeaders(partRequestInfo) ++
tracingSpan.fold(Map.empty[String, String])(zipkinService.spanToIdsMap)
}
}
}

Expand All @@ -101,6 +120,7 @@ trait HttpPartRequestHandler extends Handler {
charset = httpResp.charset,
cacheControl = httpResp.cacheControl,
contents = httpResp.body,
retrievedFromLocalContents = httpResp.fromFallback,
errors = if (httpResp.status < 400 || additionalValidStatuses.contains(httpResp.status)) Nil else Seq(httpResp.message)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
package com.m3.octoparts.aggregator.handler

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.http.HttpClientPool
import com.m3.octoparts.hystrix.HystrixExecutor
import com.m3.octoparts.model.config.HttpPartConfig

import scala.concurrent.ExecutionContext

class SimpleHttpHandlerFactory(httpClientPool: HttpClientPool)(
class SimpleHttpHandlerFactory(httpClientPool: HttpClientPool, implicit val zipkinService: ZipkinServiceLike)(
implicit executionContext: ExecutionContext) extends HttpHandlerFactory {

override def makeHandler(config: HttpPartConfig) = {
def makeHandler(config: HttpPartConfig) = {
// Get or create the HTTP client corresponding to this partId
val httpClient = httpClientPool.getOrCreate(config.partId)
val httpClient = httpClientPool.getOrCreate(HttpClientPool.HttpPartConfigClientKey(config))

new SimpleHttpPartRequestHandler(
config.partId,
httpClient,
config.uriToInterpolate,
config.method,
config.additionalValidStatuses,
config.parameters,
HystrixExecutor(config.hystrixConfigItem)
partId = config.partId,
httpClient = httpClient,
uriToInterpolate = config.uriToInterpolate,
httpMethod = config.method,
additionalValidStatuses = config.additionalValidStatuses,
registeredParams = config.parameters,
hystrixExecutor = HystrixExecutor(config)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.m3.octoparts.aggregator.handler

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.http._
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.HttpMethod
Expand All @@ -22,5 +23,5 @@ class SimpleHttpPartRequestHandler(
val httpMethod: HttpMethod.Value = HttpMethod.Get,
val additionalValidStatuses: Set[Int] = Set.empty,
val registeredParams: Set[PartParam] = Set.empty,
val hystrixExecutor: HystrixExecutor)(implicit val executionContext: ExecutionContext)
val hystrixExecutor: HystrixExecutor)(implicit val executionContext: ExecutionContext, implicit val zipkinService: ZipkinServiceLike)
extends HttpPartRequestHandler
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.m3.octoparts.aggregator.service

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.cache.{ CacheOps, PartResponseCachingSupport }
import com.m3.octoparts.logging.PartRequestLogger
Expand All @@ -16,7 +17,8 @@ class AggregatorServicesModule extends Module {

bind[PartRequestServiceBase] to new PartRequestService(
inject[ConfigsRepository],
inject[HttpHandlerFactory]
inject[HttpHandlerFactory],
inject[ZipkinServiceLike]
) with PartResponseCachingSupport with PartResponseLocalContentSupport {
val cacheOps = inject[CacheOps]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.m3.octoparts.aggregator.service

import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.repository.ConfigsRepository

Expand All @@ -10,5 +11,6 @@ import scala.concurrent.ExecutionContext
*/
class PartRequestService(
val repository: ConfigsRepository,
val handlerFactory: HttpHandlerFactory)(implicit val executionContext: ExecutionContext)
val handlerFactory: HttpHandlerFactory,
implicit val zipkinService: ZipkinServiceLike)(implicit val executionContext: ExecutionContext)
extends PartRequestServiceBase
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.m3.octoparts.aggregator.service

import com.beachape.logging.LTSVLogger
import com.beachape.zipkin.services.ZipkinServiceLike
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.{ HttpPartConfig, ShortPartParam }
import com.m3.octoparts.repository.ConfigsRepository
import com.twitter.zipkin.gen.Span

import scala.concurrent.{ ExecutionContext, Future }

Expand All @@ -16,8 +18,13 @@ import scala.concurrent.{ ExecutionContext, Future }
* children classes / decorators
*/
trait PartRequestServiceBase extends RequestParamSupport {

import com.beachape.zipkin.FutureEnrichment._

implicit def executionContext: ExecutionContext

protected implicit def zipkinService: ZipkinServiceLike

def repository: ConfigsRepository

def handlerFactory: HttpHandlerFactory
Expand All @@ -33,7 +40,7 @@ trait PartRequestServiceBase extends RequestParamSupport {
* @param pReq Part Request
* @return Future[PartResponse]
*/
def responseFor(pReq: PartRequestInfo): Future[PartResponse] = {
def responseFor(pReq: PartRequestInfo)(implicit parentSpan: Span): Future[PartResponse] = {
val fMaybeCi = repository.findConfigByPartId(pReq.partRequest.partId)
fMaybeCi.flatMap {
case Some(ci) => {
Expand Down Expand Up @@ -79,7 +86,7 @@ trait PartRequestServiceBase extends RequestParamSupport {
* trait, but may be used for decorator purposes in Stackable traits.
* @return Future[PartResponse], which includes adding deprecation notices
*/
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, Seq[String]]): Future[PartResponse] = {
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, Seq[String]])(implicit parentSpan: Span): Future[PartResponse] = {
val handler = handlerFactory.makeHandler(ci)
val fResp = handler.process(partRequestInfo, params)
fResp.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@ package com.m3.octoparts.aggregator.service
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.{ HttpPartConfig, ShortPartParam }
import com.twitter.zipkin.gen.Span

import scala.concurrent.Future

trait PartResponseLocalContentSupport extends PartRequestServiceBase {

override def processWithConfig(ci: HttpPartConfig,
partRequestInfo: PartRequestInfo,
params: Map[ShortPartParam, Seq[String]]): Future[PartResponse] = {
params: Map[ShortPartParam, Seq[String]])(implicit parentSpan: Span): Future[PartResponse] = {
if (ci.localContentsEnabled) {
Future(createPartResponse(ci, partRequestInfo))
Future.successful(createPartResponse(ci, partRequestInfo))
} else {
super.processWithConfig(ci, partRequestInfo, params)
super.processWithConfig(ci, partRequestInfo, params).map { pr =>
if (pr.statusCode.contains(503) && ci.hystrixConfigItem.localContentsAsFallback)
createPartResponse(ci, partRequestInfo)
else
pr
}
}
}

private def createPartResponse(ci: HttpPartConfig,
partRequestInfo: PartRequestInfo) = PartResponse(
ci.partId,
id = partRequestInfo.partRequestId,
statusCode = Some(200),
statusCode = Some(203),
contents = ci.localContents,
retrievedFromLocalContents = true
)
Expand Down
Loading

0 comments on commit 5f1da64

Please sign in to comment.