Skip to content

Latest commit

 

History

History
90 lines (67 loc) · 3.8 KB

README.md

File metadata and controls

90 lines (67 loc) · 3.8 KB

KStream pipeline template

This is IoT phone demo fork with added monitoring stack to collect and visualize JMX metrics from KStream apps.

Please follow this youtube video to use it:

Watch the video

Example KStream app

In the video we going to build a KStream service that count number of messages per second in input topic. This is code snippet of the service:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;

public class MessageCountPerSecond {

    public static void main(String[] args) {
        try {
            
            // Get the environment ID from the environment variables
            String workspace_id = System.getenv("Quix__Workspace__Id");

            // Get Kafka configuration properties
            Properties props = QuixConfigBuilder.buildKafkaProperties();

            props.put("application.id", workspace_id + "-" + "message-count-app");

            // Set default key and value serde
            props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
            props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

            StreamsBuilder builder = new StreamsBuilder();

            // Topic name consists of workspace ID and input topic name in QuixCloud.
            KStream<String, String> messageStream = builder.stream(workspace_id + "-" + System.getenv("input"));

            // Count messages per second
            KTable<Windowed<String>, Long> messageCountPerSecond = messageStream
                    .groupByKey()
                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(1)))
                    .count();

            // Convert the count data to JSON format before sending it to the output topic
            KStream<String, String> jsonOutputStream = messageCountPerSecond.toStream()
                    .map((windowedKey, count) -> {
                        long windowStart = windowedKey.window().start();
                        long windowEnd = windowedKey.window().end();

                        // Create a JSON object with window-start, window-end, and count
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("window-start", windowStart);
                        jsonObject.put("window-end", windowEnd);
                        jsonObject.put("count", count);

                        // Convert the Windowed key to a simple string representation (optional)
                        String key = windowedKey.key();

                        // Return the new key-value pair where value is JSON string
                        return new org.apache.kafka.streams.KeyValue<>(key, jsonObject.toString());
                    });

            // Output the JSON results to another topic
            jsonOutputStream.to(workspace_id + "-" + System.getenv("output"), Produced.with(Serdes.String(), Serdes.String()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Gracefully shutdown on exit
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}