Merge pull request #72 from shiva-rakshith/audit-indexer
Audit Indexer Job
maheshkumargangula authored Feb 1, 2022
2 parents fa000bb + 23e760b commit 3fe1dca
Showing 11 changed files with 451 additions and 3 deletions.
144 changes: 144 additions & 0 deletions hcx-pipeline-jobs/audit-indexer/pom.xml
@@ -0,0 +1,144 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pipeline-jobs</artifactId>
<groupId>org.swasth</groupId>
<version>1.0</version>
</parent>

<groupId>org.swasth</groupId>
<artifactId>audit-indexer</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>

<repositories>
<repository>
<id>my-local-repo</id>
<url>file://${user.home}/.m2/repository</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.swasth</groupId>
<artifactId>core</artifactId>
<version>1.0.0</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.7</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>

<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.target.runtime}</source>
<target>${java.target.runtime}</target>
</configuration>
</plugin>

<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.swasth.dp.auditindexer.task.AuditIndexerStreamTask</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,80 @@
package org.swasth.dp.auditindexer.functions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swasth.dp.auditindexer.task.AuditIndexerConfig;
import org.swasth.dp.core.job.Metrics;
import org.swasth.dp.core.util.ElasticSearchUtil;
import org.swasth.dp.core.util.JSONUtil;

import java.io.IOException;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;

public class AuditIndexerProcessFunction extends ProcessFunction<Map<String,Object>,Metrics> {

private Logger logger = LoggerFactory.getLogger(AuditIndexerProcessFunction.class);
private AuditIndexerConfig config;
private ElasticSearchUtil esUtil;

public AuditIndexerProcessFunction(AuditIndexerConfig config) {
this.config = config;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
esUtil = new ElasticSearchUtil(config.esUrl, config.auditIndex, config.batchSize);
}

@Override
public void close() throws Exception {
super.close();
esUtil.close();
}

@Override
public void processElement(Map<String, Object> event, ProcessFunction<Map<String, Object>, Metrics>.Context context, Collector<Metrics> collector) throws Exception {
try {
String indexName = getIndexName((Long) event.get("auditTimeStamp"));
String apiCallId = (String) event.get("x-hcx-api_call_id");
createIndex(indexName);
esUtil.addDocumentWithIndex(JSONUtil.serialize(event), indexName, apiCallId);
System.out.println("Audit record created for " + apiCallId);
logger.info("Audit record created for " + apiCallId);
//TODO: add metrics
}
catch (IOException e) {
logger.error("Error while indexing message :: " + event + " :: " + e.getMessage());
throw e;
}
catch (Exception e) {
logger.error("Error while processing message :: " + event + " :: " + e.getMessage());
throw e;
}
}

private String getIndexName(long ets){
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone(config.timeZone));
cal.setTime(new Date(ets));
return config.auditIndex + "_" + cal.get(Calendar.YEAR) + "_" + cal.get(Calendar.WEEK_OF_YEAR);
}

private void createIndex(String indexName){
String settings = "{ \"index\": { } }";
if (settings == null) {
logger.error("Failed to load index settings");
}
String mappings = "{ \"properties\": { \"x-hcx-sender_code\": { \"type\": \"keyword\" }, \"x-hcx-recipient_code\": { \"type\": \"keyword\" }, \"x-hcx-api_call_id\": { \"type\": \"keyword\" }, \"x-hcx-correlation_id\": { \"type\": \"keyword\" }, \"x-hcx-workflow_id\": { \"type\": \"keyword\" }, \"x-hcx-timestamp\": { \"type\": \"date\", \"format\":\"date_time\" }, \"mid\": { \"type\": \"keyword\" }, \"action\": { \"type\": \"keyword\" }, \"status\": { \"type\": \"keyword\" }, \"auditTimeStamp\": { \"type\": \"keyword\" }, \"requestTimeStamp\": { \"type\": \"keyword\" }, \"updatedTimestamp\": { \"type\": \"keyword\" }, \"error_details\": { \"type\": \"object\" }, \"debug_details\": { \"type\": \"object\" } } }";
if (mappings == null) {
logger.error("Failed to load mappings for index with name '{}'", config.auditAlias);
}
esUtil.addIndex(settings, mappings, indexName, config.auditAlias);
}

}
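Note on the index naming above: getIndexName() buckets audit records into one Elasticsearch index per calendar week, named <auditIndex>_<year>_<weekOfYear> in the configured time zone. Below is a minimal standalone sketch of that scheme, assuming the same Calendar-based logic; the class name and the sample timestamp are illustrative, not part of the commit.

import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

// Sketch of the weekly index-name scheme: <auditIndex>_<year>_<weekOfYear>,
// evaluated in the configured time zone.
public class IndexNameSketch {

    static String indexName(String auditIndex, long ets, String timeZone) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone(timeZone));
        cal.setTime(new Date(ets));
        return auditIndex + "_" + cal.get(Calendar.YEAR) + "_" + cal.get(Calendar.WEEK_OF_YEAR);
    }

    public static void main(String[] args) {
        long auditTimeStamp = 1643673600000L; // 2022-02-01T00:00:00Z as epoch millis (sample value)
        // Prints something like "hcx_audit_2022_6"; the exact week number depends on
        // the JVM default locale's week-of-year rules.
        System.out.println(indexName("hcx_audit", auditTimeStamp, "IST"));
    }
}
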
@@ -0,0 +1,39 @@
package org.swasth.dp.auditindexer.task;

import com.typesafe.config.Config;
import org.swasth.dp.core.job.BaseJobConfig;

public class AuditIndexerConfig extends BaseJobConfig {

private Config config;

// kafka
public String kafkaInputTopic;

// Consumers
public String auditIndexerConsumer = "audit-indexer-consumer";
public int consumerParallelism;
public int parallelism;

// Elastic Search Config
public String esUrl;
public String timeZone;
public String auditIndex = "hcx_audit";
public String auditAlias = "hcx_audit";
public int batchSize = 1000;

public AuditIndexerConfig(Config config, String jobName) {
super(config, jobName);
this.config = config;
initValues();
}

private void initValues(){
kafkaInputTopic = config.getString("kafka.input.topic");
consumerParallelism = config.getInt("task.consumer.parallelism");
parallelism = config.getInt("task.parallelism");
esUrl = config.getString("es.basePath");
timeZone = config.hasPath("timezone") ? config.getString("timezone") : "IST";
}

}
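For reference, initValues() above reads the keys kafka.input.topic, task.consumer.parallelism, task.parallelism, es.basePath and (optionally) timezone. The following self-contained sketch exercises those lookups against an in-memory HOCON snippet; the values are placeholders mirroring base-config.conf and audit-indexer.conf, not authoritative defaults.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// Exercises the same Typesafe Config keys that AuditIndexerConfig.initValues() reads.
public class AuditIndexerConfigKeysSketch {

    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
                "kafka.input.topic = \"dev.hcx.audit\"\n" +
                "task.consumer.parallelism = 1\n" +
                "task.parallelism = 1\n" +
                "es.basePath = \"localhost:9200\"\n" +
                "timezone = \"IST\"\n");

        String kafkaInputTopic = config.getString("kafka.input.topic");
        int consumerParallelism = config.getInt("task.consumer.parallelism");
        int parallelism = config.getInt("task.parallelism");
        String esUrl = config.getString("es.basePath");
        String timeZone = config.hasPath("timezone") ? config.getString("timezone") : "IST";

        System.out.println(kafkaInputTopic + " -> " + esUrl + " (" + timeZone
                + "), parallelism=" + parallelism + "/" + consumerParallelism);
    }
}
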
@@ -0,0 +1,59 @@
package org.swasth.dp.auditindexer.task;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swasth.dp.auditindexer.functions.AuditIndexerProcessFunction;
import org.swasth.dp.core.job.BaseJobConfig;
import org.swasth.dp.core.job.FlinkKafkaConnector;
import org.swasth.dp.core.util.FlinkUtil;
import scala.Option;
import scala.Some;

import java.io.File;
import java.util.Map;


public class AuditIndexerStreamTask {

private final static Logger logger = LoggerFactory.getLogger(AuditIndexerStreamTask.class);
private AuditIndexerConfig config;
private FlinkKafkaConnector kafkaConnector;

public AuditIndexerStreamTask(AuditIndexerConfig config, FlinkKafkaConnector kafkaConnector){
this.config = config;
this.kafkaConnector = kafkaConnector;
}

public static void main(String[] args) throws Exception {
Option<String> configFilePath = new Some<String>(ParameterTool.fromArgs(args).get("config.file.path"));
Config conf = configFilePath.map(path -> ConfigFactory.parseFile(new File(path)).resolve())
.getOrElse(() -> ConfigFactory.load("resources/audit-indexer.conf").withFallback(ConfigFactory.systemEnvironment()));
AuditIndexerConfig config = new AuditIndexerConfig(conf,"AuditIndexer-Job");
FlinkKafkaConnector kafkaConnector = new FlinkKafkaConnector(config);
AuditIndexerStreamTask auditIndexerStreamTask = new AuditIndexerStreamTask(config, kafkaConnector);
try {
auditIndexerStreamTask.process(config);
} catch (Exception e) {
logger.error("Error while processing audit indexer stream job: " + e.getMessage());
throw e;
}
}

private void process(BaseJobConfig baseJobConfig) throws Exception {
StreamExecutionEnvironment env = FlinkUtil.getExecutionContext(baseJobConfig);
SourceFunction<Map<String,Object>> kafkaConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic);

env.addSource(kafkaConsumer, config.auditIndexerConsumer)
.uid(config.auditIndexerConsumer).setParallelism(config.consumerParallelism)
.rebalance()
.process(new AuditIndexerProcessFunction(config)).setParallelism(config.parallelism);

System.out.println(config.jobName() + " is processing");
env.execute(config.jobName());
}
}
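The process() method above wires a simple linear topology: Kafka source, rebalance, then the indexing ProcessFunction. Below is a rough local sketch of the same wiring with a one-element collection source standing in for the Kafka consumer; the class name, sample event and print sink are illustrative, not part of the commit.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Local sketch of the source -> rebalance -> process wiring used by the stream task.
public class AuditTopologySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, Object> sampleEvent = new HashMap<>();
        sampleEvent.put("x-hcx-api_call_id", "sample-api-call-id");
        sampleEvent.put("auditTimeStamp", 1643673600000L);

        env.fromCollection(Collections.singletonList(sampleEvent))
                .rebalance()
                .process(new ProcessFunction<Map<String, Object>, String>() {
                    @Override
                    public void processElement(Map<String, Object> event, Context context, Collector<String> collector) {
                        // The real job indexes the event into Elasticsearch at this step.
                        collector.collect("audit event for " + event.get("x-hcx-api_call_id"));
                    }
                })
                .print();

        env.execute("audit-topology-sketch");
    }
}
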
@@ -0,0 +1,13 @@
include required("base-config.conf")

kafka {
input.topic = ${job.env}".hcx.audit"
groupId = ${job.env}"-audit-group"
}

task {
consumer.parallelism = 1
parallelism = 1
}

timezone = "IST"
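The topic and consumer group above are built with HOCON value concatenation against the ${job.env} substitution supplied by base-config.conf. A small sketch of how that resolves, using "dev" as a placeholder environment name (not a value taken from this repository):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// Demonstrates HOCON substitution plus string concatenation, as used in audit-indexer.conf.
public class TopicNameSketch {

    public static void main(String[] args) {
        Config conf = ConfigFactory.parseString(
                "job.env = dev\n" +
                "kafka.input.topic = ${job.env}\".hcx.audit\"\n" +
                "kafka.groupId = ${job.env}\"-audit-group\"\n").resolve();

        System.out.println(conf.getString("kafka.input.topic")); // dev.hcx.audit
        System.out.println(conf.getString("kafka.groupId"));     // dev-audit-group
    }
}
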
7 changes: 6 additions & 1 deletion hcx-pipeline-jobs/core/pom.xml
@@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
@@ -139,6 +139,11 @@
<artifactId>keycloak-admin-client</artifactId>
<version>14.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
</dependencies>

<build>
4 changes: 4 additions & 0 deletions hcx-pipeline-jobs/core/src/main/resources/base-config.conf
@@ -59,6 +59,10 @@ lms-cassandra {
port = "9042"
}

es {
basePath = "localhost:9200"
}

registry {
endPointUrl = "http://a2aab11985f46432cb2905d56b8b923f-628388358.ap-south-1.elb.amazonaws.com:8081/api/v1/Organisation/search"
}
@@ -52,10 +52,10 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl
val enrichedOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("enriched-events")

val retryOutputTag: OutputTag[String] = OutputTag[String]("retry-events")
val retryTopic = config.getString("kafka.retry.topic")
val retryTopic = if (config.hasPath("kafka.retry.topic")) config.getString("kafka.retry.topic") else ""

val auditOutputTag: OutputTag[String] = OutputTag[String]("audit-events")
val auditTopic = config.getString("kafka.audit.topic")
val auditTopic = if (config.hasPath("kafka.audit.topic")) config.getString("kafka.audit.topic") else ""

// Producers
val retryProducer = "retry-events-sink"
