-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
MetricsVerticle.java
48 lines (38 loc) · 1.47 KB
/
MetricsVerticle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package io.vertx.example.kafka.dashboard;
import com.sun.management.OperatingSystemMXBean;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.lang.management.ManagementFactory;
import java.util.UUID;
/**
 * Verticle that periodically samples operating-system metrics and writes them to Kafka.
 *
 * <p>On start it creates a Kafka producer from the verticle configuration and schedules a
 * periodic timer. On every tick one JSON record is written to the topic {@code the_topic};
 * the record is a JSON object with a single entry whose key is a random identifier generated
 * at start and whose value holds the {@code CPU} and {@code Mem} readings.
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
public class MetricsVerticle extends AbstractVerticle {

  /** Kafka topic the metric records are written to. */
  private static final String TOPIC = "the_topic";

  /** Delay between two metric publications, in milliseconds. */
  private static final long PERIOD_MS = 1000;

  private OperatingSystemMXBean systemMBean;
  private KafkaWriteStream<String, JsonObject> producer;

  /**
   * Looks up the platform operating-system MXBean, creates the Kafka producer and schedules
   * the periodic publication of metrics.
   */
  @Override
  public void start() throws Exception {
    systemMBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);

    // Random identifier used as the key of every payload published by this verticle
    final String instanceId = UUID.randomUUID().toString();

    // The verticle configuration is handed over unchanged as the Kafka producer configuration
    producer = KafkaWriteStream.create(vertx, config().getMap(), String.class, JsonObject.class);

    // Publish the metrics in Kafka on every timer tick
    vertx.setPeriodic(PERIOD_MS, timerId -> publish(instanceId));
  }

  /**
   * Closes the Kafka producer if one was created by {@link #start()}.
   */
  @Override
  public void stop() throws Exception {
    if (producer == null) {
      return;
    }
    producer.close();
  }

  /**
   * Takes one sample of the metrics and writes it to Kafka.
   *
   * @param instanceId key under which the sample is nested in the published JSON object
   */
  private void publish(String instanceId) {
    JsonObject sample = new JsonObject()
      .put("CPU", systemMBean.getProcessCpuLoad())
      // "Mem" is total physical memory minus free physical memory, as reported by the MXBean
      .put("Mem", systemMBean.getTotalPhysicalMemorySize() - systemMBean.getFreePhysicalMemorySize());
    JsonObject payload = new JsonObject().put(instanceId, sample);
    producer.write(new ProducerRecord<>(TOPIC, payload));
  }
}