Skip to content

Commit

Permalink
Add messaging component
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrankel committed Jul 31, 2024
1 parent 6f1ace8 commit 795f846
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ build/
__pycache__
node_modules
.ruby-version
dist
23 changes: 23 additions & 0 deletions analytics/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#docker build -t otel-analytics:1.0 .
FROM node:21-alpine AS build

WORKDIR /usr/src/app

COPY package.json .

RUN --mount=type=cache,target=/root/.yarn YARN_CACHE_FOLDER=/root/.yarn yarn install --frozen-lockfile --non-interactive --production=true

COPY src/index.ts src/index.ts
COPY tsconfig.json .

RUN yarn run build

FROM node:21-alpine

WORKDIR /usr/src/app

COPY package.json .
COPY --from=build /usr/src/app/node_modules node_modules/
COPY --from=build /usr/src/app/dist/index.js dist/

CMD ["yarn", "run", "start"]
25 changes: 25 additions & 0 deletions analytics/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "analytics",
"version": "1.0.0",
"main": "src/index.ts",
"scripts": {
"start": "node dist/index.js",
"build": "tsc -p ."
},
"type": "module",
"license": "Apache License, Version 2.0",
"private": false,
"dependencies": {
"@opentelemetry/api": "^1.6.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.52.0",
"@opentelemetry/resources": "^1.17.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-node": "^1.17.0",
"@opentelemetry/semantic-conventions": "^1.17.0",
"mqtt": "^5.1.0",
"ts-node": "^10.9.1"
},
"devDependencies": {
"typescript": "^5.2.2"
}
}
75 changes: 75 additions & 0 deletions analytics/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import {connect} from 'mqtt'
import {NodeSDK} from '@opentelemetry/sdk-node'
import {OTLPTraceExporter} from '@opentelemetry/exporter-trace-otlp-http'
import {context, propagation, trace} from '@opentelemetry/api'
import {Resource} from '@opentelemetry/resources'
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions'

function getEnvironmentVariable(name: string): string {
const value = process.env[name]
if (!value) {
throw new Error(`Environment variable ${name} must be defined`)
}
return value
}

const collectorUri = getEnvironmentVariable('COLLECTOR_URI')
const mqttServerUri = getEnvironmentVariable('MQTT_SERVER_URI')
const clientId = getEnvironmentVariable('MQTT_CLIENT_ID')
const topic = getEnvironmentVariable('MQTT_TOPIC')
const connectTimeout = getEnvironmentVariable('MQTT_CONNECT_TIMEOUT')

const sdk = new NodeSDK({
resource: new Resource({[SEMRESATTRS_SERVICE_NAME]: 'analytics'}),
traceExporter: new OTLPTraceExporter({
url: `${collectorUri}/v1/traces`
})
})

sdk.start()

const client = connect(mqttServerUri, {
clientId: clientId, protocolVersion: 5, connectTimeout: parseInt(connectTimeout),
})

client.on('connect', () => {
console.log('Connected')
client.subscribe([topic], () => {
console.log(`Subscribe to topic '${topic}'`)
})
})

client.on('reconnect', () => {
console.log('Reconnecting')
})

client.on('error', (error) => {
console.error(`Cannot connect:`, error)
})

client.on('message', (aTopic, payload, packet) => {
if (aTopic === topic) {

console.log('Received new message')

const data = JSON.parse(payload.toString())

const userProperties: Record<string, any> = {}
if (packet.properties && packet.properties['userProperties']) {
const props = packet.properties['userProperties']
console.error('Props', props)
for (const key of Object.keys(props)) {
userProperties[key] = props[key]
}
}

const activeContext = propagation.extract(context.active(), userProperties)
const tracer = trace.getTracer('analytics')
const span = tracer.startSpan(
'Read message',
{attributes: {path: data['path'], clientIp: data['clientIp']}},
activeContext,
)
span.end()
}
})
13 changes: 13 additions & 0 deletions analytics/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ES2015",
"module": "nodenext",
"moduleResolution": "nodenext",
"strict": true,
"outDir": "dist",
"rootDir": "src"
},
"$schema": "https://json.schemastore.org/tsconfig",
"include": ["src/**/*"],
"exclude": ["node_modules"]
}
4 changes: 2 additions & 2 deletions catalog/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#docker build -t otel-catalog:1.1 .
#docker build -t otel-catalog:1.2 .
FROM eclipse-temurin:21-jdk-jammy AS build

COPY .mvn .mvn
Expand All @@ -10,7 +10,7 @@ RUN --mount=type=cache,target=/root/.m2,rw ./mvnw package -DskipTests

FROM eclipse-temurin:21-jre-jammy

COPY --from=build target/catalog-1.1.jar catalog.jar
COPY --from=build target/catalog-1.2.jar catalog.jar

ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.3.0/opentelemetry-javaagent.jar opentelemetry-javaagent.jar

Expand Down
26 changes: 25 additions & 1 deletion catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>ch.frankel.blog</groupId>
<artifactId>catalog</artifactId>
<version>1.1</version>
<version>1.2</version>
<properties>
<java.version>21</java.version>
<kotlin.version>2.0.0</kotlin.version>
Expand Down Expand Up @@ -53,6 +53,16 @@
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-serialization-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-serialization-json</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
Expand All @@ -62,6 +72,15 @@
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
Expand Down Expand Up @@ -101,6 +120,11 @@
<artifactId>kotlin-maven-allopen</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-serialization</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
Expand Down
71 changes: 71 additions & 0 deletions catalog/src/main/kotlin/Analytics.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ch.frankel.catalog

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.context.Context
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.eclipse.paho.mqttv5.client.MqttClient
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions
import org.eclipse.paho.mqttv5.common.MqttMessage
import org.eclipse.paho.mqttv5.common.packet.MqttProperties
import org.eclipse.paho.mqttv5.common.packet.UserProperty
import org.springframework.web.reactive.function.server.HandlerFilterFunction
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono
import kotlin.jvm.optionals.getOrNull


@Serializable
data class Payload(val path: String, val clientIp: String?)

class MessageHolder(private val req: ServerRequest, private val spanContext: SpanContext, private val options: MessageOptions) {

val message = MqttMessage().apply {

properties = MqttProperties().apply {
val traceparent = "00-${spanContext.traceId}-${spanContext.spanId}-${spanContext.traceFlags}"
userProperties = listOf(UserProperty("traceparent", traceparent))
}
qos = options.qos
isRetained = options.retained

val hostAddress = req.remoteAddress().map { it.address.hostAddress }.getOrNull()
payload = Json.encodeToString(Payload(req.path(), hostAddress)).toByteArray()
}
}

class AnalyticsFilter(private val client: MqttClient, private val options: Mqtt, otel: OpenTelemetry) :
HandlerFilterFunction<ServerResponse, ServerResponse> {

private val tracer = otel.tracerBuilder("ch.frankel.catalog").build()

override fun filter(req: ServerRequest, next: HandlerFunction<ServerResponse>): Mono<ServerResponse> {

reconnectIfNeeded()

val span = tracer.spanBuilder("AnalyticsFilter.filter").setParent(Context.current()).startSpan().apply {
setAttribute("MQTT.topic", options.topic)
setAttribute("MQTT.server-uri", options.serverUri)
setAttribute("MQTT.client-id", options.clientId)
}
val message = MessageHolder(req, span.spanContext, options.message).message
client.publish(options.topic, message)
span.end()

return next.handle(req)
}

private fun reconnectIfNeeded() {
if (!client.isConnected) {
val connectionOptions = MqttConnectionOptions().apply {
connectionTimeout = options.connect.timeout
isAutomaticReconnect = options.connect.automatic
}
client.connect(connectionOptions)
}
}
}
12 changes: 11 additions & 1 deletion catalog/src/main/kotlin/AppProperties.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@ package ch.frankel.catalog
import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "app")
data class AppProperties(private val stock: Stock, private val pricing: Pricing) {
data class AppProperties(private val stock: Stock, private val pricing: Pricing, val mqtt: Mqtt) {
val stockEndpoint = stock.endpoint
val pricingEndpoint = pricing.endpoint
}

data class Stock(val endpoint: String)
data class Pricing(val endpoint: String)
data class Mqtt(
val serverUri: String,
val clientId: String,
val topic: String,
val message: MessageOptions,
val connect: ConnectionOptions
)

data class MessageOptions(val qos: Int, val retained: Boolean)
data class ConnectionOptions(val automatic: Boolean, val timeout: Int)
8 changes: 6 additions & 2 deletions catalog/src/main/kotlin/CatalogApp.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package ch.frankel.catalog

import kotlinx.coroutines.CoroutineDispatcher
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.annotations.SpanAttribute
import io.opentelemetry.instrumentation.annotations.WithSpan
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactor.awaitSingle
import org.eclipse.paho.mqttv5.client.MqttClient
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand Down Expand Up @@ -107,11 +109,13 @@ val beans = beans {
PriceService(ref())
}
bean {
val mqtt = ref<AppProperties>().mqtt
val client = MqttClient(mqtt.serverUri, mqtt.clientId)
val handler = ProductHandler(ref(), ref(), Dispatchers.IO, ref())
coRouter {
GET("/products")(handler::products)
GET("/products/{id}")(handler::product)
}
}.filter(AnalyticsFilter(client, mqtt, GlobalOpenTelemetry.get()))
}
}

Expand Down
11 changes: 11 additions & 0 deletions config/apisix/front/apisix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ upstreams:
- id: 3
nodes:
"pricing:8000": 1
- id: 4
nodes:
"mosquitto:1883": 1

routes:
- name: Product Catalog
Expand All @@ -34,6 +37,14 @@ routes:
uri: /search
ret_code: 301

stream_routes:
- id: 1
upstream_id: 4
plugins:
mqtt-proxy:
protocol_name: MQTT
protocol_level: 5

global_rules:
- id: 1
plugins:
Expand Down
6 changes: 6 additions & 0 deletions config/apisix/front/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ deployment:
role: data_plane
role_data_plane:
config_provider: yaml
apisix:
proxy_mode: http&stream
stream_proxy:
tcp:
- addr: 9100
tls: false
plugins:
- opentelemetry
plugin_attr:
Expand Down
2 changes: 2 additions & 0 deletions config/mosquitto/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
Loading

0 comments on commit 795f846

Please sign in to comment.