Skip to content

Commit

Permalink
KStream + monitoring stack (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomas-quix authored Nov 13, 2024
1 parent 9868fe1 commit 7852b75
Show file tree
Hide file tree
Showing 57 changed files with 3,603 additions and 3,780 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,5 @@ venv/
**/certificates/

**/state/

.venv/
1 change: 0 additions & 1 deletion README.md

This file was deleted.

10 changes: 4 additions & 6 deletions json-sink/README.md → counter/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# Starter transformation

[This project](https://github.com/quixio/quix-samples/tree/main/python/destinations/starter_destination) is an example of how to transform data on the fly between source and destination.
[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/starter_transformation) demonstrates how to consume data from a topic, apply a simple transformation to that data and publish the result to an output topic (while printing content to the console output).

The default implementation subscribes to data from the source topic and publishes to your destination as-well-as printing content to console output.

Modify the Python code to publish to your chosen destination(s) on the fly.
Modify the Python code to transform your data on the fly.

## How to run

Create a [Quix](https://portal.platform.quix.ai/self-sign-up?xlink=github) account or log-in and visit the Samples to use this project.
Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the Samples to use this project.

Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying.

Expand All @@ -27,4 +25,4 @@ Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-sampl

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
Please star us and mention us on social to show your appreciation.
17 changes: 17 additions & 0 deletions counter/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: counter
language: java
variables:
- name: input
inputType: InputTopic
description: Name of the input topic to listen to.
defaultValue: raw
required: false
- name: output
inputType: OutputTopic
description: Name of the output topic to write to.
defaultValue: count-1s
required: false
dockerfile: dockerfile
runEntryPoint: src/main/java/com/example/MessageCountPerSecond.java
defaultFile: src/main/java/com/example/MessageCountPerSecond.java
libraryItemId: kstreams
3 changes: 3 additions & 0 deletions counter/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
rules:
- pattern: ".*"

34 changes: 34 additions & 0 deletions counter/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Stage 1: Build the application
FROM maven:3.8.4-openjdk-17 AS build

# Set the working directory in the container
WORKDIR /app

# Copy the pom.xml and download dependencies
# (done as a separate layer so dependency downloads are cached between builds
# as long as pom.xml is unchanged)
COPY pom.xml .
RUN mvn dependency:go-offline

# Copy the entire project and build the application
COPY src ./src
RUN mvn clean package -DskipTests

# Stage 2: Run the application
# (slim JRE-less base keeps the runtime image small; only the shaded jar,
# the JMX exporter agent and its config are carried over from the build stage)
FROM openjdk:17-jdk-slim

# Set the working directory in the container
WORKDIR /app
# config.yaml is the Prometheus JMX exporter rules file consumed by the
# javaagent in the ENTRYPOINT below.
COPY config.yaml /app/config.yaml

# Copy the built shaded JAR file from the previous stage
COPY --from=build /app/target/kafka-message-count-1.0-SNAPSHOT.jar app.jar

# Copy the Prometheus JMX Exporter and configuration file
# NOTE(review): ADD of a remote URL is not checksum-verified, and 0.16.1 is an
# older exporter release -- consider pinning a digest or vendoring the jar.
ADD https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar /app/jmx_prometheus_javaagent.jar

# Expose port 80 for Prometheus metrics
# (the javaagent argument "=80:/app/config.yaml" below binds the exporter's
# HTTP endpoint to this port)
EXPOSE 80

# Run the application with JMX enabled
ENTRYPOINT ["java", \
            "-javaagent:/app/jmx_prometheus_javaagent.jar=80:/app/config.yaml", \
            "-jar", "/app/app.jar"]
23 changes: 23 additions & 0 deletions counter/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os

from quixstreams import Application

# For local development, environment variables are read from a .env file.
from dotenv import load_dotenv
load_dotenv()

# "earliest" offset reset means a brand-new consumer group replays the
# whole input topic instead of starting at the tail.
app = Application(consumer_group="transformation-v1", auto_offset_reset="earliest")

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])

# Streaming dataframe over the input topic; records flow through unchanged.
pipeline = app.dataframe(input_topic)

# put transformation logic here
# see docs for what you can do
# https://quix.io/docs/get-started/quixtour/process-threshold.html

# Echo each record to the console, then forward it to the output topic.
pipeline.print()
pipeline.to_topic(output_topic)

if __name__ == "__main__":
    app.run(pipeline)
83 changes: 83 additions & 0 deletions counter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-message-count</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-message-count</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <!-- NOTE(review): JUnit 3.8.1 is a very old line (pre-annotations);
             AppTest.java is written against it, but consider migrating to
             JUnit 4/5 when the tests grow. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- Kafka Streams DSL used by MessageCountPerSecond -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.4.0</version>
        </dependency>

        <!-- JSON Parsing Library -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20210307</version>
        </dependency>

        <!-- Logging (SLF4J with Log4j binding) -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Sets Main-Class in the plain (non-shaded) jar's manifest. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.MessageCountPerSecond</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Builds the fat/shaded jar that the dockerfile copies as
                 kafka-message-count-1.0-SNAPSHOT.jar. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Adds the Main-Class to the manifest -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.MessageCountPerSecond</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <properties>
        <!-- NOTE(review): source/target 11 while the docker build stage uses a
             JDK-17 Maven image -- works (17 can target 11) but confirm 11 is
             intentional, since the runtime image is also JDK 17. -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
</project>
2 changes: 2 additions & 0 deletions counter/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quixstreams<2.10
python-dotenv
75 changes: 75 additions & 0 deletions counter/src/main/java/com/example/MessageCountPerSecond.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;

/**
 * Kafka Streams application that counts messages per key in 1-second tumbling
 * windows and publishes each window's result to the output topic as a JSON
 * record: {@code {"window-start": <ms>, "window-end": <ms>, "count": <n>}}.
 *
 * <p>Topic names come from the {@code input} and {@code output} environment
 * variables, each prefixed with the Quix workspace id
 * ({@code Quix__Workspace__Id}). Broker/auth settings are fetched via
 * {@link QuixConfigBuilder#buildKafkaProperties()}.
 */
public class MessageCountPerSecond {

    public static void main(String[] args) {
        try {
            String workspaceId = System.getenv("Quix__Workspace__Id");

            // Broker connection + SASL credentials from the Quix portal API.
            Properties props = QuixConfigBuilder.buildKafkaProperties();

            props.put("application.id", "message-count");

            // Default SerDes: keys and values are treated as UTF-8 strings.
            props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
            props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> messageStream =
                    builder.stream(workspaceId + "-" + System.getenv("input"));

            // Count messages per key in 1-second tumbling windows.
            // TimeWindows.of(...) is deprecated since Kafka 3.0; ofSizeAndGrace
            // with a 24h grace period keeps (approximately) the old default
            // grace behavior for late-arriving records while using the
            // non-deprecated API.
            KTable<Windowed<String>, Long> messageCountPerSecond = messageStream
                    .groupByKey()
                    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1), Duration.ofHours(24)))
                    .count();

            // Serialize each finished window into a JSON payload, keyed by the
            // original record key (window bounds are carried in the payload).
            KStream<String, String> jsonOutputStream = messageCountPerSecond.toStream()
                    .map((windowedKey, count) -> {
                        JSONObject json = new JSONObject();
                        json.put("window-start", windowedKey.window().start());
                        json.put("window-end", windowedKey.window().end());
                        json.put("count", count);
                        return new KeyValue<>(windowedKey.key(), json.toString());
                    });

            // Output the JSON results to the (workspace-prefixed) output topic.
            jsonOutputStream.to(workspaceId + "-" + System.getenv("output"),
                    Produced.with(Serdes.String(), Serdes.String()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Close the topology cleanly when the JVM shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before exiting -- required etiquette
            // when an InterruptedException is not rethrown.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.exit(1);
        }
    }
}
58 changes: 58 additions & 0 deletions counter/src/main/java/com/example/QuixConfigBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.example;

import org.json.JSONObject;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Properties;

/**
 * Builds Kafka client {@link Properties} for a Quix workspace by querying the
 * Quix portal API for librdkafka-style broker settings.
 */
public class QuixConfigBuilder {

    // URL template: {portalApi}/workspaces/{workspaceId}/broker/librdkafka
    private static final String KAFKA_CONFIG_API_URL = "%s/workspaces/%s/broker/librdkafka";

    /**
     * Fetches broker connection details from the Quix portal API and converts
     * them into Kafka client properties (bootstrap servers, security protocol,
     * SASL mechanism, PLAIN JAAS config and client id).
     *
     * <p>Reads the environment variables {@code Quix__Workspace__Id},
     * {@code Quix__Sdk__Token} (bearer token) and {@code Quix__Portal__Api}
     * (portal base URL). All three must be set -- there is no fallback.
     *
     * @return properties ready to pass to a Kafka Streams / client constructor
     * @throws IOException if the HTTP call fails or returns a non-200 status
     * @throws InterruptedException if the HTTP call is interrupted
     */
    public static Properties buildKafkaProperties() throws IOException, InterruptedException {
        String workspaceId = System.getenv("Quix__Workspace__Id");
        String token = System.getenv("Quix__Sdk__Token");
        String portalApi = System.getenv("Quix__Portal__Api");

        HttpClient client = HttpClient.newHttpClient();

        // API Call to get Kafka configuration
        HttpRequest kafkaConfigRequest = HttpRequest.newBuilder()
                .uri(URI.create(String.format(KAFKA_CONFIG_API_URL, portalApi, workspaceId)))
                .header("accept", "text/plain")
                .header("X-Version", "2.0")
                .header("Authorization", "bearer " + token)
                .build();

        HttpResponse<String> kafkaConfigResponse =
                client.send(kafkaConfigRequest, HttpResponse.BodyHandlers.ofString());

        // Fail fast with context on an error response instead of letting the
        // JSON parser throw a confusing exception on an HTML error page.
        if (kafkaConfigResponse.statusCode() != 200) {
            throw new IOException("Quix portal API returned HTTP "
                    + kafkaConfigResponse.statusCode() + " for workspace " + workspaceId);
        }

        // Parse the JSON response
        JSONObject jsonResponse = new JSONObject(kafkaConfigResponse.body());
        String bootstrapServers = jsonResponse.getString("bootstrap.servers");
        String securityProtocol = jsonResponse.getString("security.protocol");
        String saslMechanism = jsonResponse.getString("sasl.mechanism");
        String saslUsername = jsonResponse.getString("sasl.username");
        String saslPassword = jsonResponse.getString("sasl.password");
        String clientId = jsonResponse.getString("client.id");

        // Set up Kafka Streams configuration properties
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                saslUsername, saslPassword));
        props.put("client.id", clientId);

        return props;
    }
}
13 changes: 13 additions & 0 deletions counter/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Root logger option
log4j.rootLogger=INFO, console

# Console appender configuration
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c: %m%n

# Optional: Configure Kafka Streams logging levels
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.clients=INFO
38 changes: 38 additions & 0 deletions counter/src/test/java/com/example/AppTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.example;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Placeholder JUnit 3 test suite for the counter application.
 */
public class AppTest extends TestCase {

    /**
     * Creates the test case.
     *
     * @param testName name of the test case
     */
    public AppTest(String testName) {
        super(testName);
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite() {
        return new TestSuite(AppTest.class);
    }

    /**
     * Smoke test that always passes -- keeps the surefire run green until
     * real tests are written.
     */
    public void testApp() {
        assertTrue(true);
    }
}
Loading

0 comments on commit 7852b75

Please sign in to comment.