forked from GreatLakesEnergy/Mysql-to-influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysql2influx.py
175 lines (136 loc) · 6.13 KB
/
mysql2influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/python
import logging
import os
import argparse
import MySQLdb
import MySQLdb.cursors
import time
from ConfigParser import RawConfigParser
from influxdb import InfluxDBClient
from time_utils import get_epoch_from_datetime
from datetime import datetime
logger = logging.getLogger(__name__)
class Mysql2Influx:
def __init__(self,config):
#TODO put site info into settings file
self._site_name = config.get('site_info','site_name')
self._table = config.get('mysql','table')
self._siteid_field = config.get('mysql','siteid_field')
if config.has_option('mysql','time_field'):
self._time_field = config.get('mysql','time_field')
else:
self._time_field = 'timestamp'
#intitialise client for mysql database
self._mysql_host = config.get('mysql','host')
self._mysql_username = config.get('mysql','username')
self._mysql_password = config.get('mysql','password')
self._mysql_db = config.get('mysql','db')
self._influx_db_host = config.get('influx','host')
self._influx_db_port = config.get('influx','port')
self._influx_db_username = config.get('influx','username')
self._influx_db_password = config.get('influx','password')
self._influx_db = config.get('influx','db')
self._complete = False
self._check_field = config.get('mysql','check_field')
self.initialise_database()
def initialise_database(self):
self._db_client = MySQLdb.connect ( self._mysql_host,
self._mysql_username,
self._mysql_password,
self._mysql_db,
cursorclass = MySQLdb.cursors.DictCursor
)
self._influx_client = InfluxDBClient(
self._influx_db_host,
self._influx_db_port,
self._influx_db_username,
self._influx_db_password,
self._influx_db
)
def transfer_data(self):
self._get_data_from_mysql()
self._update_rows()
logger.debug('All data transfer completed : %s '% self._complete)
def _purge_data_in_db(self):
"""
Once the data is configured and within influx we can pruge our database
"""
if self._complete:
query = "SELECT * FROM TABLE %s WHERE %s = 0 ORDER BY %s DESC"%(self._table, self._check_fields,self._time_field)
def _get_data_from_mysql(self):
"""
get the cursor to dump all the data from mysql
"""
query = "SELECT * FROM `%s` WHERE `%s`=0 ORDER BY %s DESC"%(self._table,self._check_field,self._time_field)
logger.debug('executing query %s '% query)
cursor = self._db_client.cursor()
cursor.execute(query)
# pull data from mysql in X increments
rows = cursor.fetchall()
logger.info('querying MYSQL got %s rows'%len(rows))
self._format_data(rows)
def _send_data_to_influx(self,data_point):
"""
Break up data to make sure in the format the inflxu like
"""
logger.debug('Sending data to influx %s ...'%data_point[0])
self._influx_client.write_points(data_point)
def _format_data(self,data):
self._complete = False
#turn time into epoch timesa
if data:
logger.debug('Got data from mysql')
for row in data:
data_list =[]
for key in row.keys():
#format date to epoch
epoch_time = row[self._time_field].isoformat()
if not isinstance(row[key],datetime):
data_point = {"measurement":key,
"tags":{"site_name":row[self._siteid_field],
"source": "wago"},
"time" : "%sZ"%epoch_time,
"fields" : {"value":row[key]}
}
data_list.append(data_point)
logger.debug("data_point = %s"%data_point)
self._send_data_to_influx(data_list)
self._complete = True
def _update_rows(self):
query = 'UPDATE %s SET %s=1 WHERE %s=0;'%(self._table,self._check_field,self._check_field)
if self._complete:
logger.debug('Updating rows : executing query %s '% query)
c = self._db_client.cursor()
c.execute(query)
self._db_client.commit()
def main():
    """Parse the command line, load the settings and run the transfer.

    Without ``--server`` a single transfer is performed. With ``--server``
    the transfer repeats forever, sleeping ``interval`` seconds (option read
    from the ``server`` section of the config file) between runs and
    reconnecting to the databases before each new run.
    """
    # Argument parsing
    parser = argparse.ArgumentParser(
        description='Get Time series data from MYSQL and push it to influxdb')
    parser.add_argument('-d', '--debug', help='set logging level to debug',
                        action='store_true')
    # No nargs: the value is a plain string whether defaulted or given.
    parser.add_argument('-c', '--config', help='config file location',
                        default='settings.ini')
    parser.add_argument('-s', '--server', help='run as server with interval ',
                        action='store_true')
    args = parser.parse_args()

    # Init logging; DEBUG only when requested (the flag used to be ignored).
    logging.basicConfig(level=(logging.DEBUG if args.debug else logging.INFO))
    logger.debug('Starting up with config file %s', args.config)

    # Get config file. read() returns the list of files it could parse, so
    # an empty result means the file is missing or unreadable.
    config = RawConfigParser()
    if not config.read(args.config):
        parser.error('could not read config file %s' % args.config)
    logger.debug('configs %s', config.sections())

    # The interval is only needed (and only required) in server mode.
    _sleep_time = config.getfloat('server', 'interval') if args.server else None

    # Start
    mclient = Mysql2Influx(config)
    if not args.server:
        mclient.transfer_data()
        return

    logger.info('Starting up server mode interval: %s', _sleep_time)
    while True:
        try:
            mclient.transfer_data()
        except Exception:
            # Top-level boundary of the server loop: log and keep going.
            logger.exception("Error occured will try again")
        time.sleep(_sleep_time)
        try:
            mclient.initialise_database()
        except Exception:
            # A failed reconnect must not end the server; retry next cycle.
            logger.exception("Could not reconnect to databases will try again")


if __name__ == '__main__':
    # Run the transfer only when executed as a script.
    main()