Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pattern matching in json kafka plugin #85

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/plugins/output/json-kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ add_library(json-kafka-output MODULE
src/Storage.hpp
src/Kafka.cpp
src/Kafka.hpp
src/pattern-matching/HyperscanCppInterface.hh
src/pattern-matching/PatternMatching.hh
)

find_package(LibRDKafka 0.9.3 REQUIRED)
find_package(ZLIB REQUIRED)
find_package(HS MODULE REQUIRED)

include_directories(
${LIBRDKAFKA_INCLUDE_DIRS} # librdkafka
)
target_link_libraries(json-kafka-output
${LIBRDKAFKA_LIBRARIES}
${HS_LIBRARIES}
)

install(
Expand Down
19 changes: 12 additions & 7 deletions src/plugins/output/json-kafka/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ Don't forget to remove (or comment) outputs that you don't want to use!
<kafka>
<name>Send to Kafka</name>
<brokers>127.0.0.1</brokers>
<topic>ipfix</topic>
<blocking>false</blocking>
<partition>unassigned</partition>
<patternTopic>
<regex>*</regex>
<topic>ipfix</topic>
<partition>unassigned</partition>
</patternTopic>

<!-- Zero or more additional properties -->
<property>
Expand Down Expand Up @@ -188,11 +191,13 @@ at the same time if the outputs are not in collision with each other.
:``name``: Identification name of the output. Used only for readability.
:``brokers``:
Initial list of brokers as a CSV list of broker "host" or "host:port".
:``topic``:
Kafka topic to produce to.
:``partition``:
Partition number to produce to. If the value is "unassigned", then the default random
distribution is used. [default: "unassigned"]
:``patternTopic``:
This section determines the output topic of ipfix message with a pattern.
In order to load balance or separate output topics of ipfix messages, specify the
`regex`, `topic`, and `partition` [default: "unassigned"]. This means, that
if `regex` is matched in the ipfix message, the message is produced to the
`topic`:`partition` in Kafka. It is possible to define more than one pattern,
if none of the `regex` patterns is matched, the first topic will be selected.
:``brokerVersion``:
Older broker versions (before 0.10.0) provide no way for a client to query for
supported protocol features making it impossible for the client to know what features
Expand Down
87 changes: 64 additions & 23 deletions src/plugins/output/json-kafka/src/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,16 @@ enum params_xml_nodes {
// Kafka output
KAFKA_NAME, /**< Name of the output */
KAFKA_BROKERS, /**< List of brokers */
KAFKA_TOPIC, /**< Topic */
KAFKA_PARTION, /**< Producer partition */
KAFKA_BVERSION, /**< Broker fallback version */
KAFKA_BLOCKING, /**< Block when queue is full */
KAFKA_PERF_TUN, /**< Add performance tuning options */
KAFKA_PROPERTY, /**< Additional librdkafka property */
KAFKA_PROP_KEY, /**< Property key */
KAFKA_PROP_VALUE, /**< Property value */
KAFKA_PATTERN, /**< Regex patterns to determine output topic */
KAFKA_PATTERN_REGEX, /**< Pattern regex */
KAFKA_PATTERN_TOPIC, /**< Pattern output topic */
KAFKA_PATTERN_PARTITION, /**< pattern output topic partition */
};

/** Definition of the \<property\> of \<kafka\> node */
Expand All @@ -89,16 +91,23 @@ static const struct fds_xml_args args_kafka_prop[] = {
FDS_OPTS_END
};

/** Definition of the \<pattern\> of \<kafka\> node */
static const struct fds_xml_args args_kafka_pattern_topic[] = {
FDS_OPTS_ELEM(KAFKA_PATTERN_REGEX, "regex", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(KAFKA_PATTERN_TOPIC, "topic", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(KAFKA_PATTERN_PARTITION, "partition", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_END
};

/** Definition of the \<kafka\> node */
static const struct fds_xml_args args_kafka[] = {
FDS_OPTS_ELEM(KAFKA_NAME, "name", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(KAFKA_BROKERS, "brokers", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(KAFKA_TOPIC, "topic", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(KAFKA_PARTION, "partition", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(KAFKA_BVERSION, "brokerVersion", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(KAFKA_BLOCKING, "blocking", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(KAFKA_PERF_TUN, "performanceTuning", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
FDS_OPTS_NESTED(KAFKA_PROPERTY, "property", args_kafka_prop, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI),
FDS_OPTS_NESTED(KAFKA_PATTERN, "patternTopic", args_kafka_pattern_topic, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI),
FDS_OPTS_END
};

Expand Down Expand Up @@ -203,6 +212,51 @@ Config::parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property)
kafka.properties.emplace(key, value);
}

void
Config::parse_kafka_pattern(struct cfg_kafka &kafka, fds_xml_ctx_t *pattern)
{
std::string regex, topic;
int32_t partition = RD_KAFKA_PARTITION_UA;

// For partition parser
int32_t value;
char aux;

const struct fds_xml_cont *content;
while (fds_xml_next(pattern, &content) != FDS_EOC) {
switch (content->id) {
case KAFKA_PATTERN_REGEX:
assert(content->type == FDS_OPTS_T_STRING);
regex = content->ptr_string;
break;
case KAFKA_PATTERN_TOPIC:
assert(content->type == FDS_OPTS_T_STRING);
topic = content->ptr_string;
break;
case KAFKA_PATTERN_PARTITION:
assert(content->type == FDS_OPTS_T_STRING);
if (strcasecmp(content->ptr_string, "unassigned") == 0) {
partition = RD_KAFKA_PARTITION_UA;
break;
}

if (sscanf(content->ptr_string, "%" SCNi32 "%c", &value, &aux) != 1 || value < 0) {
throw std::invalid_argument("Invalid partition number of a <kafka> output!");
}
partition = value;
break;
default:
throw std::invalid_argument("Unexpected element within <pattern>!");
}
}

if (regex.empty() || topic.empty()) {
throw std::invalid_argument("pattern key of a <kafka> output cannot be empty!");
}

kafka.pattern_topics.emplace_back(cfg_pattern_topic{ regex, topic, partition });
}

/**
* \brief Parse "kafka" output parameters
*
Expand All @@ -215,7 +269,6 @@ Config::parse_kafka(fds_xml_ctx_t *kafka)
{
// Prepare default values
struct cfg_kafka output;
output.partition = RD_KAFKA_PARTITION_UA;
output.blocking = false;
output.perf_tuning = true;

Expand All @@ -234,22 +287,6 @@ Config::parse_kafka(fds_xml_ctx_t *kafka)
assert(content->type == FDS_OPTS_T_STRING);
output.brokers = content->ptr_string;
break;
case KAFKA_TOPIC:
assert(content->type == FDS_OPTS_T_STRING);
output.topic = content->ptr_string;
break;
case KAFKA_PARTION:
assert(content->type == FDS_OPTS_T_STRING);
if (strcasecmp(content->ptr_string, "unassigned") == 0) {
output.partition = RD_KAFKA_PARTITION_UA;
break;
}

if (sscanf(content->ptr_string, "%" SCNi32 "%c", &value, &aux) != 1 || value < 0) {
throw std::invalid_argument("Invalid partition number of a <kafka> output!");
}
output.partition = value;
break;
case KAFKA_BVERSION:
assert(content->type == FDS_OPTS_T_STRING);
output.broker_fallback = content->ptr_string;
Expand All @@ -266,6 +303,10 @@ Config::parse_kafka(fds_xml_ctx_t *kafka)
assert(content->type == FDS_OPTS_T_CONTEXT);
parse_kafka_property(output, content->ptr_ctx);
break;
case KAFKA_PATTERN:
assert(content->type == FDS_OPTS_T_CONTEXT);
parse_kafka_pattern(output, content->ptr_ctx);
break;
default:
throw std::invalid_argument("Unexpected element within <kafka>!");
}
Expand All @@ -275,8 +316,8 @@ Config::parse_kafka(fds_xml_ctx_t *kafka)
if (output.brokers.empty()) {
throw std::invalid_argument("List of <kafka> brokers must be specified!");
}
if (output.topic.empty()) {
throw std::invalid_argument("Topic of <kafka> output must be specified!");
if (output.pattern_topics.empty()) {
throw std::invalid_argument("Pattern Topic of <kafka> output must be specified!");
}
if (!output.broker_fallback.empty()) {
// Try to check if version string is valid version (at least expect major + minor version)
Expand Down
16 changes: 12 additions & 4 deletions src/plugins/output/json-kafka/src/Config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ struct cfg_format {
bool template_info;
};

struct cfg_pattern_topic
{
std::string regex;
std::string topic;
int32_t partition;

cfg_pattern_topic() = default;
};

/** Output configuration base structure */
struct cfg_output {
/** Plugin identification */
Expand All @@ -83,10 +92,8 @@ struct cfg_output {
struct cfg_kafka : cfg_output {
/// Comma separated list of IP[:Port]
std::string brokers;
/// Produced topic
std::string topic;
/// Partition to which data should be send
int32_t partition;
/// Message regex pattern to determine output topic
std::vector<cfg_pattern_topic> pattern_topics;
/// Broker version fallback (empty or X.X.X.X)
std::string broker_fallback;
/// Block conversion if sender buffer is full
Expand All @@ -108,6 +115,7 @@ class Config {
void default_set();
void parse_kafka(fds_xml_ctx_t *kafka);
void parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property);
void parse_kafka_pattern(struct cfg_kafka &kafka, fds_xml_ctx_t *pattern);
void parse_outputs(fds_xml_ctx_t *outputs);
void parse_params(fds_xml_ctx_t *params);

Expand Down
52 changes: 41 additions & 11 deletions src/plugins/output/json-kafka/src/Kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

#include "Config.hpp"
#include "Kafka.hpp"
#include "pattern-matching/PatternMatching.hh"

#include <pthread.h>
#include <stdexcept>

Expand All @@ -55,7 +57,7 @@
* \param[in] ctx Instance context
*/
Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx)
: Output(cfg.name, ctx), m_partition(cfg.partition)
: Output(cfg.name, ctx)
{
IPX_CTX_DEBUG(_ctx, "Initialization of Kafka connector in progress...", '\0');
IPX_CTX_INFO(_ctx, "The plugin was built against librdkafka %X, now using %X",
Expand Down Expand Up @@ -107,14 +109,7 @@ Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx)
}
kafka_cfg.release(); // Ownership has been successfully passed to the kafka

// Create the topic
m_topic.reset(rd_kafka_topic_new(m_kafka.get(), cfg.topic.c_str(), nullptr));
if (!m_topic) {
rd_kafka_resp_err_t err_code = rd_kafka_last_error();
const char *err_msg = rd_kafka_err2str(err_code);
throw std::runtime_error("rd_kafka_topic_new() failed: " + std::string(err_msg));
}

handle_pattern_topics(cfg.pattern_topics);
// Start poller thread
m_thread->stop = false;
m_thread->ctx = ctx;
Expand All @@ -126,6 +121,35 @@ Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx)
IPX_CTX_DEBUG(_ctx, "Kafka connector successfully initialized!", '\0')
}

/**
* \brief Handles input pattern topics. At first create Kafka topic respectively and then register
* the regex pattern to the pattern matcher with topic id as a pattern id.
* \param[in] pattern_topics Pattern topics configuration.
*/
void
Kafka::handle_pattern_topics(const std::vector<cfg_pattern_topic> &pattern_topics)
{
const auto &callback_id = pattern_matcher.register_callback(pattern_matching_callback);

for (const auto &pattern: pattern_topics)
{
m_pattern_topics.emplace_back(nullptr, &rd_kafka_topic_destroy);
m_pattern_topics.back().reset(rd_kafka_topic_new(m_kafka.get(), pattern.topic.c_str(), nullptr));

m_topics_partition.emplace_back(pattern.partition);

if (!m_pattern_topics.back()) {
rd_kafka_resp_err_t err_code = rd_kafka_last_error();
const char *err_msg = rd_kafka_err2str(err_code);
throw std::runtime_error("rd_kafka_topic_new() failed: " + std::string(err_msg));
}

pattern_matcher.register_pattern(R"(/)" + pattern.regex + R"(/sa)", m_pattern_topics.size() - 1, callback_id);
}

pattern_matcher.update_database();
}

/** Destructor */
Kafka::~Kafka()
{
Expand All @@ -146,7 +170,9 @@ Kafka::~Kafka()
}

// Destruction must be called in this order!
m_topic.reset(nullptr);
for (auto &topic: m_pattern_topics)
topic.reset(nullptr);

m_kafka.reset(nullptr);

IPX_CTX_DEBUG(_ctx, "Destruction of Kafka connector completed!", '\0');
Expand All @@ -161,7 +187,11 @@ Kafka::~Kafka()
int
Kafka::process(const char *str, size_t len)
{
int rc = rd_kafka_produce(m_topic.get(), m_partition, m_produce_flags,
PatternMatchingUserData user_data{};

pattern_matcher.match_pattern(str, len, user_data);

int rc = rd_kafka_produce(m_pattern_topics[user_data.pattern_id].get(), m_topics_partition[user_data.pattern_id], m_produce_flags,
// Payload and length (without tailing new-line character)
reinterpret_cast<void *>(const_cast<char *>(str)), len - 1,
NULL, 0, // Optional key and its length
Expand Down
33 changes: 29 additions & 4 deletions src/plugins/output/json-kafka/src/Kafka.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
#define JSON_KAFKA_H

#include "Storage.hpp"
#include "pattern-matching/PatternMatching.hh"

#include <atomic>
#include <ctime>
#include <memory>
#include <vector>
#include <librdkafka/rdkafka.h>

/** JSON kafka connector */
Expand All @@ -66,6 +68,29 @@ class Kafka : public Output {
using uniq_config = std::unique_ptr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy)>;
using map_params = std::map<std::string, std::string>;

struct PatternMatchingUserData
{
std::size_t pattern_id;

PatternMatchingUserData() : pattern_id(0)
{}
};

PatternMatching::PatternMatching<PatternMatchingUserData> pattern_matcher;

static std::size_t pattern_matching_callback(const std::size_t pattern_id,
const std::size_t,
const std::size_t,
PatternMatchingUserData &user_data)
{
user_data.pattern_id = pattern_id;

static constexpr std::size_t DoStopPatternMatching = 1;
return DoStopPatternMatching;
}

void handle_pattern_topics(const std::vector<cfg_pattern_topic> &);

/// Poller timeout for events (milliseconds)
static constexpr int POLLER_TIMEOUT = 100;
/// Flush timeout before shutdown of the connector (milliseconds)
Expand All @@ -88,10 +113,10 @@ class Kafka : public Output {
map_params m_params;
/// Kafka object
uniq_kafka m_kafka = {nullptr, &rd_kafka_destroy};
/// Topic object
uniq_topic m_topic = {nullptr, &rd_kafka_topic_destroy};
/// Producer partition
int32_t m_partition;
/// Topics of patterns
std::vector<uniq_topic> m_pattern_topics;
/// Topics partition ot produce
std::vector<int32_t> m_topics_partition;
/// Producer flags
int m_produce_flags;
/// Polling thread
Expand Down
Loading