# ----------------------------------------------------------------------------
# http-hotspot-user-access.conf
# enrich-http-logstash-ELK
# Jesus Mañogil - jmanogil-coding
input {
  http {
    # identify this input, useful in logstash logs
    id => "hotspot_user_access_http_plugin"
    # port opened in the VirtualBox network settings
    port => "3333"
    # basic authentication credentials
    user => "http"
    password => "http"
  }
}
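# A quick way to exercise this input is a request whose path matches the grok
# pattern in the filter below. This is only a sketch: the ids, MAC address and
# brand are hypothetical values.
#
#   curl -u http:http "http://localhost:3333/hotspot/12?user=345&mac=00:11:22:33:44:55&brand=samsung"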
filter {
  # parse the received data
  grok {
    match => {
      "[headers][request_path]" => "%{WORD:api_name}/%{NUMBER:hotspot_id}\?user=%{NUMBER:user_id}&mac=%{MAC:user_mac}&brand=%{WORD:user_mobile_manufacturer}"
    }
  }
  # drop events with grok parse errors or requests to a different api
  if "_grokparsefailure" in [tags] or [api_name] != "hotspot" {
    drop { }
  }
  # both ids arrive as strings, so convert them to integers
  mutate {
    convert => {
      "hotspot_id" => "integer"
      "user_id" => "integer"
    }
  }
  # connection to MySQL Database
  jdbc_streaming {
    jdbc_driver_library => "/etc/logstash/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mybusiness"
    jdbc_user => "logstash"
    jdbc_password => "logstash"
    statement => "(select
                     email as user_email,
                     date_format(birthday, '%Y-%m-%d') as user_birthday,
                     name as user_name,
                     city as user_city,
                     country as user_country,
                     null as hotspot_desc,
                     null as hotspot_latitude,
                     null as hotspot_longitude,
                     null as hotspot_ip
                   from user
                   where id = :user)
                  union
                  (select
                     null as user_email,
                     null as user_birthday,
                     null as user_name,
                     null as user_city,
                     null as user_country,
                     description as hotspot_desc,
                     st_y(location) as hotspot_latitude,
                     st_x(location) as hotspot_longitude,
                     ip as hotspot_ip
                   from hotspot
                   where id = :hotspot)"
    parameters => {
      "hotspot" => "hotspot_id"
      "user" => "user_id"
    }
    target => "myData"
  }
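  # The query above assumes a schema roughly like the following. This is only a
  # sketch inferred from the statement; column types are guesses, and st_x()/st_y()
  # require location to be a spatial POINT column:
  #
  #   create table user (
  #     id int primary key, email varchar(255), birthday date,
  #     name varchar(255), city varchar(255), country varchar(255)
  #   );
  #   create table hotspot (
  #     id int primary key, description varchar(255), location point, ip varchar(45)
  #   );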
  # the user row is expected at [myData][0] and the hotspot row at [myData][1];
  # both the user and the hotspot have to exist
  if ![myData][1][hotspot_ip] or ![myData][0][user_email] or "_jdbcstreamingfailure" in [tags] {
    drop { }
  }
  # prepare the fields to be indexed
  mutate {
    rename => {"[myData][0][user_email]" => "user_email"}
    rename => {"[myData][0][user_birthday]" => "user_birthday"}
    rename => {"[myData][0][user_name]" => "user_name"}
    rename => {"[myData][0][user_city]" => "user_city"}
    rename => {"[myData][0][user_country]" => "user_country"}
    rename => {"[myData][1][hotspot_desc]" => "hotspot_desc"}
    rename => {"[myData][1][hotspot_ip]" => "hotspot_ip"}
    # elasticsearch expects a geo_point, which accepts a "lat, lon" string
    add_field => {
      "hotspot_location" => "%{[myData][1][hotspot_latitude]}, %{[myData][1][hotspot_longitude]}"
    }
    # no longer needed
    remove_field => ["host", "headers", "message", "@version", "api_name", "myData"]
  }
}
output {
  # elasticsearch cluster
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "hotspot-user-access-%{+YYYY.MM.dd}"
    manage_template => false
  }
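  # With manage_template => false, logstash does not install an index template,
  # so the cluster must already map hotspot_location as geo_point. A minimal
  # sketch, assuming Elasticsearch 7.8+ and a hypothetical template name:
  #
  #   curl -X PUT "http://localhost:9200/_index_template/hotspot-user-access" \
  #     -H 'Content-Type: application/json' -d '
  #   {
  #     "index_patterns": ["hotspot-user-access-*"],
  #     "template": {
  #       "mappings": {
  #         "properties": { "hotspot_location": { "type": "geo_point" } }
  #       }
  #     }
  #   }'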
  # just to test everything works
  # stdout {}
}
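# Once events flow, a quick sanity check against the daily indices:
#   curl "http://localhost:9200/hotspot-user-access-*/_search?pretty&size=1"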
# ----------------------------------------------------------------------------