Skip to content

Commit

Permalink
add flush
Browse files Browse the repository at this point in the history
  • Loading branch information
marandaneto committed Sep 14, 2023
1 parent 2a2d767 commit ec3b5a0
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ fun Greeting(name: String, modifier: Modifier = Modifier) {
text = AnnotatedString("Hello $name!"),
modifier = modifier,
onClick = {
// PostHog.capture("testEvent", mapOf("testProperty" to "testValue"))
PostHog.capture("testEvent", mapOf("testProperty" to "testValue"))
// PostHog.reloadFeatureFlagsRequest()
// sessionRecording
PostHog.isFeatureEnabled("sessionRecording")
// PostHog.isFeatureEnabled("sessionRecording")
PostHog.flush()
},
)
}
Expand Down
8 changes: 8 additions & 0 deletions posthog-v3/posthog/api/posthog.api
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ public final class com/posthog/PostHog {
public final fun capture (Ljava/lang/String;Ljava/util/Map;)V
public static synthetic fun capture$default (Lcom/posthog/PostHog;Ljava/lang/String;Ljava/util/Map;ILjava/lang/Object;)V
public final fun close ()V
public final fun flush ()V
public final fun getAnonymousId ()Ljava/lang/String;
public final fun getDistinctId ()Ljava/lang/String;
public final fun getFeatureFlag (Ljava/lang/String;Ljava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getFeatureFlag$default (Lcom/posthog/PostHog;Ljava/lang/String;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
public final fun identify (Ljava/lang/String;Ljava/util/Map;)V
public static synthetic fun identify$default (Lcom/posthog/PostHog;Ljava/lang/String;Ljava/util/Map;ILjava/lang/Object;)V
public final fun isFeatureEnabled (Ljava/lang/String;Z)Z
public static synthetic fun isFeatureEnabled$default (Lcom/posthog/PostHog;Ljava/lang/String;ZILjava/lang/Object;)Z
public final fun reloadFeatureFlagsRequest ()V
Expand All @@ -20,6 +25,9 @@ public final class com/posthog/PostHog$Companion {
public final fun capture (Ljava/lang/String;Ljava/util/Map;)V
public static synthetic fun capture$default (Lcom/posthog/PostHog$Companion;Ljava/lang/String;Ljava/util/Map;ILjava/lang/Object;)V
public final fun close ()V
public final fun flush ()V
public final fun getFeatureFlag (Ljava/lang/String;Ljava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getFeatureFlag$default (Lcom/posthog/PostHog$Companion;Ljava/lang/String;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
public final fun isFeatureEnabled (Ljava/lang/String;Z)Z
public static synthetic fun isFeatureEnabled$default (Lcom/posthog/PostHog$Companion;Ljava/lang/String;ZILjava/lang/Object;)Z
public final fun reloadFeatureFlagsRequest ()V
Expand Down
46 changes: 46 additions & 0 deletions posthog-v3/posthog/src/main/java/com/posthog/PostHog.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class PostHog {
}

// distinctId is always present but it has to be nullable because the SDK may be disabled
// TODO: missing static, dynamic context
distinctId?.let {
props["distinct_id"] = it
}
Expand Down Expand Up @@ -130,6 +131,29 @@ public class PostHog {
capture("\$create_alias", properties = props)
}

public fun identify(distinctId: String, properties: Map<String, Any>? = null) {
if (!isEnabled()) {
return
}

// TODO: reset feature flags, set anonymousId and distinctId
// val oldDistinctId = this.distinctId

val props = mutableMapOf<String, Any>()
props["distinct_id"] = distinctId
anonymousId?.let {
props["\$anon_distinct_id"] = it
}
properties?.let {
// Should $set be its own data class?
props["\$set"] = it
}

// TODO: does $set_once still exist?

capture("\$identify", properties = props)
}

public fun reloadFeatureFlagsRequest() {
if (!isEnabled()) {
return
Expand All @@ -149,6 +173,20 @@ public class PostHog {
return featureFlags?.isFeatureEnabled(key, defaultValue) ?: defaultValue
}

public fun getFeatureFlag(key: String, defaultValue: Any? = null): Any? {
if (!isEnabled()) {
return defaultValue
}
return featureFlags?.getFeatureFlag(key, defaultValue) ?: defaultValue
}

public fun flush() {
if (!isEnabled()) {
return
}
queue?.flush()
}

// TODO: groups, groupIdentify, group, feature flags, buildProperties (static context, dynamic context, distinct_id)

private fun isEnabled(): Boolean {
Expand Down Expand Up @@ -188,6 +226,14 @@ public class PostHog {
return shared.isFeatureEnabled(key, defaultValue = defaultValue)
}

public fun getFeatureFlag(key: String, defaultValue: Any? = null): Any? {
return shared.getFeatureFlag(key, defaultValue = defaultValue)
}

public fun flush() {
shared.flush()
}

// TODO: add other methods
}
}
27 changes: 12 additions & 15 deletions posthog-v3/posthog/src/main/java/com/posthog/internal/PostHogApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,7 @@ internal class PostHogApi(private val config: PostHogConfig) {
// "timestamp": "2023-09-13T12:05:30.326Z"
// }
// """.trimIndent()

val body = json.toRequestBody(mediaType)

val request = Request.Builder()
.url("${config.host}/batch")
.header("User-Agent", config.userAgent)
.post(body)
.build()
val request = makeRequest(json, "${config.host}/batch")

client.newCall(request).execute().use {
if (!it.isSuccessful) throw PostHogApiError(it.code, it.message, body = it.body)
Expand All @@ -60,6 +53,16 @@ internal class PostHogApi(private val config: PostHogConfig) {
}
}

private fun makeRequest(json: String, url: String): Request {
val body = json.toRequestBody(mediaType)

return Request.Builder()
.url(url)
.header("User-Agent", config.userAgent)
.post(body)
.build()
}

fun decide(properties: Map<String, Any>): Map<String, Any>? {
val map = mutableMapOf<String, Any>()
map.putAll(properties)
Expand All @@ -72,13 +75,7 @@ internal class PostHogApi(private val config: PostHogConfig) {
// "api_key": "_6SG-F7I1vCuZ-HdJL3VZQqjBlaSb1_20hDPwqMNnGI"
// }
// """.trimIndent()
val body = json.toRequestBody(mediaType)

val request = Request.Builder()
.url("${config.host}/decide/?v=3")
.header("User-Agent", config.userAgent)
.post(body)
.build()
val request = makeRequest(json, "${config.host}/decide/?v=3")

client.newCall(request).execute().use {
if (!it.isSuccessful) throw PostHogApiError(it.code, it.message, body = it.body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ internal class PostHogFeatureFlags(private val config: PostHogConfig, private va
}
}

fun isFeatureEnabled(key: String, defaultValue: Boolean = false): Boolean {
fun isFeatureEnabled(key: String, defaultValue: Boolean): Boolean {
if (!isFeatureFlagsLoaded) {
return defaultValue
}
Expand All @@ -55,4 +55,17 @@ internal class PostHogFeatureFlags(private val config: PostHogConfig, private va
defaultValue
}
}

fun getFeatureFlag(key: String, defaultValue: Any?): Any? {
if (!isFeatureFlagsLoaded) {
return defaultValue
}
val value: Any?

synchronized(featureFlagsLock) {
value = featureFlags?.get(key)
}

return value ?: defaultValue
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora

private fun flushIfOverThreshold() {
if (deque.size >= config.flushAt) {
flush()
flushBatch()
}
}

private fun canFlush(): Boolean {
private fun canFlushBatch(): Boolean {
if (pausedUntil?.after(Date()) == true) {
config.logger?.log("Queue is paused until $pausedUntil")
return false
Expand All @@ -69,8 +69,16 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
return true
}

private fun flush() {
if (!canFlush()) {
private fun takeEvents(): List<PostHogEvent> {
val events: List<PostHogEvent>
synchronized(dequeLock) {
events = deque.take(config.maxBatchSize)
}
return events
}

private fun flushBatch() {
if (!canFlushBatch()) {
config.logger?.log("Cannot flush the Queue.")
return
}
Expand All @@ -80,17 +88,40 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
return
}

val events: List<PostHogEvent>
executor.execute {
try {
batchEvents()
} catch (e: Throwable) {
// TODO: retry?
config.logger?.log("Flushing failed: $e")
}

pausedUntil = calculatePausedUntil()

isFlushing.set(false)
}
}

private fun batchEvents() {
val events = takeEvents()

api.batch(events)

synchronized(dequeLock) {
events = deque.take(config.maxBatchSize)
deque.removeAll(events)
}
}

fun flush() {
if (isFlushing.getAndSet(true)) {
config.logger?.log("Queue is flushing.")
return
}

executor.execute {
try {
api.batch(events)

synchronized(dequeLock) {
deque.removeAll(events)
while (deque.isNotEmpty()) {
batchEvents()
}
} catch (e: Throwable) {
// TODO: retry?
Expand All @@ -105,7 +136,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora

private fun calculatePausedUntil(): Date {
val cal = Calendar.getInstance()
cal.add(config.flushIntervalSeconds, Calendar.SECOND)
cal.add(Calendar.SECOND, config.flushIntervalSeconds)
return cal.time
}

Expand Down

0 comments on commit ec3b5a0

Please sign in to comment.