This repository has been archived by the owner on Mar 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathdb.py
105 lines (88 loc) · 3.27 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Functions governing the application's interactions with the databas.
There is essentially no business logic in here; it establishes a connection
to the application's database and that's all.
"""
import time
import psycopg2
import config
class Connection(object):
"""Data type holding the data required to maintain a database
connection and perform queries.
"""
def __init__(self, host, dbname, user, password):
"""Stores db connection info in memory and initiates a
connection to the specified db."""
self.db = None
self.host = host
self.dbname = dbname
self.user = user
self.password = password
try:
self._attempt_connect()
except RuntimeError as e:
print(f"FATAL: {e}")
exit(1)
print("Connected!")
self.cursor = self.db.cursor()
def _attempt_connect(self, attempts=0):
"""Initiates a connection to the database and tracks retry attempts.
Arguments:
- attempts: How many failed attempts have already happened.
Side effects:
- self.db: Set on a successful connection attempt.
"""
attempts += 1
print(f'Connecting. Attempt {attempts} of {config.db["connection"]["max_attempts"]}.')
try:
self.db = psycopg2.connect(
host=self.host,
dbname=self.dbname,
user=self.user,
password=self.password,
connect_timeout=config.db["connection"]["timeout"],
options=f'-c search_path={config.db["schema"]}'
)
self.db.set_session(autocommit=True)
except:
if attempts >= config.db["connection"]["max_attempts"]:
print("Giving up.")
raise RuntimeError("Failed to connect to database.")
print(f'Connection to DB failed. Retrying in {config.db["connection"]["attempt_pause"] * attempts} seconds.')
time.sleep(config.db["connection"]["attempt_pause"] * attempts)
self._attempt_connect(attempts)
def read(self, query, params=None):
"""Helper function that converts results returned stored in a
Psycopg cursor into a less temperamental list format. Note that
there IS recursive retry logic here; when the connection to the
database is dropped, the query will fail, prompting this method
to re-connect and try the query again. This will continue trying
to reconnect indefinitely. This is probably not ideal.
Arguments:
- query: The SQL query to be executed.
- params: Any parameters to be substituted into the query. It's
important to let Psycopg handle this rather than using Python
string interpolation because it helps mitigate SQL injection.
Returns:
- A list of tuples, one for each row of results.
"""
results = []
try:
with self.db.cursor() as cursor:
if params is not None:
cursor.execute(query, params)
else:
cursor.execute(query)
for result in cursor:
results.append(result)
return results
except psycopg2.OperationalError as e:
print(f"ERROR with db query execution: {e}")
print("Reconnecting.")
self._attempt_connect()
print("Sending query again.")
return self.read(query, params)
def __del__(self):
"""Closes the database connection when the Connection object
is destroyed."""
if self.db is not None:
self.db.close()