-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
141 lines (95 loc) · 3.72 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
import pandas as pd
from warnings import filterwarnings
# NOTE: the filter is installed here, between imports, so the original import
# order around it is preserved on purpose.
filterwarnings("ignore")
import logging
from typing import List
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
# from . import models, schemas, crud
import models
import schemas
import crud
import database
import tags
import rules
import mongo
from bson import ObjectId
import pymongo
# Text passed to FastAPI as the API description.
description = "Fraud Analyzer and report generation."

# ----- Application object -----
# Route decorators further down in this module register handlers on it.
analyzer = FastAPI(
    version="0.1.1",
    title="Fraud Analyzer",
    summary="Fraud Analyzer",
    description=description,
    openapi_tags=tags.tags_metadata,
)
# Database dependency (pymongo)
conn_url = 'mongodb://localhost:27017/'


def get_mongo_client() -> Optional[pymongo.MongoClient]:
    """Open a MongoDB client for ``conn_url`` and verify it with a ping.

    Returns:
        The ``MongoClient`` when the ``ping`` admin command succeeds,
        otherwise ``None``. On failure the client is closed first and the
        error is logged rather than silently discarded.
    """
    client = MongoClient(conn_url)
    try:
        client.admin.command("ping")
    except Exception:
        # Best-effort contract kept from the original: callers receive None
        # instead of an exception, but the cause is now visible in the logs.
        logging.getLogger(__name__).exception("MongoDB ping failed")
        client.close()
        return None
    return client
##### Shared database handle #
# NOTE: this rebinds the module-level name ``database`` to ``mongo.mongo_db``,
# shadowing the ``database`` module imported at the top of the file. The
# ``/companies`` listing route indexes this object by collection name.
database = mongo.mongo_db
##### Getting from real database
@analyzer.post("/companies/create-company", response_model=schemas.Company)
def create_company(company: schemas.Company):
    """Hand the submitted company to ``crud.create_company_mongo``.

    Whatever that call returns is sent back as the response, serialized
    through ``schemas.Company``.
    """
    return crud.create_company_mongo(company)
@analyzer.get("/companies", response_model=List[schemas.Company])
async def get_all_companies(limit: int):
    """Return documents from the ``company`` collection, capped by ``limit``.

    Args:
        limit: Value passed straight to the cursor's ``limit()``.
            NOTE(review): with pymongo a limit of 0 means "no limit" --
            confirm whether callers should be allowed to request that.

    Returns:
        A list of the matching documents.
    """
    def _fetch_companies():
        return list(database["company"].find().limit(limit))

    # The cursor is consumed synchronously, so the query runs in a worker
    # thread; calling it directly here would block the event loop for the
    # duration of the database round trip.
    return await run_in_threadpool(_fetch_companies)
@analyzer.get("/companies/{id}", response_model=schemas.Company)
def get_company(id: str):
    """Look up a single company by its ID.

    Args:
        id: Company identifier taken from the URL path.

    Returns:
        The company returned by ``crud.find_company_by_id``.

    Raises:
        HTTPException: 404 when the lookup yields nothing. Without this
            check the empty result would fail response-model validation
            and surface as an internal server error.
    """
    company = crud.find_company_by_id(id)
    if not company:
        raise create_http_exception("Company not found", status_code=404)
    return company
@analyzer.put("/companies/{id}", response_model=schemas.Company)
def update_company_route(id: str, updated_company: schemas.Company):
    """Forward an update for company ``id`` to ``crud.update_company``.

    The value returned by that call becomes the response body, serialized
    through ``schemas.Company``.
    """
    return crud.update_company(id, updated_company)
@analyzer.post("/analyze/{authorization}", response_description="Add new transaction")
async def analyze_transaction(
    payload: schemas.CustomerTransaction,
    authorization: str,
):
    """Store a transaction and score it against the company's rules.

    Args:
        payload: The transaction to analyze.
        authorization: Company ID, resolved with ``crud.find_company_by_id``.

    Returns:
        A dict with ``risk_score`` (the mean of the per-rule scores, or ``0``
        when the matched configuration has no rules) and ``results`` (one
        ``{"rule": ..., "score": ...}`` entry per rule).

    Raises:
        HTTPException: 400 when the company, the customer tier, or the
            transaction type cannot be resolved.
    """
    # NOTE(review): the crud / mongo calls below are not awaited, so they
    # look synchronous and would block the event loop -- consider moving
    # them to a threadpool once the helpers are confirmed thread-safe.
    company = crud.find_company_by_id(authorization)
    if not company:
        raise create_http_exception("invalid company ID")

    customer_tier = company.configuration.get_tier(payload.customer_tier)
    print(f'Customer tier: {customer_tier}')
    if not customer_tier:
        raise create_http_exception("Invalid customer tier")

    transaction_type = payload.type.upper()
    trans_type_config = customer_tier.get_trans_type_config(transaction_type)
    if not trans_type_config:
        raise create_http_exception("Invalid transaction type")

    input_df = pd.DataFrame({
        "hash": [payload.hash],
        "type": [transaction_type],
        "amount": [payload.amount],
        "customer_tier": [payload.customer_tier],
        "customer_unique_id": [payload.customer_unique_id],
        "transaction_time": [payload.transaction_time],
        'company_id': [payload.company_id],
    })

    # The transaction is stored before the rules run; this order is kept
    # from the original. NOTE(review): presumably the rules may read the
    # transactions collection -- confirm before reordering.
    transactions = mongo.mongo_db.transactions
    records = rules.attach_day_and_hour(input_df, "transaction_time").to_dict(orient="records")
    transactions.insert_many(records)

    results = [
        {"rule": rule, "score": rules.apply_rule(rule, payload, input_df, company)}
        for rule in trans_type_config.rules
    ]
    print(results)

    # An empty rule list previously caused a ZeroDivisionError here; with
    # nothing to evaluate the score is reported as 0 and ``results`` is empty.
    if not results:
        return {"risk_score": 0, "results": results}

    risk_score = sum(entry["score"] for entry in results) / len(results)
    return {"risk_score": risk_score, "results": results}
def create_http_exception(msg: str, status_code: int = 400):
    """Build an ``HTTPException`` carrying ``msg``; the caller raises it.

    Args:
        msg: Text placed under the ``"message"`` key of the error detail.
        status_code: HTTP status for the error; 400 unless overridden.

    Returns:
        The constructed ``HTTPException`` (not raised here).
    """
    error_detail = {"message": msg}
    response_headers = {"content-type": "application/json"}
    return HTTPException(
        status_code=status_code,
        detail=error_detail,
        headers=response_headers,
    )