-
Notifications
You must be signed in to change notification settings - Fork 4
/
database.py
118 lines (98 loc) · 4.02 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker
from settings import DB_PATH
engine = create_engine('sqlite:///' + DB_PATH)
db = declarative_base()
class Domain(db):
__tablename__ = "domain"
id = Column(Integer, primary_key=True)
domain = Column(String(255))
md5 = Column(String(32))
cert_path = Column(String(255))
private_key_path = Column(String(255))
user = Column(Integer)
class User(db):
__tablename__ = "user"
id = Column(Integer, primary_key=True)
name = Column(String(255))
access_key_id = Column(String(16))
access_key_secret = Column(String(32))
class Database(object):
def __init__(self, verbosity=False):
engine.echo = verbosity
db.metadata.create_all(engine)
self.sessionmaker = sessionmaker(bind=engine)
self.session = self.sessionmaker()
def add_user(self, name, access_key_id, access_key_secret):
try:
self.session.add(User(name=name, access_key_id=access_key_id, access_key_secret=access_key_secret))
self.session.commit()
print("\033[1;32mUser %s added successfully\033[0m" % name)
except Exception as e:
print(e)
def add_domain(self, domain, user, cert_path=None, private_key_path=None):
try:
self.session.add(Domain(domain=domain, user=user,
cert_path=cert_path, private_key_path=private_key_path))
self.session.commit()
print("\033[1;32mDomain %s added successfully\033[0m" % domain)
except Exception as e:
print(e)
def get_domain(self, domain):
return self.session.query(Domain).filter(Domain.domain == domain).first()
def get_all_domain(self):
return self.session.query(Domain).all()
def has_domain(self, domain):
domain = self.get_domain(domain)
if domain is not None:
return True
return False
def has_user(self, user):
user = self.get_user(user)
if user is not None:
return True
return False
def get_user(self, name):
return self.session.query(User).filter(User.name == name).first()
def get_all_user(self):
return self.session.query(User).all()
def update_user(self, name, access_key_id=None, access_key_secret=None):
try:
user = self.session.query(User).filter(User.name == name).first()
if access_key_id:
user.access_key_id = access_key_id
if access_key_secret:
user.access_key_secret = access_key_secret
self.session.commit()
print("\033[1;32mUser %s updated successfully\033[0m" % name)
except Exception as e:
print(e)
def update_domain(self, domain, md5=None, user=None, cert_path=None, private_key_path=None):
try:
domain = self.session.query(Domain).filter(Domain.domain == domain).first()
if md5 is not None:
domain.md5 = md5
if user is not None:
domain.user = user
if cert_path is not None:
domain.cert_path = cert_path
if private_key_path is not None:
domain.private_key_path = private_key_path
self.session.commit()
print("\033[1;32mDomain %s updated successfully\033[0m" % domain.domain)
except Exception as e:
print(e)
def delete_domain(self, domain):
self.session.query(Domain).filter(Domain.domain == domain).delete()
self.session.commit()
print("\033[1;32mDomain %s deleted successfully\033[0m" % domain)
def delete_user(self, name):
try:
self.session.query(User).filter(User.name == name).delete()
self.session.commit()
print("\033[1;32mUser %s deleted successfully\033[0m" % name)
except Exception as e:
print(e)