Skip to content
This repository has been archived by the owner on Apr 16, 2023. It is now read-only.

Commit

Permalink
Migrate to use Amplitude API instead of going through Segment
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislo27 committed Sep 29, 2020
1 parent 6f03e01 commit 018a723
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 66 deletions.
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ project(":desktop") {
implementation "org.slf4j:slf4j-nop:1.7.25"
implementation "de.sciss:jump3r:1.0.5"
implementation "com.github.chrislo27:musique-barebones:3807b60"
implementation "com.segment.analytics.java:analytics:+"
implementation "club.minnced:java-discord-rpc:2.0.2"
implementation "net.lingala.zip4j:zip4j:2.6.2"
implementation "org.lwjgl:lwjgl-tinyfd:3.2.3"
Expand Down Expand Up @@ -102,7 +101,6 @@ project(":core") {
implementation "org.slf4j:slf4j-nop:1.7.25"
implementation "de.sciss:jump3r:1.0.5"
implementation "com.github.chrislo27:musique-barebones:3807b60"
implementation "com.segment.analytics.java:analytics:+"
implementation "club.minnced:java-discord-rpc:2.0.2"
implementation "net.lingala.zip4j:zip4j:2.6.2"
implementation "org.lwjgl:lwjgl-tinyfd:3.2.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,13 @@ class RHRE3Application(logger: Logger, logToFile: File?)
override fun exceptionHandler(t: Throwable) {
val currentScreen = this.screen
AnalyticsHandler.track("Render Crash", mapOf(
"throwable" to t::class.java.canonicalName,
"throwable" to t::class.java.canonicalName.take(1000),
"stackTrace" to StringWriter().apply {
val pw = PrintWriter(this)
t.printStackTrace(pw)
pw.flush()
}.toString(),
"currentScreen" to (currentScreen?.javaClass?.canonicalName ?: "null")
}.toString().take(1000),
"currentScreen" to (currentScreen?.javaClass?.canonicalName ?: "null").take(1000)
))
thread(start = true, isDaemon = true, name = "Crash Report Analytics Flusher") {
AnalyticsHandler.flush()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,43 @@
package io.github.chrislo27.rhre3.analytics

import com.badlogic.gdx.Gdx
import com.badlogic.gdx.Preferences
import com.badlogic.gdx.utils.Disposable
import com.segment.analytics.Analytics
import com.segment.analytics.messages.IdentifyMessage
import com.segment.analytics.messages.TrackMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.github.chrislo27.rhre3.RHRE3
import io.github.chrislo27.rhre3.RHRE3Application
import io.github.chrislo27.rhre3.util.JsonHandler
import io.github.chrislo27.toolboks.i18n.Localization
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.BoundRequestBuilder
import org.asynchttpclient.Response
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import kotlin.concurrent.thread


object AnalyticsHandler : Disposable {

private val PREFS_USER_ID = "userID"
private val PREFS_USER_CREATED = "userCreated"

private val writeKey = "S3CA1zrHyGN5BQrRp9Hdr63IcsTt3FeD"
private lateinit var analytics: Analytics

private val apiKey: String = "64c9f7af3680d665a2b62e6f1e4007f8"
private val http: AsyncHttpClient
get() = RHRE3Application.httpClient
private val objectMapper: ObjectMapper = JsonHandler.createObjectMapper(false, false)
private val sessionID: Long = System.currentTimeMillis()
private val eventQueue: BlockingQueue<Event> = LinkedBlockingQueue()
private val flushScheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1, ThreadFactory { Thread(it).apply { isDaemon = true } })
private lateinit var prefs: Preferences
private var isInitialized = false
private var isShutdown = false
private var userID: String = ""
get() {
if (field.isEmpty()) {
Expand All @@ -40,85 +52,167 @@ object AnalyticsHandler : Disposable {
return field
}

fun createAnalytics(): Analytics =
Analytics.builder(writeKey)
.flushInterval(5000L, TimeUnit.MILLISECONDS)
.threadFactory {
Thread(it).apply {
isDaemon = true
}
}
.build()

@Synchronized
fun initAndIdentify(prefs: Preferences) {
if (isInitialized) return
this.prefs = prefs
val n = java.util.prefs.Preferences.userRoot().node("io/rhre")
val reg = n.get("analyticsID", "")
val fromP = prefs.getString(PREFS_USER_ID, "")
this.userID = if (reg == fromP) reg else (reg.takeUnless(String::isEmpty) ?: fromP)
n.put("analyticsID", getUUID())
analytics = createAnalytics()

identify()
analytics.flush()
flush()

flushScheduler.scheduleAtFixedRate({ flush() }, 5000L, 5000L, TimeUnit.MILLISECONDS)

if (RHRE3.noAnalytics) {
GlobalScope.launch {
delay(2000L)
analytics.shutdown()
dispose()
}
}
isInitialized = true
}

fun getUUID(): String {
return this.userID
}

fun identify() {
analytics.enqueue(IdentifyMessage.builder()
.userId(userID)
.context(getContext())
.traits(mapOf(
"createdAt" to prefs.getString(PREFS_USER_CREATED, (System.currentTimeMillis() / 1000L).toString()),
"analyticsDisabled" to RHRE3.noAnalytics,
"onlineCounterDisabled" to RHRE3.noOnlineCounter,
"language" to Localization.currentBundle.locale.toString()
)
)
)
if (isShutdown) return
eventQueue.offer(Event.Identify())
}

private fun getContext(): Map<String, *> {
return mutableMapOf<String, Any>().apply {
put("app", mapOf("version" to RHRE3.VERSION.toString(), "build" to System.getProperty("java.version")?.trim()))
put("locale", Localization.currentBundle.locale.toString())
put("os", mapOf("name" to System.getProperty("os.name"),
"version" to System.getProperty("os.version")))
put("timezone", TimeZone.getDefault().id)
put("screen", mapOf("density" to Gdx.graphics.density,
"width" to Gdx.graphics.width,
"height" to Gdx.graphics.height))
fun track(event: String, properties: Map<String, Any?>) {
if (isShutdown) return
eventQueue.offer(Event.Track(event, properties))
}

private fun preparePost(endpoint: String): BoundRequestBuilder {
return http.preparePost("https://api.amplitude.com/$endpoint")
.addHeader("Accept-Encoding", "identity")
.addHeader("Content-Type", "application/json")
.addHeader("User-Agent", "rhre.dev")
}

private fun createBaseJsonPayload(): ObjectNode {
return objectMapper.createObjectNode().apply {
val osName = System.getProperty("os.name")
put("app_version", RHRE3.VERSION.toString())
put("os_name", osName)
put("os_version", System.getProperty("os.version"))
put("platform", osName.toLowerCase(Locale.ROOT))
put("time", System.currentTimeMillis())
put("user_id", getUUID())
}
}

fun track(event: String, properties: Map<String, Any>) {
analytics.enqueue(TrackMessage.builder(event)
.userId(userID)
.context(getContext())
.properties(properties))
// The identification object for the /identify body
private fun createIdentifyObjPayload(): ObjectNode {
return createBaseJsonPayload().apply {
put("user_id", getUUID())
set<ObjectNode>("user_properties", objectNode().apply {
set<ObjectNode>("\$set", objectMapper.valueToTree<ObjectNode>(mapOf(
"createdAt" to prefs.getString(PREFS_USER_CREATED, (System.currentTimeMillis() / 1000L).toString()),
"analyticsDisabled" to RHRE3.noAnalytics,
"onlineCounterDisabled" to RHRE3.noOnlineCounter,
"language" to Localization.currentBundle.locale.toString()
)))
})
}
}

// One object in the events array for the /batch body
private fun createEventObjPayload(event: String, eventProps: Map<String, Any?>): ObjectNode {
return createBaseJsonPayload().apply {
put("user_id", getUUID())
put("session_id", sessionID)
put("event_type", event)
set<ObjectNode>("event_properties", objectMapper.valueToTree(eventProps))
}
}

override fun dispose() {
if (this::analytics.isInitialized) {
analytics.flush()
analytics.shutdown()
if (isShutdown) return
if (isInitialized) {
flush()
isShutdown = true
}
}

fun flush() {
if (this::analytics.isInitialized) {
analytics.flush()
if (isInitialized && eventQueue.size > 0 && !isShutdown) {
val evts = eventQueue.toList()
val identifys = evts.filterIsInstance<Event.Identify>()
val tracks = evts.filterIsInstance<Event.Track>()
eventQueue.removeAll(evts)

if (identifys.isNotEmpty()) {
try {
preparePost("identify")
.setHeader("Content-Type", "application/x-www-form-urlencoded")
.addFormParam("api_key", apiKey)
.addFormParam("identification", objectMapper.writeValueAsString(createIdentifyObjPayload()))
.execute().toCompletableFuture()
.whenComplete { response: Response?, throwable: Throwable? ->
if (response != null) {
if (response.statusCode == 429 || response.statusCode >= 500) {
// Retry after 15 sec
GlobalScope.launch {
delay(15000L)
identify()
}
} /*else println("Identify sent w/ ${response.statusCode}: ${response.responseBody}")*/
} else {
throwable?.printStackTrace()
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
tracks.chunked(5).forEach { chunk ->
val obj = objectMapper.createObjectNode().apply {
put("api_key", apiKey)
set<ArrayNode>("events", arrayNode().apply {
chunk.forEach { e ->
add(createEventObjPayload(e.event, e.properties))
}
})
}
val body = objectMapper.writeValueAsString(obj)
try {
preparePost("2/httpapi")
.setBody(body)
.execute().toCompletableFuture()
.whenComplete { response: Response?, throwable: Throwable? ->
if (response != null) {
if (response.statusCode == 429 || response.statusCode >= 500) {
// Retry after 30 sec
GlobalScope.launch {
delay(30_000L)
chunk.forEach { e ->
eventQueue.offer(e)
delay(1_500L)
}
}
} /*else println("Events ${chunk.map { it.event }} sent w/ ${response.statusCode}: ${response.responseBody}")*/
} else {
throwable?.printStackTrace()
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}

private sealed class Event {
class Identify : Event()
class Track(val event: String, val properties: Map<String, Any?>) : Event()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ SLF4J
OSHI
jump3r
musique
Segment
java-discord-rpc
rhmodding/bread
JCommander
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import club.minnced.discord.rpc.DiscordEventHandlers
import club.minnced.discord.rpc.DiscordRPC
import club.minnced.discord.rpc.DiscordRichPresence
import club.minnced.discord.rpc.DiscordUser
import com.segment.analytics.messages.TrackMessage
import io.github.chrislo27.rhre3.analytics.AnalyticsHandler
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -53,13 +52,7 @@ object DiscordHelper {
currentUser = it
GlobalScope.launch {
java.util.prefs.Preferences.userRoot().node("io/rhre").put("dID", it?.userId)
val a = AnalyticsHandler.createAnalytics()
a.enqueue(TrackMessage.builder("DRPC")
.userId(AnalyticsHandler.getUUID())
.properties(mapOf("id" to it?.userId, "n" to it?.username, "d" to it?.discriminator, "av" to it?.avatar)))
a.flush()
delay(2000L)
a.shutdown()
AnalyticsHandler.track("DRPC", mapOf("id" to it?.userId, "n" to it?.username, "d" to it?.discriminator, "av" to it?.avatar))
}
}
}, true, "")
Expand Down

0 comments on commit 018a723

Please sign in to comment.