-
Notifications
You must be signed in to change notification settings - Fork 0
/
subshell.py
82 lines (69 loc) · 2.06 KB
/
subshell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import subprocess
import threading
from queue import Empty, Queue
import colorama
colorama.init()
class Shell:
def __init__(self):
self.proc = subprocess.Popen(["/bin/sh"], stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE,bufsize=1,encoding='utf-8')
self.proc_msg_queue = Queue()
self.output_conn = None
self.log = False
threading.Thread(target=self.read_stdout, daemon=True).start()
threading.Thread(target=self.read_stderr, daemon=True).start()
def relay(self, msg):
try:
if self.output_conn is not None:
self.output_conn.send((msg + "\n").encode())
except Exception:
pass
def read_stdout(self):
while True:
line = self.proc.stdout.readline()
if not line:
break
self.proc_msg_queue.put(("stdout", line))
if "%*&" not in line:
if self.log:
print(line.strip())
self.relay(line.strip())
def read_stderr(self):
while True:
line = self.proc.stderr.readline()
if not line:
break
self.proc_msg_queue.put(("stderr", line))
if self.log:
print(line.strip())
self.relay(line.strip())
def run_cmd(self, cmd):
if self.log:
print(colorama.Fore.GREEN + f"> {cmd}" + colorama.Fore.RESET)
self.relay(colorama.Fore.GREEN + f"> {cmd}" + colorama.Fore.RESET)
self.proc.stdin.write(f'{cmd} ; echo "%*&$?"\n')
self.proc.stdin.flush()
output = ""
error = ""
code = 0
while True:
try:
type, line = self.proc_msg_queue.get_nowait()
if type == "stdout" and "%*&" in line:
code = int(line[(line.find("%*&") + 3):])
break
if type == "stdout":
output += line + "\n"
if type == "stderr":
error += line + "\n"
except Empty:
pass
if (code != 0):
raise Exception(code, error)
return (output, error, code)
def close(self):
self.output_conn = False
self.log = False
self.proc.stderr.close()
self.proc.stdin.close()
self.proc.stdout.close()
self.proc.terminate()