Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#201 add ehcache
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Stavetski committed Jun 8, 2018
1 parent ebb11c9 commit 3b76648
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ buildscript {
rxkotlinVersion = "2.2.0"
reactorVersion = "3.1.3.RELEASE"
cassandraVersion = "3.5.0"
ehcacheVersion = "3.4.0"

xchangeStreamCoreVersion = "4.3.2"
xchangeStreamVersion = "4.3.3-SNAPSHOT"
Expand Down Expand Up @@ -160,6 +161,8 @@ subprojects {

dependency("io.micrometer:micrometer-core:$micrometerVersion")
dependency("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")

dependency("org.ehcache:ehcache:$ehcacheVersion")
}
}

Expand Down Expand Up @@ -284,7 +287,8 @@ project(":tickers-batch") {
compile project(":common-kafka")
compile project(":cassandra-service")

compile("org.springframework.boot:spring-boot-starter")
compile 'org.springframework.boot:spring-boot-starter'
compile 'org.ehcache:ehcache'

testCompile("org.springframework.boot:spring-boot-starter-test")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package fund.cyber.markets.ticker.configuration

import fund.cyber.markets.cassandra.model.CqlTokenTicker
import fund.cyber.markets.common.LAG_FROM_REAL_TIME_MIN
import fund.cyber.markets.common.LAG_FROM_REAL_TIME_MIN_DEFAULT
import fund.cyber.markets.common.MINUTES_TO_MILLIS
import fund.cyber.markets.common.WINDOW_INTERVALS_MIN
import fund.cyber.markets.common.WINDOW_INTERVALS_MIN_DEFAULT
import fund.cyber.markets.common.convert
import org.ehcache.Cache
import org.ehcache.CacheManager
import org.ehcache.config.builders.CacheConfigurationBuilder
import org.ehcache.config.builders.CacheManagerBuilder
import org.ehcache.config.builders.ResourcePoolsBuilder
import org.ehcache.config.units.MemoryUnit
import org.ehcache.core.spi.service.StatisticsService
import org.ehcache.impl.internal.statistics.DefaultStatisticsService
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

const val TICKERS_CACHE_NAME = "tickers"
const val TICKERS_CACHE_SIZE_GB = 5L

@Configuration
class TickersConfiguration(
@Value("\${$WINDOW_INTERVALS_MIN:$WINDOW_INTERVALS_MIN_DEFAULT}")
Expand All @@ -28,4 +40,27 @@ class TickersConfiguration(
@Bean
fun lagFromRealTime(): Long = lag convert MINUTES_TO_MILLIS

@Bean
fun tickerCache(cacheManager: CacheManager): Cache<String, MutableList<CqlTokenTicker>> {
return cacheManager.getCache(TICKERS_CACHE_NAME, String::class.java, MutableList::class.java as Class<MutableList<CqlTokenTicker>>)
}

@Bean
fun cacheStatisticsService() = DefaultStatisticsService()

@Bean
fun cacheManager(cacheStatisticsService: StatisticsService): CacheManager {

return CacheManagerBuilder.newCacheManagerBuilder()
.withCache(TICKERS_CACHE_NAME,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
String::class.java,
MutableList::class.java as Class<MutableList<CqlTokenTicker>>,
ResourcePoolsBuilder.newResourcePoolsBuilder().heap(TICKERS_CACHE_SIZE_GB, MemoryUnit.GB)
)
)
.using(cacheStatisticsService)
.build(true)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import fund.cyber.markets.common.Intervals
import fund.cyber.markets.common.MILLIS_TO_DAYS
import fund.cyber.markets.common.convert
import fund.cyber.markets.common.model.TokenTicker
import org.ehcache.Cache
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import java.util.*

@Service
class TickerService(
private val tickerRepository: TickerRepository
private val tickerRepository: TickerRepository,
private val cache: Cache<String, MutableList<CqlTokenTicker>>
) {
private val log = LoggerFactory.getLogger(TickerService::class.java)!!

Expand All @@ -34,8 +36,30 @@ class TickerService(
var timestampToVar = timestampFrom + Intervals.HOUR

while (timestampTo - timestampToVar >= Intervals.HOUR) {

tickers = tickers.mergeWith(
tickerRepository.find(symbol, epochDay, Date(timestampFromVar), Date(timestampToVar), interval)
Flux.defer {
val cachedValue = cache.get(cacheKey(symbol, timestampFromVar, timestampToVar, interval)) ?: mutableListOf()
when {
cachedValue.isNotEmpty() -> Flux.fromIterable(cachedValue)
else -> Flux.empty()
}
}.switchIfEmpty(
tickerRepository
.find(symbol, epochDay, Date(timestampFromVar), Date(timestampToVar), interval)
.map { ticker ->
val key = cacheKey(symbol, timestampFrom, timestampTo, interval)
val list = cache.get(key)

if (list == null) {
cache.put(key, mutableListOf(ticker))
} else {
list.add(ticker)
}

ticker
}
)
)
timestampFromVar += Intervals.HOUR
timestampToVar += Intervals.HOUR
Expand All @@ -50,4 +74,8 @@ class TickerService(
return tickers
}

private fun cacheKey(symbol: String, timestampFrom: Long, timestampTo: Long, interval: Long): String {
return "${symbol}_${timestampFrom}_${timestampTo}_$interval"
}

}

0 comments on commit 3b76648

Please sign in to comment.