Skip to content

Commit

Permalink
Add messaging component
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrankel committed May 15, 2024
1 parent 405000f commit 4d8ce30
Show file tree
Hide file tree
Showing 14 changed files with 1,104 additions and 8 deletions.
13 changes: 13 additions & 0 deletions analytics/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#docker build -t otel-analytics:1.0 .
FROM node:21-alpine

WORKDIR /usr/src/app

COPY package.json .
COPY yarn.lock .

RUN --mount=type=cache,target=/root/.yarn YARN_CACHE_FOLDER=/root/.yarn yarn install --frozen-lockfile --non-interactive --production=true

COPY index.js .

CMD ["node", "index.js"]
68 changes: 68 additions & 0 deletions analytics/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict'

import {connect} from 'mqtt'
import {NodeSDK} from '@opentelemetry/sdk-node'
import {OTLPTraceExporter} from '@opentelemetry/exporter-trace-otlp-http'
import {context, propagation, trace} from '@opentelemetry/api'
import {Resource} from '@opentelemetry/resources'
import {SemanticResourceAttributes} from '@opentelemetry/semantic-conventions'

const collectorUri = process.env.COLLECTOR_URI
const mqttServerUri = process.env.MQTT_SERVER_URI
const clientId = process.env.MQTT_CLIENT_ID
const topic = process.env.MQTT_TOPIC
const connectTimeout = process.env.MQTT_CONNECT_TIMEOUT

const sdk = new NodeSDK({
resource: new Resource({[SemanticResourceAttributes.SERVICE_NAME]: 'analytics'}),
traceExporter: new OTLPTraceExporter({
url: `${collectorUri}/v1/traces`
})
})

sdk.start()

const client = connect(mqttServerUri, {
clientId: clientId, protocolVersion: 5, connectTimeout: connectTimeout,
})

client.on('connect', () => {
console.log('Connected')
client.subscribe([topic], () => {
console.log(`Subscribe to topic '${topic}'`)
})
})

client.on('reconnect', () => {
console.log('Reconnecting')
})

client.on('error', (error) => {
console.error(`Cannot connect:`, error)
})

client.on('message', (aTopic, payload, packet) => {
if (aTopic === topic) {

console.log('Received new message')

const data = JSON.parse(payload.toString())

const userProperties = {}
if (packet.properties['userProperties']) {
const props = packet.properties['userProperties']
for (const key of Object.keys(props)) {
userProperties[key] = props[key]
}
}

const activeContext = propagation.extract(context.active(), userProperties)
const tracer = trace.getTracer('analytics')
const span = tracer.startSpan(
'Read message',
{attributes: {path: data['path'], clientIp: data['clientIp']}},
activeContext,
)
span.end()
}
})
17 changes: 17 additions & 0 deletions analytics/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "analytics",
"version": "1.0.0",
"main": "index.js",
"type": "module",
"license": "Apache License, Version 2.0",
"private": false,
"dependencies": {
"@opentelemetry/api": "^1.6.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.51.0",
"@opentelemetry/resources": "^1.17.0",
"@opentelemetry/sdk-node": "^0.51.0",
"@opentelemetry/sdk-trace-node": "^1.17.0",
"@opentelemetry/semantic-conventions": "^1.17.0",
"mqtt": "^5.1.0"
}
}
Loading

0 comments on commit 4d8ce30

Please sign in to comment.