-
Notifications
You must be signed in to change notification settings - Fork 0
/
brew.py
152 lines (125 loc) · 5.2 KB
/
brew.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# coding=utf-8
import dotbot
import os
from pyDes import triple_des as des, ECB, PAD_PKCS5
import base64
import getpass
class Brew(dotbot.Plugin):
    """Dotbot plugin that encrypts or decrypts files with Triple DES.

    Handles the ``encrypt`` and ``decrypt`` directives. The key is read
    interactively with ``getpass``; file contents are Triple-DES processed
    (ECB mode, PKCS5 padding) and stored base64-encoded.
    """

    # Default directories, relative to the dotbot base directory.
    _BASE_ENCRYPTED = './encrypt'
    _BASE_DECRYPTED = './decrypt'

    # Placeholder only; handle() replaces it with the key typed by the user.
    _key = 'default_key'
    _crypto = None
    _action = None

    ACTION_DECRYPT = 'decrypt'
    ACTION_ENCRYPT = 'encrypt'

    # Key length handed to triple_des, in characters.
    KEY_LEN = 24

    def can_handle(self, directive):
        """Return True when *directive* is ``encrypt`` or ``decrypt``."""
        return directive in [self.ACTION_ENCRYPT, self.ACTION_DECRYPT]

    def handle(self, directive, data):
        """Prompt for the key and process every item listed in *data*.

        Each item may be a dict with optional ``src_path`` / ``dst_path``
        keys, a list ``[src_path, dst_path]``, or anything else, in which
        case the defaults for the directive are used. Paths are resolved
        against the dotbot base directory.

        Returns True when every item was processed, False on any failure
        (key mismatch, prompt error, crypto setup error, or a failed item).
        Processing stops at the first failing item.
        """
        pwd = self._context.base_directory()
        log = self._log
        log.debug(data)
        self._action = directive

        # Ask for the encryption/decryption key; encryption asks twice so a
        # typo cannot produce files nobody can decrypt.
        try:
            key = getpass.getpass('[dotbot-secret]Please input key:')
            if self._action == self.ACTION_ENCRYPT:
                repeat_key = getpass.getpass('[dotbot-secret]Please repeat key:')
                if key != repeat_key:
                    log.error('Two keys are not same, exit.')
                    return False
        except (KeyboardInterrupt, Exception) as e:
            # An aborted or broken prompt is a failure, not a success.
            log.error(e)
            return False
        self._key = key
        # The key itself is deliberately never written to the log.
        log.debug('Key received.')

        try:
            self.gen_crypto()
        except Exception as e:
            log.error(e)
            return False

        decrypting = self._action == self.ACTION_DECRYPT
        defaults = self._context.defaults().get(self._action, {})
        src_path = defaults.get(
            'src_path',
            self._BASE_ENCRYPTED if decrypting else self._BASE_DECRYPTED)
        dst_path = defaults.get(
            'dst_path',
            self._BASE_DECRYPTED if decrypting else self._BASE_ENCRYPTED)

        # A directive with no payload arrives as None; treat it as "no items".
        for item in data or []:
            _src_path = src_path
            _dst_path = dst_path
            if isinstance(item, dict):
                _src_path = item.get('src_path', src_path)
                _dst_path = item.get('dst_path', dst_path)
            elif isinstance(item, list):
                _src_path = item[0] if len(item) > 0 else src_path
                _dst_path = item[1] if len(item) > 1 else dst_path
            src = os.path.join(pwd, _src_path)
            dst = os.path.join(pwd, _dst_path)
            log.info('Load from ' + src + ', write to ' + dst + '.')
            try:
                err = self.work(src, dst)
            except Exception as e:
                log.error(e)
                return False
            if err is not None:
                log.error(err)
                return False
        return True

    def gen_crypto(self):
        """Build the Triple DES cipher from ``self._key``.

        The key is normalised to exactly ``KEY_LEN`` characters: longer keys
        are truncated and shorter ones are right-padded with spaces.
        """
        key = self._key[:self.KEY_LEN].ljust(self.KEY_LEN)
        # NOTE(review): ECB mode reveals repeated plaintext blocks. It is kept
        # because changing the mode would make existing encrypted files
        # unreadable.
        self._crypto = des(key, ECB)

    def work(self, src, dest):
        """Encrypt or decrypt *src* into *dest*, recursing into directories.

        Returns None on success (including skipped entries) or a ValueError
        instance describing why *src* or *dest* could not be used. The error
        is returned, not raised. Exceptions from file I/O or the cipher
        propagate to the caller.
        """
        log = self._log
        if not os.access(src, os.F_OK | os.R_OK):
            return ValueError('Access ' + src + ' failed.')
        if os.path.isfile(src):
            return self._convert_file(src, dest)
        if not os.path.isdir(src):
            return ValueError('Unsupported path type: ' + src + '.')

        # Make sure the destination directory exists before writing into it.
        if not os.path.exists(dest):
            os.makedirs(dest)
        elif not os.path.isdir(dest):
            return ValueError('Destination ' + dest + ' is not a directory.')

        for name in os.listdir(src):
            next_src_path = os.path.join(src, name)
            next_dest_path = os.path.join(dest, name)
            if os.path.isdir(next_src_path):
                if not os.access(next_dest_path, os.F_OK):
                    # Create the matching destination folder if missing.
                    os.mkdir(next_dest_path)
                if not os.access(next_dest_path, os.R_OK | os.W_OK):
                    log.warning('Skip ' + next_src_path + ' because access ' + next_dest_path + ' failed.')
                    continue
                if not os.path.isdir(next_dest_path):
                    log.warning('Skip ' + next_src_path + ' because of existed file ' + next_dest_path + '.')
                    continue
            err = self.work(next_src_path, next_dest_path)
            if err is not None:
                # Keep going with the remaining entries, but do not hide the
                # reason this one was not processed.
                log.warning('Skip ' + next_src_path + ' because ' + str(err))
        return None

    def _convert_file(self, src, dest):
        """Encrypt or decrypt the single file *src* into *dest*.

        An existing *dest* is never overwritten; the file is skipped with a
        warning instead. Always returns None.
        """
        log = self._log
        if os.access(dest, os.F_OK):
            log.warning('Skip ' + src + ' because ' + dest + ' already existed.')
            return None

        with open(src, mode='r') as src_file:
            origin = src_file.read()

        decrypting = self._action == self.ACTION_DECRYPT
        transform = self.decrypt if decrypting else self.encrypt
        # Transform before creating dest: if this raises (e.g. wrong key), no
        # empty dest file is left behind to be skipped on the next run.
        result = transform(origin).decode("utf-8")

        with open(dest, mode='w') as dest_file:
            dest_file.write(result)

        if decrypting:
            log.info('Decrypt ' + src + ' to ' + dest + ' successfully.')
        else:
            log.info('Encrypt ' + src + ' to ' + dest + ' successfully.')
        return None

    def encrypt(self, src):
        """Return the base64-encoded Triple DES ciphertext of text *src*."""
        return base64.b64encode(self._crypto.encrypt(src.encode('utf-8'), padmode=PAD_PKCS5))

    def decrypt(self, src):
        """Return the plaintext bytes for base64-encoded ciphertext *src*."""
        return self._crypto.decrypt(base64.b64decode(src), padmode=PAD_PKCS5)