CODE FLOW FOR STREAMING LAYER - LOGIC PDF
=========================================
I) CODE SETUP AND EXECUTION STEPS
==============================
1. Log into the EC2 instance and start the Cloudera instance.
2. Open a PuTTY session as ec2-user, then switch to the root user (sudo -i).
3. Create a fresh project folder in Linux (this removes any existing one):
rm -rf creditcardproject
mkdir creditcardproject
cd creditcardproject
4. Use WinSCP to copy python.zip into the creditcardproject folder, unzip it, then cd python/src
5. Update the hostname in dao.py to the name of your EC2 server instance:
vi db/dao.py    (set the hostname to your EC2 instance, e.g. ec2-54-236-48-79.compute-1.amazonaws.com)
6. Start the Thrift server
Check whether the Thrift server is already running: jps
Start the Thrift server: /opt/cloudera/parcels/CDH/lib/hbase/bin/hbase-daemon.sh start thrift -p 9090
7. Run the application with the following commands:
export SPARK_KAFKA_VERSION=0.10
spark2-submit --py-files src.zip --jars spark-sql-kafka-0-10_2.11-2.3.0.jar --files uszipsv.csv driver.py
II) CODING LOGIC EXPLAINED.
=======================
A ) Code structure and brief explanation of code files.
===================================================
python/
----src/
--------uszipsv.csv (Contains the latitude/longitude values for all postcodes. Used by geo_map.py.)
--------spark-sql-kafka-0-10_2.11-2.3.0.jar
--------driver.py (Python file that contains the Spark application and starts the code flow.)
--------rules/
------------rules.py (Python file that contains class CC_Rules. It has methods to calculate the rule inputs: UCL, score, transaction distance and transaction time.
             The class also has a method to validate the transaction.
             At the end of transaction validation, it appends the new transaction to the "card_transaction_master" HBase table and updates the "card_transaction_lookup" HBase table with the transaction_date and postcode of the last transaction.)
--------db/
------------dao.py (Python file that contains class HBase.dao, which has methods to read and write data to the HBase DB.)
------------geo_map.py (Python file that contains class Geo_Map, which returns latitude and longitude based on postcodes.
             I have created my own user-defined function (get_DistanceViaPosId), which takes two postcodes and returns the distance between them.)
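As an illustration of how a postcode-to-postcode distance can be computed, here is a minimal sketch using the haversine (great-circle) formula. Whether get_DistanceViaPosId uses this exact formula is an assumption. The names distance_km, distance_between_postcodes and SAMPLE_COORDS are hypothetical, and the two coordinate pairs are approximate illustrative values standing in for rows of uszipsv.csv.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

# ASSUMPTION: approximate, illustrative coordinates standing in for uszipsv.csv rows.
SAMPLE_COORDS = {
    "10001": (40.7506, -73.9972),   # New York, NY (approximate)
    "94105": (37.7898, -122.3942),  # San Francisco, CA (approximate)
}


def distance_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    # Clamp to 1.0 to guard against tiny floating-point overshoot before asin.
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(min(1.0, a)))


def distance_between_postcodes(postcode_a, postcode_b, coords=SAMPLE_COORDS):
    """Look up both postcodes (hypothetical mapping) and return their distance in km."""
    lat1, lon1 = coords[postcode_a]
    lat2, lon2 = coords[postcode_b]
    return distance_km(lat1, lon1, lat2, lon2)


if __name__ == "__main__":
    print(round(distance_between_postcodes("10001", "94105"), 1), "km")
```

The lookup is passed in as a parameter so the same function can be exercised with a small test mapping or with the full postcode file loaded into a dictionary.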
B) Code Logic and Flow
=======================
1. driver.py contains the Spark application ("CreditCardFraudDetection"), a Spark streaming application that ingests data, validates rules, and updates repositories.
2. The Spark application ("CreditCardFraudDetection") connects to Kafka at the following coordinates:
"kafka.bootstrap.servers","18.211.252.152:9092"
Topic : "transactions-topic-verified"
3. The Spark application ("CreditCardFraudDetection") uses Spark Structured Streaming to read a stream from the above Kafka topic into a DataFrame, using a pre-defined schema ("creditcardSchema").
4. The Spark application ("CreditCardFraudDetection") passes each credit card transaction to a user-defined function (apply_rules). The output is the status of the transaction (GENUINE/FRAUD), which is appended to the streaming DataFrame as a column.
The apply_rules user-defined function in driver.py invokes the "isTransactionValid" method on class CC_Rules. It passes the card transaction data, and the method returns the status of the transaction after performing the following actions:
4.1 Pulls the UCL, score, postcode and transaction_dt from the "card_transaction_lookup" HBase table for this transaction's card ID.
4.2 Calculates the time difference between the present transaction's date_time and the lookup record's date_time.
4.3 Calculates the distance between the present transaction and the lookup transaction by taking both postcodes and looking up their latitudes and longitudes in the Geo_Map class.
4.4 Implements the "Zip Code Distance" validation using the isSpeedValid method. The method takes the outputs of steps 4.2 and 4.3 as inputs and returns a boolean value.
4.5 Implements the "UCL" validation using the isUCLValid method. The method checks that the transaction amount does not exceed the UCL value for the card ID in the lookup table. The method returns a boolean value.
4.6 Implements the "Score" validation using the isScoreValid method. The method checks that the score value for the card in the lookup table is not below the threshold value of 200. The method returns a boolean value.
4.7 The results of steps 4.4, 4.5 and 4.6 are aggregated and the transaction status is set to either "GENUINE" or "FRAUD".
4.8 The transaction data, along with the calculated status, is appended to the "card_transaction_master" HBase table.
4.9 The "card_transaction_lookup" HBase table is updated with the new postcode and transaction_dt values only if the transaction is deemed "GENUINE".
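The validation flow in steps 4.1 to 4.9 can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code in rules.py: a plain dictionary and a list stand in for the two HBase tables, the function names are hypothetical Python-style counterparts of isSpeedValid, isUCLValid and isScoreValid, the speed limit is an assumed value, and the sketch assumes all three checks must pass for a transaction to be GENUINE (the text only says the results are aggregated).

```python
from datetime import datetime

SCORE_THRESHOLD = 200        # threshold stated in step 4.6
MAX_SPEED_KM_PER_SEC = 0.25  # ASSUMPTION: hypothetical limit, not taken from the project


def is_ucl_valid(amount, ucl):
    """Step 4.5: the amount must not exceed the card's UCL."""
    return amount <= ucl


def is_score_valid(score, threshold=SCORE_THRESHOLD):
    """Step 4.6: the card's score must not be below the threshold."""
    return score >= threshold


def is_speed_valid(distance_km, seconds, max_speed=MAX_SPEED_KM_PER_SEC):
    """Step 4.4: the implied travel speed must be plausible."""
    if seconds <= 0:
        # ASSUMPTION: with no elapsed time, only a zero distance is accepted.
        return distance_km == 0
    return distance_km / seconds <= max_speed


def validate_transaction(txn, lookup, master, distance_fn):
    """Steps 4.1 to 4.9 with in-memory stand-ins for the HBase tables."""
    record = lookup[txn["card_id"]]                                    # 4.1
    seconds = (txn["transaction_dt"]
               - record["transaction_dt"]).total_seconds()             # 4.2
    distance = distance_fn(txn["postcode"], record["postcode"])        # 4.3
    genuine = (is_speed_valid(distance, seconds)                       # 4.4
               and is_ucl_valid(txn["amount"], record["ucl"])          # 4.5
               and is_score_valid(record["score"]))                    # 4.6
    status = "GENUINE" if genuine else "FRAUD"                         # 4.7
    master.append(dict(txn, status=status))                            # 4.8
    if genuine:                                                        # 4.9
        record["postcode"] = txn["postcode"]
        record["transaction_dt"] = txn["transaction_dt"]
    return status


if __name__ == "__main__":
    lookup = {"card-1": {"ucl": 1000.0, "score": 250, "postcode": "A",
                         "transaction_dt": datetime(2020, 1, 1, 10, 0, 0)}}
    master = []
    # Hypothetical distance function: 0 km for the same postcode, else 3000 km.
    dist = lambda a, b: 0.0 if a == b else 3000.0
    txn = {"card_id": "card-1", "amount": 50.0, "postcode": "A",
           "transaction_dt": datetime(2020, 1, 1, 11, 0, 0)}
    print(validate_transaction(txn, lookup, master, dist))
```

Passing the lookup, the master store and the distance function in as arguments keeps the rule logic testable without a running HBase or Thrift server, which is in the same spirit as the standalone rules test in the appendix.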
APPENDIX
=========
COMMON LINUX COMMANDS
cd creditcardproject/python/src/rules/
cd ../../../../..
rm -rf src.zip
zip src.zip __init__.py rules/* db/*
zip *
TEST THE RULES STANDALONE (RUN THIS FROM THE SRC FOLDER)
python -m rules.testrules
HBASE COMMANDS
hbase shell
list
count 'card_transactions_master'
get 'card_transactions_master', '340028465709212'
get 'card_transaction_lookup', '340028465709212'