reading kafka key and value serializer from config
ag060 committed Jan 8, 2025
1 parent 489fb59 commit dd567f0
Showing 10 changed files with 147 additions and 53 deletions.
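
The change threads key and value Serializer settings through KafkaConfig, so producers and consumers resolve their (de)serializer classes from config instead of hard-coding StringSerializer. A minimal sketch of a resulting call site, pieced together from the diffs below; setBootstrapServers and setGroupId are assumed setter names inferred from the builder's fields, and the broker and group values are illustrative:

    KafkaConfig kafkaConfig =
        KafkaConfig.newBuilder()
            .setBootstrapServers("localhost:9092") // assumed setter name
            .setGroupId("akto.threat_detection")   // assumed setter name
            .setProducerConfig(
                KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build())
            .setKeySerializer(Serializer.STRING)       // new in this commit
            .setValueSerializer(Serializer.BYTE_ARRAY) // new; defaults to STRING when unset
            .build();
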
Main.java (threat backend)
@@ -7,6 +7,7 @@
import com.akto.kafka.KafkaConfig;
import com.akto.kafka.KafkaConsumerConfig;
import com.akto.kafka.KafkaProducerConfig;
+ import com.akto.kafka.Serializer;
import com.akto.threat.backend.client.IPLookupClient;
import com.akto.threat.backend.service.MaliciousEventService;
import com.akto.threat.backend.service.ThreatActorService;
@@ -56,6 +57,8 @@ public static void main(String[] args) throws Exception {
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build())
+         .setKeySerializer(Serializer.STRING)
+         .setValueSerializer(Serializer.STRING)
.build();

IPLookupClient ipLookupClient = new IPLookupClient(getMaxmindFile());
MaliciousEventService.java
@@ -33,8 +33,8 @@
public class MaliciousEventService {

private final Kafka kafka;
- private MongoClient mongoClient;
- private IPLookupClient ipLookupClient;
+ private final MongoClient mongoClient;
+ private final IPLookupClient ipLookupClient;

public MaliciousEventService(
KafkaConfig kafkaConfig, MongoClient mongoClient, IPLookupClient ipLookupClient) {
@@ -44,8 +44,6 @@ public MaliciousEventService(
}

public void recordMaliciousEvent(String accountId, RecordMaliciousEventRequest request) {
- System.out.println("Received malicious event: " + request);
MaliciousEventMessage evt = request.getMaliciousEvent();
String actor = evt.getActor();
String filterId = evt.getFilterId();
FlushMessagesToDB.java
@@ -18,8 +18,11 @@
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

+ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlushMessagesToDB {

@@ -32,11 +35,22 @@ public class FlushMessagesToDB {

public FlushMessagesToDB(KafkaConfig kafkaConfig, MongoClient mongoClient) {
String kafkaBrokerUrl = kafkaConfig.getBootstrapServers();
- String groupId = kafkaConfig.getGroupId();

- Properties properties =
-     Utils.configProperties(
-         kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords());
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl);
+ properties.put(
+     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+     kafkaConfig.getKeySerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+     kafkaConfig.getValueSerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+     kafkaConfig.getConsumerConfig().getMaxPollRecords());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

this.kafkaConsumer = new KafkaConsumer<>(properties);
this.kafkaConfig = kafkaConfig;

Main.java (threat detection)
@@ -4,6 +4,7 @@
import com.akto.kafka.KafkaConfig;
import com.akto.kafka.KafkaConsumerConfig;
import com.akto.kafka.KafkaProducerConfig;
+ import com.akto.kafka.Serializer;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.session_factory.SessionFactoryUtils;
import com.akto.threat.detection.tasks.CleanupTask;
@@ -19,7 +20,7 @@ public class Main {

private static final String CONSUMER_GROUP_ID = "akto.threat_detection";

- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
runMigrations();

SessionFactory sessionFactory = SessionFactoryUtils.createFactory();
@@ -37,6 +38,8 @@
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build())
+         .setKeySerializer(Serializer.STRING)
+         .setValueSerializer(Serializer.BYTE_ARRAY)
.build();

KafkaConfig internalKafka =
@@ -50,6 +53,8 @@
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build())
+         .setKeySerializer(Serializer.STRING)
+         .setValueSerializer(Serializer.BYTE_ARRAY)
.build();

new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
KafkaProtoProducer.java
@@ -1,21 +1,22 @@
package com.akto.threat.detection.kafka;

import com.akto.kafka.KafkaConfig;
+ import com.akto.kafka.Serializer;
import com.google.protobuf.Message;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProtoProducer {
- private KafkaProducer<String, byte[]> producer;
+ private final KafkaProducer<String, byte[]> producer;
public boolean producerReady;

public KafkaProtoProducer(KafkaConfig kafkaConfig) {
- this.producer = generateProducer(
-     kafkaConfig.getBootstrapServers(),
-     kafkaConfig.getProducerConfig().getLingerMs(),
-     kafkaConfig.getProducerConfig().getBatchSize());
+ this.producer =
+     generateProducer(
+         kafkaConfig.getBootstrapServers(),
+         kafkaConfig.getProducerConfig().getLingerMs(),
+         kafkaConfig.getProducerConfig().getBatchSize());
}

public void send(String topic, Message message) {
@@ -28,21 +29,21 @@ public void close() {
producer.close(Duration.ofMillis(0)); // close immediately
}

- private KafkaProducer<String, byte[]> generateProducer(String brokerIP, int lingerMS, int batchSize) {
-     if (producer != null)
-         close(); // close existing producer connection
+ private KafkaProducer<String, byte[]> generateProducer(
+     String brokerIP, int lingerMS, int batchSize) {
+     if (producer != null) close(); // close existing producer connection

int requestTimeoutMs = 5000;
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP);
- kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-     "org.apache.kafka.common.serialization.ByteArraySerializer");
- kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.STRING.getSerializer());
+ kafkaProps.put(
+     ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.BYTE_ARRAY.getSerializer());
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS);
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0);
kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs);
- return new KafkaProducer<String, byte[]>(kafkaProps);
+ return new KafkaProducer<>(kafkaProps);
}
}
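
The body of send() is folded out of the diff above; given the KafkaProducer<String, byte[]> field and the protobuf Message parameter, it plausibly reduces to something like the following sketch (an assumption, not the committed code):

    public void send(String topic, Message message) {
        byte[] value = message.toByteArray();              // protobuf wire format
        producer.send(new ProducerRecord<>(topic, value)); // key left null
    }
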
AbstractKafkaConsumerTask.java
@@ -1,7 +1,7 @@
package com.akto.threat.detection.tasks;

import com.akto.kafka.KafkaConfig;
- import com.akto.runtime.utils.Utils;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@@ -11,7 +11,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;

public abstract class AbstractKafkaConsumerTask<V> implements Task {

@@ -23,15 +22,17 @@ public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) {
this.kafkaTopic = kafkaTopic;
this.kafkaConfig = kafkaConfig;

- String kafkaBrokerUrl = kafkaConfig.getBootstrapServers();
- String groupId = kafkaConfig.getGroupId();
Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getConsumerConfig().getMaxPollRecords());
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
+ properties.put(
+     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+     kafkaConfig.getKeySerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+     kafkaConfig.getValueSerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+     kafkaConfig.getConsumerConfig().getMaxPollRecords());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -49,8 +50,9 @@ public void run() {
() -> {
// Poll data from Kafka topic
while (true) {
- ConsumerRecords<String, V> records = kafkaConsumer.poll(
-     Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli()));
+ ConsumerRecords<String, V> records =
+     kafkaConsumer.poll(
+         Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli()));
if (records.isEmpty()) {
continue;
}
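
Since ENABLE_AUTO_COMMIT_CONFIG is set to false, the folded remainder of this poll loop has to commit offsets itself once a batch is handled. A typical shape, sketched under the assumption of a per-record hook (processRecord is hypothetical; the actual processing code is elided from this diff):

    records.forEach(this::processRecord); // hand each record to the subclass
    kafkaConsumer.commitSync();           // manual offset commit after the batch
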
MaliciousTrafficDetectorTask.java
@@ -23,7 +23,6 @@
import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope;
import com.akto.proto.http_response_param.v1.HttpResponseParam;
import com.akto.rules.TestPlugin;
- import com.akto.runtime.utils.Utils;
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
@@ -38,10 +37,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.math.NumberUtils;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
+ import org.apache.kafka.clients.consumer.*;

/*
Class is responsible for consuming traffic data from the Kafka topic.
@@ -66,13 +62,21 @@ public MaliciousTrafficDetectorTask(
KafkaConfig trafficConfig, KafkaConfig internalConfig, RedisClient redisClient) {
this.kafkaConfig = trafficConfig;

- String kafkaBrokerUrl = trafficConfig.getBootstrapServers();
- String groupId = trafficConfig.getGroupId();

- this.kafkaConsumer =
-     new KafkaConsumer<>(
-         Utils.configProperties(
-             kafkaBrokerUrl, groupId, trafficConfig.getConsumerConfig().getMaxPollRecords()));
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, trafficConfig.getBootstrapServers());
+ properties.put(
+     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+     trafficConfig.getKeySerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+     trafficConfig.getValueSerializer().getDeserializer());
+ properties.put(
+     ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+     trafficConfig.getConsumerConfig().getMaxPollRecords());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, trafficConfig.getGroupId());
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ this.kafkaConsumer = new KafkaConsumer<>(properties);

this.httpCallParser = new HttpCallParser(120, 1000);

29 changes: 22 additions & 7 deletions libs/utils/src/main/java/com/akto/kafka/Kafka.java
@@ -1,7 +1,6 @@
package com.akto.kafka;

import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -17,18 +16,29 @@ public Kafka(KafkaConfig kafkaConfig) {
this(
kafkaConfig.getBootstrapServers(),
kafkaConfig.getProducerConfig().getLingerMs(),
-     kafkaConfig.getProducerConfig().getBatchSize());
+     kafkaConfig.getProducerConfig().getBatchSize(),
+     kafkaConfig.getKeySerializer(),
+     kafkaConfig.getValueSerializer());
}

- public Kafka(String brokerIP, int lingerMS, int batchSize) {
+ public Kafka(
+     String brokerIP,
+     int lingerMS,
+     int batchSize,
+     Serializer keySerializer,
+     Serializer valueSerializer) {
producerReady = false;
try {
-     setProducer(brokerIP, lingerMS, batchSize);
+     setProducer(brokerIP, lingerMS, batchSize, keySerializer, valueSerializer);
} catch (Exception e) {
e.printStackTrace();
}
}

+ public Kafka(String brokerIP, int lingerMS, int batchSize) {
+     this(brokerIP, lingerMS, batchSize, Serializer.STRING, Serializer.STRING);
+ }

public void send(String message, String topic) {
if (!this.producerReady) return;

@@ -41,14 +51,19 @@ public void close() {
producer.close(Duration.ofMillis(0)); // close immediately
}

- private void setProducer(String brokerIP, int lingerMS, int batchSize) {
+ private void setProducer(
+     String brokerIP,
+     int lingerMS,
+     int batchSize,
+     Serializer keySerializer,
+     Serializer valueSerializer) {
if (producer != null) close(); // close existing producer connection

int requestTimeoutMs = 5000;
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP);
- kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getSerializer());
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getSerializer());
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS);
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0);
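
Call sites that construct Kafka from a KafkaConfig now pick up both serializers automatically, while the three-argument constructor keeps the old string/string behavior. A quick usage sketch against the signatures visible above (topic and payload are illustrative):

    Kafka kafka = new Kafka(kafkaConfig);            // serializers come from the config
    kafka.send("{\"event\":\"test\"}", "akto.misc"); // send(message, topic)
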
26 changes: 26 additions & 0 deletions libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java
@@ -5,12 +5,16 @@ public class KafkaConfig {
private final String groupId;
private final KafkaConsumerConfig consumerConfig;
private final KafkaProducerConfig producerConfig;
+ private final Serializer keySerializer;
+ private final Serializer valueSerializer;

public static class Builder {
private String bootstrapServers;
private String groupId;
private KafkaConsumerConfig consumerConfig;
private KafkaProducerConfig producerConfig;
+ private Serializer keySerializer;
+ private Serializer valueSerializer;

private Builder() {}

@@ -34,6 +38,16 @@ public Builder setProducerConfig(KafkaProducerConfig producerConfig) {
return this;
}

+ public Builder setKeySerializer(Serializer keySerializer) {
+     this.keySerializer = keySerializer;
+     return this;
+ }
+
+ public Builder setValueSerializer(Serializer valueSerializer) {
+     this.valueSerializer = valueSerializer;
+     return this;
+ }

public KafkaConfig build() {
return new KafkaConfig(this);
}
@@ -44,6 +58,10 @@ private KafkaConfig(Builder builder) {
this.groupId = builder.groupId;
this.consumerConfig = builder.consumerConfig;
this.producerConfig = builder.producerConfig;

+ this.keySerializer = builder.keySerializer == null ? Serializer.STRING : builder.keySerializer;
+ this.valueSerializer =
+     builder.valueSerializer == null ? Serializer.STRING : builder.valueSerializer;
}

public String getBootstrapServers() {
@@ -62,6 +80,14 @@ public KafkaProducerConfig getProducerConfig() {
return producerConfig;
}

+ public Serializer getKeySerializer() {
+     return keySerializer;
+ }
+
+ public Serializer getValueSerializer() {
+     return valueSerializer;
+ }

public static Builder newBuilder() {
return new Builder();
}
Serializer.java (new file; contents not captured in this view, presumably under libs/utils/src/main/java/com/akto/kafka/)
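
A plausible shape for that enum, inferred from the STRING and BYTE_ARRAY constants and the getSerializer()/getDeserializer() calls used throughout this commit (an assumption, not the committed file; it could equally store Class objects, which Kafka's config also accepts):

    package com.akto.kafka;

    import org.apache.kafka.common.serialization.*;

    public enum Serializer {
      STRING(StringSerializer.class.getName(), StringDeserializer.class.getName()),
      BYTE_ARRAY(ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName());

      private final String serializer;   // producer serializer class name
      private final String deserializer; // consumer deserializer class name

      Serializer(String serializer, String deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
      }

      public String getSerializer() {
        return serializer;
      }

      public String getDeserializer() {
        return deserializer;
      }
    }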
