Skip to content

Commit

Permalink
fix add queue
Browse files Browse the repository at this point in the history
  • Loading branch information
marandaneto committed Sep 19, 2023
1 parent 39950d6 commit 2cceef7
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import androidx.compose.runtime.Composable
import androidx.compose.ui.Modifier
import androidx.compose.ui.text.AnnotatedString
import androidx.compose.ui.tooling.preview.Preview
import com.posthog.PostHog
import com.posthog.android.sample.ui.theme.PostHogAndroidSampleTheme

class MainActivity : ComponentActivity() {
Expand Down Expand Up @@ -38,7 +39,7 @@ fun Greeting(name: String, modifier: Modifier = Modifier) {
modifier = modifier,
onClick = {
// PostHog.identify("my_distinct_id", properties = mapOf("my_property" to 1), userProperties = mapOf("name" to "hello"))
// PostHog.capture("testEvent", mapOf("testProperty" to "testValue"))
PostHog.capture("testEvent", mapOf("testProperty" to "testValue"))
// PostHog.reloadFeatureFlagsRequest()
// PostHog.isFeatureEnabled("sessionRecording")
// PostHog.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class MyApp : Application() {

val config = PostHogConfig("_6SG-F7I1vCuZ-HdJL3VZQqjBlaSb1_20hDPwqMNnGI").apply {
debug = true
flushAt = 5
// flushIntervalSeconds = 5
// flushAt = 1
}
Expand Down
105 changes: 41 additions & 64 deletions posthog-v3/posthog/src/main/java/com/posthog/internal/PostHogQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,47 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora

private var isFlushing = AtomicBoolean(false)

private var dirCreated = false

private val delay: Long get() = (config.flushIntervalSeconds * 1000).toLong()

private val executor = Executors.newSingleThreadScheduledExecutor(PostHogThreadFactory("PostHogQueueThread"))

fun add(event: PostHogEvent) {
var removeFirst = false
if (deque.size >= config.maxQueueSize) {
try {
val first: File
removeFirst = true
}

executor.execute {
if (removeFirst) {
try {
val first: File
synchronized(dequeLock) {
first = deque.removeFirst()
}
first.delete()
config.logger.log("Queue is full, the oldest event ${first.name} is dropped.")
} catch (ignore: NoSuchElementException) {}
}

config.storagePrefix?.let {
val dir = File(it, config.apiKey)

if (!dirCreated) {
dir.mkdirs()
}

val file = File(dir, "${UUID.randomUUID()}.event")
synchronized(dequeLock) {
first = deque.removeFirst()
deque.add(file)
}
config.logger.log("Queue is full, the oldest event ${first.name} is dropped.")
} catch (ignore: NoSuchElementException) {}
}
serializer.serializeEvent(event, file.writer().buffered())
config.logger.log("Queued event ${file.name}.")

val dir = File(config.storagePrefix!!, config.apiKey)
val file = File(dir, "${UUID.randomUUID()}.event")
synchronized(dequeLock) {
deque.add(file)
flushIfOverThreshold()
}
}
serializer.serializeEvent(event, file.writer().buffered())
config.logger.log("Queued event ${file.name}.")

flushIfOverThreshold()
}

private fun flushIfOverThreshold() {
Expand All @@ -72,7 +89,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
return true
}

private fun takeEvents(): List<File> {
private fun takeFiles(): List<File> {
val events: List<File>
synchronized(dequeLock) {
events = deque.take(config.maxBatchSize)
Expand All @@ -99,20 +116,18 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
} catch (e: Throwable) {
config.logger.log("Flushing failed: $e")

// TODO: when do we actually drop those events? maybe they are broken for good
// and the SDK will be stuck at them
retry = true
retryCount++
}

calculateDelay(retry)
} finally {
calculateDelay(retry)

isFlushing.set(false)
isFlushing.set(false)
}
}
}

private fun batchEvents() {
val files = takeEvents()
val files = takeFiles()

val events = mutableListOf<PostHogEvent>()
for (file in files) {
Expand Down Expand Up @@ -151,11 +166,11 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
config.logger.log("Flushing failed: $e")
retry = true
retryCount++
}
} finally {
calculateDelay(retry)

calculateDelay(retry)

isFlushing.set(false)
isFlushing.set(false)
}
}
}

Expand All @@ -171,21 +186,6 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
}

fun start() {
executor.execute {
config.storagePrefix?.let {
val file = File(it, config.apiKey)

if (!file.exists()) {
file.mkdirs()
}

// flushLegacyEvents(file)

// synchronized(dequeLock) {
// }
}
}

synchronized(timerLock) {
stopTimer()
val timer = Timer(true)
Expand All @@ -202,29 +202,6 @@ internal class PostHogQueue(private val config: PostHogConfig, private val stora
}
}

// private fun flushLegacyEvents(file: File) {
// config.legacyStoragePrefix?.let {
// val legacyDir = File(it)
// val legacyFile = File(legacyDir, "${config.apiKey}.tmp")
//
// if (legacyFile.exists()) {
// val legacy = PostHogQueueFile.Builder(legacyFile)
// .forceLegacy(true)
// .build()
// val iterator = legacy.iterator()
// while (iterator.hasNext()) {
// val next = iterator.next()
//
// val nextFile = File(file, "${UUID.randomUUID()}.event")
// nextFile.writeBytes(next)
//
// iterator.remove()
// }
// }
// legacyFile.delete()
// }
// }

private fun stopTimer() {
timerTask?.cancel()
timer?.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,70 @@ internal class SendCachedEventsIntegration(private val config: PostHogConfig, pr
executor.shutdown()
}

// TODO: respect maxBatchSize

private fun flushLegacyEvents() {
config.legacyStoragePrefix?.let {
val legacyDir = File(it)
val legacyFile = File(legacyDir, "${config.apiKey}.tmp")

if (legacyFile.exists()) {
val legacy = PostHogQueueFile.Builder(legacyFile)
.forceLegacy(true)
.build()
if (!legacyFile.exists()) {
return
}

val iterator = legacy.iterator()
val legacy = PostHogQueueFile.Builder(legacyFile)
.forceLegacy(true)
.build()

val events = mutableListOf<PostHogEvent>()
val iterator = legacy.iterator()

while (iterator.hasNext()) {
val eventBytes = iterator.next()
val events = mutableListOf<PostHogEvent>()

val event = serializer.deserializeEvent(eventBytes.inputStream().reader().buffered())
event?.let {
events.add(event)
}
}
while (iterator.hasNext()) {
val eventBytes = iterator.next()

if (events.isNotEmpty()) {
api.batch(events)
val event = serializer.deserializeEvent(eventBytes.inputStream().reader().buffered())
event?.let {
events.add(event)
}
}

legacyFile.delete()
if (events.isNotEmpty()) {
api.batch(events)
}

legacyFile.delete()
}
}

private fun flushEvents() {
config.storagePrefix?.let {
val dir = File(it, config.apiKey)

if (dir.exists()) {
// TODO: in case this is executed after new events come in, we have to filter those
// they are in the queue already
val listFiles = dir.listFiles() ?: arrayOf()
val events = mutableListOf<PostHogEvent>()
val iterator = listFiles.iterator()
if (!dir.exists()) {
return
}

// TODO: in case this is executed after new events come in, we have to filter those
// they are in the queue already
val listFiles = dir.listFiles() ?: arrayOf()
val events = mutableListOf<PostHogEvent>()
val iterator = listFiles.iterator()

while (iterator.hasNext()) {
val eventBytes = iterator.next()
while (iterator.hasNext()) {
val eventBytes = iterator.next()

val event = serializer.deserializeEvent(eventBytes.inputStream().reader().buffered())
event?.let {
events.add(event)
}
val event = serializer.deserializeEvent(eventBytes.inputStream().reader().buffered())
event?.let {
events.add(event)
}
}

if (events.isNotEmpty()) {
api.batch(events)
if (events.isNotEmpty()) {
api.batch(events)

listFiles.forEach { file ->
file.delete()
}
listFiles.forEach { file ->
file.delete()
}
}
}
Expand Down

0 comments on commit 2cceef7

Please sign in to comment.