-
Notifications
You must be signed in to change notification settings - Fork 3
/
sshcenter.py
executable file
·297 lines (250 loc) · 9.16 KB
/
sshcenter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python3
import argparse
import json
import re
import shlex
from dataclasses import dataclass
from multiprocessing import Pool
from typing import Dict
from typing import List
from typing import Optional

import paramiko
from dataclasses_json import dataclass_json
from termcolor import colored
@dataclass_json
@dataclass
class Server:
    """Connection settings for a single SSH host.

    Every field defaults to ``None`` so that an entry can leave a setting
    unspecified.
    """

    # Address handed to the SSH connection as ``hostname``.
    host: Optional[str] = None
    # Login name used on the remote host.
    user: Optional[str] = None
    # Path of a private key file used for key-based login.
    keyfile: Optional[str] = None
    # Login password; when ``keyfile`` is set it is used as the key passphrase.
    password: Optional[str] = None
@dataclass_json
@dataclass
class Config:
    """Top-level configuration: default settings, servers and groups."""

    # Fallback values for settings a server entry leaves as ``None``.
    default: Server
    # Server entries keyed by server name.
    servers: Dict[str, Server]
    # Group name -> list of regular expressions matched against server names.
    groups: Dict[str, List[str]]

    @staticmethod
    def get(config):
        """Build a ``Config`` from a JSON string and apply the defaults.

        Declared as a static method so that it works both as ``Config.get(...)``
        and when looked up on an instance (it takes no ``self``).

        Args:
            config: JSON text describing the configuration.

        Returns:
            The parsed ``Config`` in which every server field that was ``None``
            has been replaced by the corresponding field of ``default``.
        """
        parsed = Config.from_json(config)
        fallbacks = vars(parsed.default)
        for server in parsed.servers.values():
            settings = vars(server)
            for field_name, fallback in fallbacks.items():
                # Only fill the gaps; explicit per-server values always win.
                if settings[field_name] is None:
                    settings[field_name] = fallback
        return parsed
# Domain
class SSHUser:
    """A single key entry of an ``authorized_keys`` file."""

    def __init__(self, commented, key_type, key, username):
        """Remember the fields of one entry.

        Note that, despite its name, ``commented`` is stored as the
        ``enabled`` flag: a truthy value marks the entry as active.
        """
        self.username = username
        self.key = key
        self.key_type = key_type
        self.enabled = commented

    def __str__(self):
        """Return a coloured one-line summary showing both ends of the key."""
        if self.enabled:
            marker = colored("[+]", "green")
        else:
            marker = colored("[X]", "red")
        abbreviated = f"{self.key[:10]}...{self.key[-10:]}"
        return f"{marker}User: {self.username} ({abbreviated})"
class SSHCenter:
    """Read and rewrite ``~/.ssh/authorized_keys`` on the configured servers."""

    # Upper bound on the number of worker processes used per batch.
    _MAX_WORKERS = 16

    # "[#]<key type> <key> [<comment>]"; the comment part is optional because
    # a key line without a trailing comment is still a valid entry.
    _ENTRY_RE = re.compile(r"(#)?\s*(\S+)\s+(\S+)(?:\s+(.*))?")

    def __init__(self, config):
        """Keep the configuration that provides ``servers`` and ``groups``."""
        self.config = config

    def expand_server_names(self, patterns):
        """Return configured server names matching any of ``patterns``.

        Each pattern is applied with ``re.match`` (anchored at the start of
        the name only). Names are returned once each, in configuration order,
        so the result is deterministic.
        """
        return [
            name
            for name in self.config.servers
            if any(re.match(pattern, name) for pattern in patterns)
        ]

    def get_server_names(self, name, group):
        """Resolve ``name`` to a list of server names.

        With ``group`` truthy, ``name`` is looked up among the groups and its
        patterns are expanded; otherwise it must be a server name. An unknown
        name yields an empty list.
        """
        if group and name in self.config.groups:
            return self.expand_server_names(self.config.groups[name])
        if not group and name in self.config.servers:
            return [name]
        return []

    def parse_ssh_users(self, client):
        """Fetch and parse the remote ``authorized_keys`` file.

        Blank lines are ignored. A leading ``#`` marks an entry as disabled.
        An entry without a comment gets an empty username.

        Raises:
            ValueError: if a non-blank line cannot be parsed. Failing here is
                deliberate: silently skipping the line would drop it from the
                file on the next rewrite.
        """
        # NOTE(review): the leading space in the command presumably keeps it
        # out of the remote shell history - confirm.
        listing = client.exec(" cat ~/.ssh/authorized_keys")
        users = []
        for line in listing.split("\n"):
            if not line.strip():
                continue
            match = self._ENTRY_RE.search(line)
            if match is None:
                raise ValueError("Cannot parse authorized_keys line: %r" % line)
            marker, key_type, key, comment = match.groups()
            users.append(SSHUser(marker is None, key_type, key, comment or ""))
        return users

    def store_ssh_users(self, client, users):
        """Replace the remote ``authorized_keys`` file with ``users``.

        The previous contents are read first; if a fresh connection cannot be
        established after the rewrite (``client.test()`` is falsy) they are
        written back through the still-open session.
        """
        lines = []
        for user in users:
            prefix = "" if user.enabled else "#"
            entry = "%s%s %s %s" % (prefix, user.key_type, user.key, user.username)
            # rstrip avoids a dangling space for entries without a comment.
            lines.append(entry.rstrip() + "\n")
        previous_contents = client.exec(" cat ~/.ssh/authorized_keys")
        self._write_authorized_keys(client, "".join(lines))
        if not client.test():
            self._write_authorized_keys(client, previous_contents + "\n")

    @staticmethod
    def _write_authorized_keys(client, contents):
        """Write ``contents`` verbatim to the remote ``authorized_keys`` file."""
        # The text is shell-quoted so quotes, ``$`` or backticks inside a key
        # comment are written literally instead of being run by the remote
        # shell; printf '%s' adds no extra trailing newline.
        client.exec(
            " printf '%s' " + shlex.quote(contents) + " > ~/.ssh/authorized_keys"
        )

    def store_ssh_users_tuple(self, server_name_users_tuple):
        """Pool worker: store a ``(server_name, users)`` pair on its server."""
        server_name, users = server_name_users_tuple
        server = self.config.servers.get(server_name)
        self.store_ssh_users(SSHClient(server, server_name), users)

    def get_ssh_users_tuple(self, server_name):
        """Pool worker: return ``(server_name, users)`` read from the server."""
        server = self.config.servers.get(server_name)
        users = self.parse_ssh_users(SSHClient(server, server_name))
        return (server_name, users)

    def convert_list_of_tuples_to_dict(self, list_of_tuples):
        """Turn an iterable of ``(key, value)`` pairs into a dict."""
        return dict(list_of_tuples)

    def _map_in_pool(self, func, items):
        """Apply ``func`` to every item in a process pool; return the results.

        No pool is started for empty input, the pool is never larger than the
        number of items, and it is always shut down, even when ``func`` fails.
        """
        items = list(items)
        if not items:
            return []
        pool = Pool(min(self._MAX_WORKERS, len(items)))
        try:
            return pool.map(func, items)
        finally:
            pool.close()
            pool.join()

    def get_users_dict(self, server_names):
        """Return ``{server_name: [users]}`` for the given servers."""
        pairs = self._map_in_pool(self.get_ssh_users_tuple, server_names)
        return self.convert_list_of_tuples_to_dict(pairs)

    def store_users_dict(self, users):
        """Write every ``server_name -> users`` entry back to its server."""
        self._map_in_pool(self.store_ssh_users_tuple, users.items())

    def list_users(self, server_names, enabled_only):
        """Print the users of each server, optionally only the enabled ones."""
        for server_name, server_users in self.get_users_dict(server_names).items():
            print(colored("===== " + server_name + " =====", "yellow"))
            for user in server_users:
                if enabled_only and not user.enabled:
                    continue
                print(user)

    def search_user(self, server_names, username, publickey, enabled_only):
        """Print users whose name and/or key contain the given substrings.

        A falsy ``username`` or ``publickey`` disables that filter.
        """
        for server_name, server_users in self.get_users_dict(server_names).items():
            for user in server_users:
                if enabled_only and not user.enabled:
                    continue
                if username and username not in user.username:
                    continue
                if publickey and publickey not in user.key:
                    continue
                print(colored(server_name, "yellow") + " | " + str(user))

    def add_user(self, server_names, publickey, username, keytype):
        """Append a new enabled user to every given server."""
        users = self.get_users_dict(server_names)
        new_user = SSHUser(True, keytype, publickey, username)
        for server_users in users.values():
            server_users.append(new_user)
        self.store_users_dict(users)

    def del_user(self, server_names, username):
        """Remove every entry named exactly ``username`` from the servers."""
        users = self.get_users_dict(server_names)
        for server_users in users.values():
            # Rebuild the list rather than removing while iterating, which
            # would skip the entry following each removed one.
            server_users[:] = [
                user for user in server_users if user.username != username
            ]
        self.store_users_dict(users)

    def set_name_for_user(self, server_names, publickey, username):
        """Rename every entry whose key contains ``publickey``."""
        users = self.get_users_dict(server_names)
        for server_users in users.values():
            for user in server_users:
                if publickey in user.key:
                    user.username = username
        self.store_users_dict(users)
class SSHClient:
    """SSH session to one configured server, connected on first use."""

    def __init__(self, server, name):
        """Remember the server settings and its name; do not connect yet."""
        self.server = server
        self.name = name
        self.client = None

    def __del__(self):
        """Close the connection, if one was opened."""
        # getattr keeps this safe for an instance whose __init__ never ran.
        client = getattr(self, "client", None)
        if client:
            client.close()

    def ssh_obtain_key(self):
        """Load the RSA private key named by the server settings.

        Returns:
            The key, or ``None`` when no keyfile is configured or when a
            keyfile without a configured password cannot be loaded.

        Raises:
            Whatever the key loader raises when a password is configured and
            the key still cannot be loaded.
        """
        keyfile = self.server.keyfile
        if not keyfile:
            return None
        if self.server.password:
            return paramiko.RSAKey.from_private_key_file(keyfile, self.server.password)
        try:
            return paramiko.RSAKey.from_private_key_file(keyfile)
        except Exception:
            # Best effort: an unusable key simply means "no key available".
            return None

    def ssh_get_client(self):
        """Open and return a new connection to the server.

        The private key is preferred; the password is used when no key could
        be obtained.

        Raises:
            Exception: when neither a key nor a password is available, or
                when connecting fails.
        """
        key = self.ssh_obtain_key()
        password = self.server.password
        if not key and not password:
            raise Exception("Authentification failed for server: " + self.name)
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known hosts instead.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        credentials = {"pkey": key} if key else {"password": password}
        try:
            client.connect(
                hostname=self.server.host,
                username=self.server.user,
                **credentials
            )
        except Exception:
            # Do not leave a half-open transport behind on failure.
            client.close()
            raise
        return client

    def exec(self, cmd):
        """Run ``cmd`` remotely and return its stripped stdout as text.

        Anything the command wrote to stderr is printed locally.
        """
        if not self.client:
            self.client = self.ssh_get_client()
        _stdin, stdout, stderr = self.client.exec_command(cmd)
        output = stdout.read().strip()
        errors = stderr.read().strip()
        if errors:
            # Decode so the message is shown as text, not as a bytes repr.
            print(errors.decode("utf-8", errors="replace"))
        return output.decode("utf-8")

    def test(self):
        """Return whether a brand-new connection can currently be opened."""
        try:
            probe = self.ssh_get_client()
        except Exception:
            return False
        # The probe connection is only needed for the check itself.
        probe.close()
        return True
class Cli:
    """Command-line interface: parses the arguments and exposes the command."""

    def __init__(self, argv=None):
        """Parse and validate the command line.

        Args:
            argv: argument list to parse; ``None`` (the default) makes
                argparse read ``sys.argv[1:]``.

        Raises:
            SystemExit: on invalid arguments or when no command is given.
        """
        parser = argparse.ArgumentParser(description='SSH Users Center')
        parser.add_argument('--config','-c', action="store", default="config.json", help='Config file (default: config.json)')
        parser.add_argument('--group','-g', action="store_true", help='Group name')
        parser.add_argument('name', help='Server or group name')
        subparsers = parser.add_subparsers(dest="command", help='Commands')

        # list
        list_parser = subparsers.add_parser('list', help='List users')
        list_parser.add_argument('--enabled','-e', action="store_true", help='Enabled only users')

        # search
        search_parser = subparsers.add_parser('search', help='Search user')
        search_parser.add_argument('--user','-u', help='User name (can be partial)')
        search_parser.add_argument('--key','-k', help='Public key (can be partial)')
        search_parser.add_argument('--enabled','-e', action="store_true", help='Enabled only users')

        # add
        add_parser = subparsers.add_parser('add', help='Add user')
        add_parser.add_argument('key', help='Public key of user')
        add_parser.add_argument('username', help='Name of user')
        add_parser.add_argument('--keytype','-t', action="store", default="ssh-rsa", help='Type of publickey (default: ssh-rsa)')

        # del
        del_parser = subparsers.add_parser('del', help='Delete user')
        del_parser.add_argument('username', help='Name of user')

        # setname
        setname_parser = subparsers.add_parser('setname', help='Set username for public key')
        setname_parser.add_argument('key', help='Public key of user (can be partial)')
        setname_parser.add_argument('username', help='New name for the user')

        self.args = parser.parse_args(argv)
        self.validate()

    def validate(self):
        """Exit with an error message when no sub-command was given."""
        if not self.args.command:
            # SystemExit is raised directly instead of calling quit(): that
            # helper is installed by the site module for interactive use and
            # also closes stdin as a side effect.
            raise SystemExit("Specify command")

    def is_list(self):
        """Return True when the ``list`` command was selected."""
        return self.args.command == "list"

    def is_search(self):
        """Return True when the ``search`` command was selected."""
        return self.args.command == "search"

    def is_add(self):
        """Return True when the ``add`` command was selected."""
        return self.args.command == "add"

    def is_del(self):
        """Return True when the ``del`` command was selected."""
        return self.args.command == "del"

    def is_setname(self):
        """Return True when the ``setname`` command was selected."""
        return self.args.command == "setname"
# EntryPoint
if __name__ == "__main__":
    cli = Cli()
    args = cli.args

    # Read the JSON configuration; Config.get also applies the defaults.
    with open(args.config) as config_file:
        config = Config.get(config_file.read())

    # Resolve the requested server or group to concrete server names.
    ssh_center = SSHCenter(config)
    server_names = ssh_center.get_server_names(args.name, args.group)

    # One deferred call per command, so only the attributes that belong to
    # the selected sub-command are ever read from ``args``.
    actions = {
        "list": lambda: ssh_center.list_users(server_names, args.enabled),
        "search": lambda: ssh_center.search_user(
            server_names, args.user, args.key, args.enabled
        ),
        "add": lambda: ssh_center.add_user(
            server_names, args.key, args.username, args.keytype
        ),
        "del": lambda: ssh_center.del_user(server_names, args.username),
        "setname": lambda: ssh_center.set_name_for_user(
            server_names, args.key, args.username
        ),
    }
    action = actions.get(args.command)
    if action is not None:
        action()