-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrest-light.py
executable file
·217 lines (194 loc) · 7.92 KB
/
rest-light.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
##################################################
# Imports & Global variables
##################################################
import random
import sys
import subprocess
import os
import logging
import re
import string
from flask import Flask, request
app = Flask(__name__)
DEBUG_MODE = False
# Stores the currently valid api-key
LOADED_API_KEY = None
# important paths for the application
paths = {
"base": "/etc/rest-light",
"433utils": "/opt/433Utils/RPi_utils",
"api_key": "/etc/rest-light/api-key.txt"
}
##################################################
# utility functions
##################################################
# setup logging on startup
def setup_logging():
log_level = logging.INFO
if DEBUG_MODE:
log_level = logging.DEBUG
root = logging.getLogger()
root.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(log_level)
formatter = logging.Formatter('[%(asctime)s ] - %(levelname)s: %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
# loads key on app startup
def load_key():
# exit if key already defined
if LOADED_API_KEY is not None:
return
# try to open persistence file
key_tmp = None
try:
with open(paths['api_key'], 'r') as f:
lines = f.readlines()
key_tmp = re.findall("\w+", lines[0])[0]
except FileNotFoundError as e:
logging.info('No API-Key found, generating new one')
except BaseException as e:
logging.fatal('Unkown exception when trying to load api-key! ' + str(e))
# return key
if key_tmp is not None:
return key_tmp
# generate new key
else:
new_key = ''.join(random.choices(
string.ascii_lowercase + string.ascii_uppercase + string.digits, k=42))
# persists key
try:
with open(paths['api_key'], 'w') as f:
f.write(new_key)
except BaseException as e:
logging.fatal(
'Could not save API-Key in the following folder. Ensure permissions are correct! ' + paths['base'])
logging.fatal(str(e))
sys.exit()
# return key and log
logging.warning('#'*72)
logging.warning('# Generated API-Key: ' + new_key)
logging.warning('#'*72)
return new_key
##################################################
# Functions to handle requests
##################################################
# function that cleans input from possible injections
def sanitize_input(input):
output = None
try:
output = re.findall("\w+", str(input))[0]
except BaseException as e:
logging.error('Received unparsable web-request')
logging.error(str(e))
return output
# function to check, if a provided api-key is valid
def check_access(input_args):
if 'api_key' in input_args:
provided_key = sanitize_input(input_args['api_key'])
if provided_key == LOADED_API_KEY:
return (True, None)
else:
logging.warning('Request with wrong API-Key received')
return (False, {'error': 'Wrong API-Key provided'})
else:
logging.warning('Request without API-Key received')
return (False, {'error': 'No API-Key provided'})
# function to reveive arguments from request
def parse_request(request, required_arguments, optional_arguments = []):
input_args = None
if request.is_json:
input_args = request.get_json()
logging.debug(input_args)
else:
input_args = request.form
logging.debug(input_args.to_dict(flat=False))
valid, error = check_access(input_args)
if not valid:
return (valid, error)
arguments = {}
for argument in required_arguments + optional_arguments:
if argument in input_args:
arguments[argument] = sanitize_input(input_args[argument])
elif argument not in input_args and argument in required_arguments:
logging.info('API-Request without mandory field ' + argument)
return (False, {'error': 'Mandatory field ' + argument + ' not provided'})
return (True, arguments)
# function that runs a OS-Subprocess and generates a return-dict
def run_command(arguments):
# Run Command and capture output
run_result = None
try:
run_result = subprocess.run(arguments, capture_output=True)
except subprocess.SubprocessError as e:
logging.fatal(
"Running of subprocess resulted in SubprocessError: " + str(e.output))
return {'status': 'Error',
'stdout': "Running of subprocess resulted in SubprocessError: " + str(e.output)}
except FileNotFoundError as e:
logging.fatal(
"Running of subprocess resulted in FileNotFoundError: " + str(e.strerror))
return {'status': 'Error',
'stdout': "Running of subprocess resulted in FileNotFoundError: " + str(e.strerror)}
except BaseException as e:
logging.fatal('Unkown exception when trying to run subprocess! ' + str(e))
return {'status': 'Error',
'stdout': 'Unkown exception when trying to run subprocess! ' + str(e)}
# treat output
try:
if run_result is not None and run_result.returncode == 0:
logging.info("Successfully ran command: " + " ".join(arguments))
return {'status': 'Success', 'stdout': str(run_result.stdout) }
elif run_result is not None and hasattr(run_result, 'stderr'):
logging.error(
"Running of command " + " ".join(arguments) + " failed with output: " + str(run_result.stderr))
return {'status': 'Error',
'stdout': str(run_result.stdout), 'stderr': str(run_result.stderr) }
else:
logging.error("Running of command " + " ".join(arguments) + " failed without output")
return {'status': 'Error', 'stdout': 'Could not run command!'}
except BaseException as e:
logging.fatal('Unkown exception when trying to parse command " + " ".join(arguments) + " output! ' + str(e))
return {'status': 'Error',
'stdout': 'Unkown exception when trying to parse subprocess output! ' + str(e)}
##################################################
# Flask routes
##################################################
# status page
@app.route('/', methods=['GET'])
def hello():
return 'Running <a href="https://github.com/' + str(os.getenv('GITHUB_REPOSITORY')) + '">REST-Light</a> Version ' + str(os.getenv('APP_VERSION'))
# api to switch sockets
@app.route('/send', methods=['POST'])
def send():
request_valid, parsed_request = parse_request(
request, ['system_code', 'unit_code', 'state'])
if not request_valid:
return parsed_request
return run_command([paths['433utils'] + "/send",
parsed_request['system_code'],
parsed_request['unit_code'],
parsed_request['state']])
# api to send raw codes
@app.route('/codesend', methods=['POST'])
def codesend():
request_valid, parsed_request = parse_request(
request, ['decimalcode'], ['protocol', 'pulselength', 'bitlength'])
if not request_valid:
return parsed_request
return run_command([paths['433utils'] + "/codesend",
parsed_request['decimalcode'],
parsed_request['protocol'] if 'protocol' in parsed_request else "",
parsed_request['pulselength'] if 'pulselength' in parsed_request else "",
parsed_request['bitlength'] if 'bitlength' in parsed_request else "" ])
##################################################
# Main call
##################################################
# init
setup_logging()
LOADED_API_KEY = load_key()
# serve
logging.info("Starting " + str(os.getenv('GITHUB_REPOSITORY')) + " " + str(os.getenv('APP_VERSION')))
if __name__ == "__main__":
app.run(debug=DEBUG_MODE, host='0.0.0.0', port=4242)