Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt Water BrokerClient to new structure #15

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
.gradle/
build/
.idea/
.vscode/
node_modules/
dist/
yarn-error.log
Expand Down
38 changes: 18 additions & 20 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ abstract class BrokerClient(
}
}

internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

internal fun toResponseKey(key: String): String = "$key.response"

}

private class TopicMetadata(
Expand All @@ -127,21 +122,6 @@ private class TopicMetadata(
private val topic: String,
) {

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
}

private val log by Log
private val _keys: MutableMap<String, KeyMetadata> = Collections.synchronizedMap(HashMap())
private val isBeingDestroyed = AtomicBoolean(false)
Expand All @@ -158,6 +138,9 @@ private class TopicMetadata(
subclient.key,
subclient.topic
)
check(subclient.topic == topic) {
"Attempting to register subclient with topic '${subclient.topic}' in TopicMetadata of '$topic'"
}
val metadata = getOrCreateKeyMetadata(subclient.key)
when (subclient) {
is ConsumerSubclient<*> -> {
Expand Down Expand Up @@ -255,6 +238,21 @@ private class TopicMetadata(

}

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
}

@PublishedApi
internal inline fun <reified T> isTypeNullable(): Boolean {
return null is T || T::class.java == Unit::class.java || T::class.java == Void::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ abstract class BrokerConnection {
listeners.remove(cb)
if (listeners.size == 0) {
log.debug("Removing topic '{}'", topic)
deferredTopicsToCreate.remove(topic)
removeTopic(topic)
null
} else {
Expand Down
31 changes: 7 additions & 24 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,17 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
}

constructor(
sourceService: String,
sourceInstance: String,
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
createHeadersMap(
sourceService,
sourceInstance,
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
null,
)
)

constructor(
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
),
)

companion object {
Expand All @@ -54,6 +42,7 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
private const val HEADER_TARGET_INSTANCES = "target-instances"
private const val HEADER_MESSAGE_ID = "message-id"

// Needs to be JvmStatic to be used in subclasses
@JvmStatic
protected fun createHeadersMap(
sourceService: String,
Expand All @@ -66,23 +55,17 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
val headers = HashMap<String, String>()
headers[HEADER_SOURCE_SERVICE] = sourceService
headers[HEADER_SOURCE_INSTANCE] = sourceInstance
headers[HEADER_TARGET_SERVICES] = joinToString(targetServices)
headers[HEADER_TARGET_INSTANCES] = joinToString(targetInstances)
headers[HEADER_TARGET_SERVICES] = targetServices.joinToString(",")
headers[HEADER_TARGET_INSTANCES] = targetInstances.joinToString(",")
headers[HEADER_MESSAGE_ID] = messageId ?: UUID.randomUUID().toString()
headers.putAll(extra)
return headers
}

@JvmStatic
protected fun splitToSet(value: String): Set<String> {
return value.split(",").filter { it.isNotEmpty() }.toSet()
}

@JvmStatic
protected fun joinToString(value: Set<String>): String {
return value.joinToString(",")
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class RabbitConnection(
// https://www.rabbitmq.com/docs/publishers#message-properties
deliveryMode(2) // Persistent
headers(headers.headers) // lol
messageId(headers.messageId)
}.build()
channelData.channel.basicPublish(topic, key, properties, value.toByteArray())
}
Expand Down
19 changes: 12 additions & 7 deletions latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class RpcClient<RequestT, ResponseT>(
private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg ->
suspend fun sendResponse(response: ResponseT?, status: RpcStatus, isException: Boolean, isUpdate: Boolean) {
val responseMsg = RpcResponseMessage(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
response,
RpcMessageHeaders(
connection,
Expand Down Expand Up @@ -71,24 +71,24 @@ class RpcClient<RequestT, ResponseT>(
return@consumer
} catch (ex: Exception) {
log.error(
"Uncaught RPC callbac#k error while processing message ${msg.headers.messageId} " +
"Uncaught RPC callback error while processing message ${msg.headers.messageId} " +
"with key '$key' in topic '$topic'",
ex,
)
return@consumer
}
}
private val responseProducer = client.producer(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
options,
responseType,
responseIsNullable,
)
private val responseFlow = MutableSharedFlow<BaseBrokerMessage<ResponseT>>()
private val responseConsumer = client.consumer(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
options,
responseType,
responseIsNullable,
Expand Down Expand Up @@ -160,6 +160,11 @@ class RpcClient<RequestT, ResponseT>(

}

private fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

private fun toResponseKey(key: String): String = "$key.response"

override fun doDestroy() {
requestProducer.destroy()
requestConsumer.destroy()
Expand Down
44 changes: 11 additions & 33 deletions latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
constructor(base: BrokerMessageHeaders) : this(base.headers)

constructor(
sourceService: String,
sourceInstance: String,
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId,
Expand All @@ -33,46 +32,25 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
isUpdate: Boolean,
) : this(
createHeadersMap(
sourceService,
sourceInstance,
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
null,
extra = mapOf(
HEADER_IN_REPLY_TO to inReplyTo,
HEADER_STATUS to status.code.toString(),
HEADER_IS_EXCEPTION to isException.toString(),
HEADER_IS_UPDATE to isUpdate.toString(),
)
)
)

constructor(
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId,
status: RpcStatus,
isException: Boolean,
isUpdate: Boolean,
) : this(
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
inReplyTo,
status,
isException,
isUpdate,
extra =
mapOf(
HEADER_IN_REPLY_TO to inReplyTo,
HEADER_STATUS to status.code.toString(),
HEADER_IS_EXCEPTION to isException.toString(),
HEADER_IS_UPDATE to isUpdate.toString(),
),
),
)

companion object {

private const val HEADER_IN_REPLY_TO = "rpc-in-reply-to"
private const val HEADER_STATUS = "rpc-response-status"
private const val HEADER_IS_EXCEPTION = "rpc-is-exception"
private const val HEADER_IS_UPDATE = "rpc-is-update"

}

}
1 change: 1 addition & 0 deletions water/.eslintignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
dist
.eslintrc.cjs
vite.config.ts
1 change: 0 additions & 1 deletion water/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
.idea/
.vscode/
.pnp.*
.yarn/*
!.yarn/patches
Expand Down
5 changes: 5 additions & 0 deletions water/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*
!/src
!eslintrc.cjs
!package.json
!tsconfig.json
10 changes: 10 additions & 0 deletions water/.prettierrc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
useTabs: true
singleQuote: false
jsxSingleQuote: false
bracketSameLine: true
quoteProps: as-needed
trailingComma: all
semi: true
printWidth: 120
arrowParens: avoid
endOfLine: auto
16 changes: 16 additions & 0 deletions water/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"[typescript]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
},
"[json]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
},
"editor.insertSpaces": false,
"editor.detectIndentation": false,
"editor.formatOnSave": true,
"editor.tabSize": 4,
"prettier.requireConfig": true,
"prettier.configPath": ".prettierrc.yml",
"typescript.tsdk": "node_modules\\typescript\\lib",
"typescript.enablePromptUseWorkspaceTsdk": true
}
56 changes: 31 additions & 25 deletions water/package.json
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
{
"name": "@beemobot/water",
"version": "1.0.0",
"author": "Beemo Devs (https://beemo.gg)",
"license": "MIT",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"build": "rm -rf dist/ && tsc",
"lint": "eslint ."
},
"files": [
"dist"
],
"dependencies": {
"kafkajs": "^2.2.3"
},
"devDependencies": {
"@types/node": "^18.11.18",
"@typescript-eslint/eslint-plugin": "^5.48.0",
"@typescript-eslint/parser": "^5.48.0",
"eslint": "^8.31.0",
"eslint-plugin-import": "^2.26.0",
"typescript": "^4.9.4"
}
"name": "@beemobot/water",
"version": "1.0.0",
"author": "Beemo Devs (https://beemo.gg)",
"license": "MIT",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"build": "rm -rf dist/ && tsc",
"format": "prettier --write .",
"lint": "eslint .",
"test": "vitest"
},
"files": [
"dist"
],
"dependencies": {
"rabbitmq-client": "^4.6.0",
"valibot": "^0.36.0"
},
"devDependencies": {
"@types/node": "^20.14.10",
"@typescript-eslint/eslint-plugin": "^7.16.0",
"@typescript-eslint/parser": "^7.16.0",
"eslint": "^8.57.0",
"eslint-plugin-import": "^2.29.1",
"prettier": "^3.3.3",
"typescript": "^5.5.3",
"vite-tsconfig-paths": "^4.3.2",
"vitest": "^2.0.3"
}
}
7 changes: 7 additions & 0 deletions water/src/CommonConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export enum BrokerServices {
TEA = "tea", // Bot
VANILLA = "vanilla", // Bot cluster coordinator
MILK = "milk", // Raid logs
SUGAR = "sugar", // Premium management
COFFEE = "coffee", // Raid bans
}
Loading