Skip to content

Commit

Permalink
Begin implementation of Jetstream service API.
Browse files Browse the repository at this point in the history
This allows developers to access the firehose in a more friendly way
than via the subscribeRepos() method. In this way, consumers can receive
friendly JSON output instead of CBOR-encoded blocks, as well as easy
filtering on specific collections or DIDs. This also supports the zstd
compression option for JVM and JS which can reduce message sizes
considerably. "A Jetstream consumer that only cares about posts and has
zstd compression enabled can get by on as little as ~25.5GB/mo, <99% of
the full weight firehose."

https://jazco.dev/2024/09/24/jetstream/
  • Loading branch information
christiandeange committed Nov 19, 2024
1 parent aa92189 commit d5cdce1
Show file tree
Hide file tree
Showing 20 changed files with 953 additions and 1 deletion.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ allprojects {

substitute(module("$group:api-gen-runtime-internal:$version"))
.using(project(":api-gen-runtime-internal"))

substitute(module("$group:bluesky:$version"))
.using(project(":bluesky"))
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ maven-publish = { module = "com.vanniktech:gradle-maven-publish-plugin", version
workflow-core = { module = "com.squareup.workflow1:workflow-core", version.ref = "workflow" }
workflow-runtime = { module = "com.squareup.workflow1:workflow-runtime", version.ref = "workflow" }

zstd = { module = "com.github.luben:zstd-jni", version = "1.5.6-7" }

bluesky = { module = "sh.christian.ozone:bluesky", version = "0.2.0" }
1 change: 1 addition & 0 deletions jetstream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/build/
342 changes: 342 additions & 0 deletions jetstream/api/jetstream.api

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions jetstream/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import org.jetbrains.dokka.gradle.AbstractDokkaTask
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("ozone-dokka")
id("ozone-multiplatform")
id("ozone-publish")
id("sh.christian.ozone.generator")
id("org.jetbrains.kotlinx.binary-compatibility-validator")
}

ozone {
js()
jvm()
}

dependencies {
lexicons(fileTree("schemas") {
include("**/*.json")
})
}

kotlin {
sourceSets {
val commonMain by getting {
dependencies {
implementation(libs.ktor.logging)
}
}
val jvmMain by getting {
dependencies {
implementation(libs.zstd)
}
}
val jsMain by getting {
dependencies {
implementation(npm("zstd-codec", "0.1.5"))
}
}
}
}

val generateLexicons = tasks.generateLexicons
tasks.apiDump.configure { dependsOn(generateLexicons) }
tasks.apiCheck.configure { dependsOn(generateLexicons) }

tasks.withType<AbstractDokkaTask>().configureEach {
dependsOn(tasks.withType<KotlinCompile>())
}
2 changes: 2 additions & 0 deletions jetstream/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
POM_NAME=AT Protocol for Kotlin - Jetstream API
POM_DESCRIPTION=Jetstream API bindings for Kotlin.
182 changes: 182 additions & 0 deletions jetstream/schemas/app/bsky/jetstream/subscribe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
{
"lexicon": 1,
"id": "app.bsky.jetstream.subscribe",
"defs": {
"main": {
"type": "subscription",
"description": "Consume lightweight, friendly JSON converted from an ATProto `com.atproto.sync.subscribeRepos` stream.",
"parameters": {
"type": "params",
"properties": {
"wantedCollections": {
"type": "array",
"maxLength": 100,
"items": {
"type": "string",
"format": "nsid"
},
"description": "Which records you receive on your stream (default empty = all collections)."
},
"wantedDids": {
"type": "array",
"maxLength": 10000,
"items": {
"type": "string",
"format": "did"
},
"description": "Which records you receive on your stream (default empty = all repos)."
},
"maxMessageSizeBytes": {
"type": "integer",
"minimum": 0,
"default": 0,
"description": "The maximum size of a payload that this client would like to receive."
},
"cursor": {
"type": "integer",
"description": "A unix microseconds timestamp cursor to begin playback from."
},
"compress": {
"type": "boolean",
"default": false,
"description": "Set to true to enable zstd compression."
},
"requireHello": {
"type": "boolean",
"default": false,
"description": "Set to true to pause replay/live-tail until the server receives a SubscribeOptionsUpdate."
}
}
},
"message": {
"schema": {
"type": "ref",
"ref": "#event"
}
}
},
"event": {
"type": "object",
"required": ["did", "time_us", "kind"],
"nullable": ["commit", "identity", "account"],
"properties": {
"did": {
"type": "string",
"format": "did"
},
"time_us": {
"type": "integer"
},
"kind": {
"type": "string",
"knownValues": ["commit", "identity", "account"]
},
"commit": {
"type": "ref",
"ref": "#commit"
},
"identity": {
"type": "ref",
"ref": "#identity"
},
"account": {
"type": "ref",
"ref": "#account"
}
}
},
"commit": {
"type": "object",
"required": ["rev", "operation", "collection", "rkey"],
"nullable": ["record"],
"properties": {
"rev": { "type": "string" },
"operation": {
"type": "string",
"knownValues": ["create", "update", "delete"]
},
"collection": {
"type": "string",
"format": "nsid"
},
"rkey": {
"type": "string",
"format": "record-key"
},
"record": { "type": "unknown" },
"cid": {
"type": "string",
"format": "cid"
}
}
},
"identity": {
"type": "object",
"required": ["did", "handle", "seq", "time"],
"properties": {
"did": {
"type": "string",
"format": "did"
},
"handle": {
"type": "string",
"format": "handle"
},
"seq": { "type": "integer" },
"time": { "type": "string" }
}
},
"account": {
"type": "object",
"required": ["active", "did", "seq", "time"],
"properties": {
"active": { "type": "boolean" },
"did": {
"type": "string",
"format": "did"
},
"seq": { "type": "integer" },
"time": { "type": "string" }
}
},
"sourcedMessage": {
"type": "object",
"description": "Send messages back to Jetstream over the websocket.",
"required": ["type", "payload"],
"properties": {
"type": { "type": "string" },
"payload": { "type": "unknown" }
}
},
"optionsUpdate": {
"type": "object",
"description": "Update subscription filter after connecting to the socket.",
"properties": {
"wantedCollections": {
"type": "array",
"maxLength": 100,
"items": {
"type": "string",
"format": "nsid"
},
"description": "Which records you receive on your stream (default empty = all collections)."
},
"wantedDids": {
"type": "array",
"maxLength": 10000,
"items": {
"type": "string",
"format": "did"
},
"description": "Which records you receive on your stream (default empty = all repos)."
},
"maxMessageSizeBytes": {
"type": "integer",
"minimum": 0,
"default": 0,
"description": "The maximum size of a payload that this client would like to receive."
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package sh.christian.ozone.jetstream

import app.bsky.jetstream.SubscribeMessage
import app.bsky.jetstream.SubscribeOptionsUpdate
import app.bsky.jetstream.SubscribeQueryParams
import app.bsky.jetstream.SubscribeSourcedMessage
import io.ktor.client.HttpClient
import io.ktor.client.plugins.DefaultRequest
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.plugins.websocket.wss
import io.ktor.client.request.parameter
import io.ktor.websocket.Frame
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import sh.christian.ozone.api.model.JsonContent
import sh.christian.ozone.api.runtime.BlueskyJson
import sh.christian.ozone.api.xrpc.defaultHttpClient

/**
* Implementation to interact with a hosted Jetstream service.
*
* @constructor Construct a new instance using an existing [HttpClient].
*/
class JetstreamApi(httpClient: HttpClient) : JetstreamInterface {

/** Construct a new instance using a free-form [hostName]. */
constructor(hostName: String) : this(
defaultHttpClient.config {
install(DefaultRequest) {
url.host = hostName
}
}
)

/** Construct a new instance using a well-known [JetstreamHost] instance. */
constructor(host: JetstreamHost) : this("jetstream${host.instance}.${host.region}.bsky.network")

/** Construct a new instance that connects to the [JetstreamHost.JETSTREAM_1_US_EAST] instance. */
constructor() : this(JetstreamHost.JETSTREAM_1_US_EAST)

private val client: HttpClient = httpClient.config {
install(WebSockets)
}

override suspend fun subscribe(params: SubscribeQueryParams): Flow<SubscribeMessage> = flow {
withSubscribe(params) {
emitAll(incoming.messages())
}
}

override suspend fun subscribe(
params: SubscribeQueryParams,
block: suspend SubscriptionContext.() -> Unit,
) {
withSubscribe(params) {
val incomingMessages = incoming.messages()
val outgoingMessages = MutableSharedFlow<SubscribeSourcedMessage>()

val outgoingJob = launch {
outgoingMessages.collect { sourcedMessage ->
outgoing.send(Frame.Text(BlueskyJson.encodeToString(sourcedMessage)))
}
}

try {
DefaultSubscriptionContext(outgoingMessages, incomingMessages).apply {
block()
}
} finally {
outgoingJob.cancelAndJoin()
}
}
}

private suspend fun withSubscribe(
params: SubscribeQueryParams,
block: suspend DefaultClientWebSocketSession.() -> Unit
) {
client.wss(
path = "/subscribe",
request = { params.asList().forEach { (key, value) -> parameter(key, value) } },
) {
initZstd()
block()
}
}

private fun ReceiveChannel<Frame>.messages(): Flow<SubscribeMessage> {
return receiveAsFlow()
.mapNotNull { frame ->
when (frame) {
// zstd-compressed json
is Frame.Binary -> decompressZstd(frame.data)
// raw json
is Frame.Text -> frame.data
// ignored
is Frame.Close,
is Frame.Ping,
is Frame.Pong -> null
else -> null
}
}
.map { data ->
BlueskyJson.decodeFromString(
deserializer = SubscribeMessage.serializer(),
string = data.decodeToString(),
)
}
.catch { it.printStackTrace() }
}

private inner class DefaultSubscriptionContext(
private val sourcedMessages: MutableSharedFlow<SubscribeSourcedMessage>,
override val messages: Flow<SubscribeMessage>,
) : SubscriptionContext {
override suspend fun updateSubscription(message: SubscribeOptionsUpdate) {
sourcedMessages.emit(
SubscribeSourcedMessage(
type = "options_update",
payload = JsonContent.encodeFrom(message),
)
)
}
}
}
Loading

0 comments on commit d5cdce1

Please sign in to comment.