Skip to content

Commit

Permalink
Merge branch 'release/2.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Birchall committed Nov 19, 2014
2 parents 3ecbba1 + fc822fe commit f41dcf3
Show file tree
Hide file tree
Showing 42 changed files with 834 additions and 516 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.cache.{ CacheOps, PartResponseCachingSupport }
import com.m3.octoparts.logging.PartRequestLogger
import com.m3.octoparts.repository.ConfigsRepository
import play.api.libs.concurrent.Akka
import scaldi.Module

import scala.concurrent.duration._
import play.api.Play.current

class AggregatorServicesModule extends Module {

implicit val glueContext = play.api.libs.concurrent.Execution.Implicits.defaultContext
implicit val partsServiceContext = Akka.system.dispatchers.lookup("contexts.parts-service")

bind[PartRequestServiceBase] to new PartRequestService(
inject[ConfigsRepository],
Expand Down
14 changes: 6 additions & 8 deletions app/com/m3/octoparts/aggregator/service/PartsService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import scala.language.postfixOps
*/
class PartsService(partRequestService: PartRequestServiceBase,
val partRequestLogger: PartRequestLogger = PartRequestLogger,
maximumAggReqTimeout: Duration = 5.seconds)(implicit val executionContext: ExecutionContext)
maximumAggReqTimeout: FiniteDuration = 5.seconds)(implicit val executionContext: ExecutionContext)
extends PartServiceErrorHandler with LogUtil {

import com.m3.octoparts.logging.LTSVables._

/**
* Given an AggregateRequest, returns a Future[AggregateResponse]
*
Expand All @@ -35,7 +37,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
*/
def processParts(aggregateRequest: AggregateRequest, noCache: Boolean = false): Future[AggregateResponse] = {
val requestMeta = aggregateRequest.requestMeta
val aReqTimeout = requestMeta.timeout.getOrElse(maximumAggReqTimeout) min maximumAggReqTimeout
val aReqTimeout: FiniteDuration = requestMeta.timeout.getOrElse(maximumAggReqTimeout) min maximumAggReqTimeout
val partsResponsesFutures = aggregateRequest.requests.map {
pReq =>
val partRequestInfo = PartRequestInfo(requestMeta, pReq, noCache)
Expand All @@ -49,7 +51,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
.timeoutIn(aReqTimeout)
.time {
(partResponse, duration) =>
LTSVLogger.debug("Part" -> pReq.partId, "Response time" -> toRelevantUnit(duration).toString, "From cache" -> partResponse.retrievedFromCache.toString)
LTSVLogger.debug((requestMeta, partResponse), "Time taken" -> toRelevantUnit(duration))
logPartResponse(requestMeta, partResponse, duration.toMillis)
})
}
Expand All @@ -58,11 +60,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
val responseMeta = ResponseMeta(requestMeta.id, duration)
val aggregateResponse = AggregateResponse(responseMeta, partsResponses)

LTSVLogger.debug(
"Request Id" -> responseMeta.id,
"Num parts" -> aggregateRequest.requests.size.toString,
"aggregateRequest" -> truncateValue(aggregateRequest),
"aggregateResponse" -> truncateValue(aggregateResponse))
LTSVLogger.debug((aggregateRequest, aggregateResponse))

aggregateResponse
}
Expand Down
5 changes: 3 additions & 2 deletions app/com/m3/octoparts/cache/CacheModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.m3.octoparts.cache.memcached._
import com.m3.octoparts.cache.dummy.{ DummyCache, DummyRawCache, DummyCacheOps, DummyLatestVersionCache }
import com.m3.octoparts.cache.key.MemcachedKeyGenerator
import com.m3.octoparts.cache.versioning.{ InMemoryLatestVersionCache, LatestVersionCache }
import play.api.Configuration
import play.api.{ Play, Configuration }
import scaldi.{ Condition, Module }
import shade.memcached.{ AuthConfiguration, Memcached, Protocol, Configuration => ShadeConfig }

Expand Down Expand Up @@ -52,7 +52,8 @@ class CacheModule extends Module {
}

when(cachingEnabled) {
bind[LatestVersionCache] to new InMemoryLatestVersionCache
lazy val maxInMemoryLVCKeys = inject[Configuration].getLong("caching.versionCachingSize").getOrElse(100000L)
bind[LatestVersionCache] to new InMemoryLatestVersionCache(maxInMemoryLVCKeys)
when(useInMemoryCache) {
bind[RawCache] to new InMemoryRawCache()(cacheExecutor) destroyWith (_.close())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package com.m3.octoparts.cache.versioning

import scala.collection.concurrent.TrieMap
import com.google.common.cache.{ Cache, CacheBuilder }

/**
* A simple implementation of [[LatestVersionCache]] that holds the latest versions as Maps
*/
class InMemoryLatestVersionCache extends LatestVersionCache {
class InMemoryLatestVersionCache(maxCacheKeys: Long) extends LatestVersionCache {

import com.m3.octoparts.cache.versioning.LatestVersionCache._

private val partVersions = new TrieMap[PartId, Version]
private val paramVersions = new TrieMap[VersionedParamKey, Version]
private[versioning] val partVersions = configureMemoryCache(CacheBuilder.newBuilder()).build[PartId, java.lang.Long]()
private[versioning] val paramVersions = configureMemoryCache(CacheBuilder.newBuilder()).build[VersionedParamKey, java.lang.Long]()

override def updatePartVersion(partId: PartId, version: Version): Unit = {
def updatePartVersion(partId: PartId, version: Version): Unit = {
partVersions.put(partId, version)
}

override def updateParamVersion(versionedParamKey: VersionedParamKey, version: Version): Unit = {
def updateParamVersion(versionedParamKey: VersionedParamKey, version: Version): Unit = {
paramVersions.put(versionedParamKey, version)
}

override def getPartVersion(partId: PartId) = partVersions.get(partId)
def getPartVersion(partId: PartId) = Option(partVersions.getIfPresent(partId)).map(Long.unbox)

override def getParamVersion(versionedParamKey: VersionedParamKey) = paramVersions.get(versionedParamKey)
def getParamVersion(versionedParamKey: VersionedParamKey) = Option(paramVersions.getIfPresent(versionedParamKey)).map(Long.unbox)

private def configureMemoryCache(builder: CacheBuilder[Object, Object]): CacheBuilder[Object, Object] = {
builder.maximumSize(maxCacheKeys)
}
}
37 changes: 17 additions & 20 deletions app/com/m3/octoparts/future/RichFutureWithTimeout.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
package com.m3.octoparts.future

import java.util.concurrent.{ Executors, TimeUnit }

import akka.actor.ActorSystem
import com.google.common.util.concurrent.ThreadFactoryBuilder
import play.api.libs.concurrent.Akka
import play.api.{ Mode, Play }

import scala.concurrent.{ Future, ExecutionContext, Promise, TimeoutException }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise, TimeoutException }
import scala.util.control.NonFatal

object RichFutureWithTimeout {

private val actorSystem = ActorSystem("FutureTimeoutSystem")
private val actorSystem = Play.maybeApplication.fold(ActorSystem("future-timeout-actor-system"))(Akka.system(_))

/*
Using a CachedThreadPool because this is for spawning many short-lived futures that just block
until the timeout time is over
*/
private val timeoutEC = {
val namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("future-timeout-%d").build()
ExecutionContext.fromExecutor(Executors.newCachedThreadPool(namedThreadFactory))
* this execution context is dedicated to scheduling future timeouts
**/
private val timeoutEC = try {
actorSystem.dispatchers.lookup("contexts.future-timeout")
} catch {
// for tests
case NonFatal(e) if Play.maybeApplication.fold(false)(_.mode == Mode.Test) => actorSystem.dispatcher
}

/**
* Rich Future with timeout support on an individual Future basis
*
* Extends from AnyVal for zero run-time conversion penalties.
* Should really be instantiated via implicit conversion from a Future instead.
*/
implicit class RichFutureWithTimeoutOps[A](val f: Future[A]) extends AnyVal {

/**
Expand All @@ -37,9 +32,11 @@ object RichFutureWithTimeout {
* @param timeout Duration until the Future is timed out
* @return a Future[A]
*/
def timeoutIn(timeout: Duration): Future[A] = {
def timeoutIn(timeout: FiniteDuration): Future[A] = {
val p = Promise[A]()
val cancellable = actorSystem.scheduler.scheduleOnce(FiniteDuration(timeout.toMillis, TimeUnit.MILLISECONDS)) {

// default values for the scheduler (10ms tick, 512 wheel size) is fine
val cancellable = actorSystem.scheduler.scheduleOnce(timeout) {
p.tryFailure(new TimeoutException(s"Timed out after $timeout"))
}(timeoutEC)
f.onComplete { r =>
Expand Down
4 changes: 2 additions & 2 deletions app/com/m3/octoparts/future/RichFutureWithTiming.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object RichFutureWithTiming {
*
* @param tapper a callback that uses the future's result and time taken, e.g. for performance logging
*/
def time(tapper: (A, Duration) => Unit)(implicit executionContext: ExecutionContext): Future[A] = {
def time(tapper: (A, FiniteDuration) => Unit)(implicit executionContext: ExecutionContext): Future[A] = {
val startNanos = System.nanoTime()
future.onSuccess {
case r => tapper(r, Duration.fromNanos(System.nanoTime() - startNanos))
Expand All @@ -31,7 +31,7 @@ object RichFutureWithTiming {
*
* @param f a callback that takes the future's result and time taken, and transforms them into some other result
*/
def timeAndTransform[B](f: (A, Duration) => B)(implicit executionContext: ExecutionContext): Future[B] = {
def timeAndTransform[B](f: (A, FiniteDuration) => B)(implicit executionContext: ExecutionContext): Future[B] = {
val startNanos = System.nanoTime()
future.map {
r =>
Expand Down
36 changes: 30 additions & 6 deletions app/com/m3/octoparts/http/InstrumentedHttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import java.io.Closeable
import java.nio.charset.{ Charset, StandardCharsets }

import com.beachape.logging.LTSVLogger
import com.codahale.metrics.httpclient._
import com.codahale.metrics.{ Gauge, MetricRegistry }
import com.m3.octoparts.OctopartsMetricsRegistry
import com.m3.octoparts.util.TimingSupport
import org.apache.http.{ HttpClientConnection, HttpRequest }
import org.apache.http.client.HttpClient
import org.apache.http.client.config.{ CookieSpecs, RequestConfig }
import org.apache.http.client.methods.HttpUriRequest
import org.apache.http.conn.HttpClientConnectionManager
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.pool.PoolStats
import org.apache.http.protocol.{ HttpContext, HttpRequestExecutor }
import skinny.logging.Logging
import skinny.util.LTSV

Expand Down Expand Up @@ -41,6 +43,7 @@ class InstrumentedHttpClient(
import InstrumentedHttpClient._

private[http] val connectionManager = new InstrumentedHttpClientConnectionMgr
private val requestExecutor = new InstrumentedHttpRequestExecutor

// the underlying Apache HTTP client
private val httpClient = {
Expand All @@ -54,7 +57,7 @@ class InstrumentedHttpClient(

HttpClientBuilder
.create
.setRequestExecutor(InstrumentedRequestExecutor)
.setRequestExecutor(requestExecutor)
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(clientConfig)
.build
Expand All @@ -81,15 +84,15 @@ class InstrumentedHttpClient(
}
}

private[http] def registryName(key: String) = MetricRegistry.name(classOf[HttpClientConnectionManager], name, key)

/**
* A [[PoolingHttpClientConnectionManager]] which monitors the number of open connections.
*/
private[http] class InstrumentedHttpClientConnectionMgr extends PoolingHttpClientConnectionManager {
setDefaultMaxPerRoute(connectionPoolSize)
setMaxTotal(connectionPoolSize)

private[http] def registryName(key: String) = MetricRegistry.name(classOf[HttpClientConnectionManager], name, key)

gauges.foreach {
case (key, f) =>
val gaugeName = registryName(key)
Expand All @@ -110,11 +113,32 @@ class InstrumentedHttpClient(
}
}

def close() = httpClient.close()
private[http] class InstrumentedHttpRequestExecutor extends HttpRequestExecutor with Closeable {

private val registryName = MetricRegistry.name(classOf[HttpClient], name)

override def execute(request: HttpRequest, conn: HttpClientConnection, context: HttpContext) = {
val timerContext = OctopartsMetricsRegistry.default.timer(registryName).time
try {
super.execute(request, conn, context)
} finally {
timerContext.stop
}
}

def close() = OctopartsMetricsRegistry.default.remove(registryName)
}

def close() = {
try {
httpClient.close()
} finally {
requestExecutor.close()
}
}
}

private[http] object InstrumentedHttpClient {
private val InstrumentedRequestExecutor = new InstrumentedHttpRequestExecutor(OctopartsMetricsRegistry.default, HttpClientMetricNameStrategies.QUERYLESS_URL_AND_METHOD)

val gauges: Map[String, PoolStats => Int] = Map(
"available-connections" -> { ps: PoolStats => ps.getAvailable },
Expand Down
2 changes: 1 addition & 1 deletion app/com/m3/octoparts/hystrix/HystrixMetricsLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import play.api.Logger
*/
object HystrixMetricsLogger extends HystrixCommandMetricsRepository with LTSVLoggerLike {

val underlying = Logger("HystrixMetrics").underlyingLogger
val underlyingLogger = Logger("HystrixMetrics").underlyingLogger

/**
* Collect metrics on all registered Hystrix commands and write them to a log file.
Expand Down
56 changes: 56 additions & 0 deletions app/com/m3/octoparts/logging/LTSVables.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.m3.octoparts.logging

import com.beachape.logging.LTSVable
import com.m3.octoparts.model._

object LTSVables extends LogUtil {

implicit val requestMetaLTSVable: LTSVable[RequestMeta] = new LTSVable[RequestMeta] {
def toPairs(o: RequestMeta): Seq[(String, Any)] = Seq(
"RequestMeta id" -> o.id,
"RequestMeta requestUrl" -> o.requestUrl,
"RequestMeta timeout" -> o.timeout.map(toRelevantUnit(_)),
"RequestMeta serviceId" -> o.serviceId,
"RequestMeta sessionId" -> o.sessionId,
"RequestMeta" -> o
)
}

implicit val aggregateRequestLTSVable: LTSVable[AggregateRequest] = new LTSVable[AggregateRequest] {
def toPairs(o: AggregateRequest): Seq[(String, Any)] =
requestMetaLTSVable.toPairs(o.requestMeta) :+ "Requests length" -> o.requests.length
}

implicit val responseMetaLTSVable: LTSVable[ResponseMeta] = new LTSVable[ResponseMeta] {
def toPairs(o: ResponseMeta): Seq[(String, Any)] = Seq(
"ResponseMeta id" -> o.id,
"ResponseMeta processTime" -> toRelevantUnit(o.processTime),
"ResponseMeta" -> o
)
}

implicit val aggregateResponseLTSVable: LTSVable[AggregateResponse] = new LTSVable[AggregateResponse] {
def toPairs(o: AggregateResponse): Seq[(String, Any)] =
responseMetaLTSVable.toPairs(o.responseMeta) :+ "Responses length" -> o.responses.length
}

implicit val partResponseLTSVable: LTSVable[PartResponse] = new LTSVable[PartResponse] {
def toPairs(o: PartResponse): Seq[(String, Any)] = Seq(
"PartResponse id" -> o.id,
"PartResponse partId" -> o.partId,
"PartResponse errors" -> o.errors,
"PartResponse warnings" -> o.warnings,
"PartResponse statusCode" -> o.statusCode,
"PartResponse cacheControl" -> o.cacheControl,
"PartResponse retrievedFromCache" -> o.retrievedFromCache
)
}

/**
* If we need more, add more of these
*/
implicit def pairsLTSVable[A: LTSVable, B: LTSVable]: LTSVable[(A, B)] = new LTSVable[(A, B)] {
def toPairs(o: (A, B)): Seq[(String, Any)] = implicitly[LTSVable[A]].toPairs(o._1) ++ implicitly[LTSVable[B]].toPairs(o._2)
}

}
2 changes: 1 addition & 1 deletion app/com/m3/octoparts/logging/PartRequestLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ trait PartRequestLogger {

object PartRequestLogger extends PartRequestLogger with LTSVLoggerLike {

val underlying = Logger("PartRequests").underlyingLogger
val underlyingLogger = Logger("PartRequests").underlyingLogger

def logSuccess(partId: String, parentRequestId: String, serviceId: Option[String], cacheHit: Boolean, responseMs: Long): Unit = {
val hitOrMiss = if (cacheHit) "hit" else "miss"
Expand Down
4 changes: 2 additions & 2 deletions app/com/m3/octoparts/model/config/HttpPartConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ case class HttpPartConfig(id: Option[Long] = None, // None means that the record
hystrixConfig: Option[HystrixConfig] = None,
deprecatedInFavourOf: Option[String] = None,
cacheGroups: Set[CacheGroup] = Set.empty,
cacheTtl: Option[Duration] = Some(Duration.Zero), // in seconds
cacheTtl: Option[FiniteDuration] = Some(Duration.Zero), // in seconds
alertMailsEnabled: Boolean,
alertAbsoluteThreshold: Option[Int],
alertPercentThreshold: Option[Double],
alertInterval: Duration,
alertInterval: FiniteDuration,
alertMailRecipients: Option[String],
createdAt: DateTime,
updatedAt: DateTime) extends ConfigModel[HttpPartConfig] {
Expand Down
4 changes: 2 additions & 2 deletions app/controllers/AdminController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,10 @@ object AdminController {
case (Some(beforeTtl), Some(afterTtl)) if beforeTtl > afterTtl => true
case _ => false
})
(hasChangedOn(_.uriToInterpolate) ||
hasChangedOn(_.uriToInterpolate) ||
hasChangedOn(_.method) ||
hasChangedOn(_.additionalValidStatuses) ||
cacheTTLReduced)
cacheTTLReduced
}

def shouldBustCache(param: PartParam): Boolean = {
Expand Down
Loading

0 comments on commit f41dcf3

Please sign in to comment.