This repository has been archived by the owner on Feb 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
virtualdeviceserver.py
97 lines (81 loc) · 4.53 KB
/
virtualdeviceserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import threading, json, program
from vexbrain import Brain
from httpServer import myhttpserver
class VirtualInterface(object):
def __init__(self, addr: tuple, brain: Brain):
self.server = myhttpserver.Server(name=__name__,
host=addr[0], port=addr[1])
self.brain = brain
self.server.routes.append(('/', self.index))
self.server.routes.append(('/index.js', self.indexJs))
self.server.routes.append(('/api/devices', self.listDevices))
self.server.routes.append(('/api/device/attributes/<deviceId>', self.listDeviceAttributes))
self.server.routes.append(('/api/device/setattribute/<deviceId>/<attribute>/<value>', self.setDeviceAttribute))
self.server.routes.append(('/api/device/callattribute/<deviceId>/<attribute>', self.callAttribute))
self.server.routes.append(('/api/interact/brain/<functionString>', self.interactBrain))
self.server.routes.append(('/api/program/loadandrun/<programFile>', self.loadAndRun))
threading.Thread(target=self.server.run).start()
def stopServer(self): self.server.stop()
def loadAndRun(self, programFile: str):
#Stop any currently running program
if self.brain.CodeEnviorment != None: self.brain.teardownProgram()
prgm = program.ProgramFile(f'data/emulatedstorage/Internal/programs/{programFile}')
#Get the brain ready to run the program
self.brain.onProgramFolderScreen = False; self.brain.BrainScreen._drawProgramBar = True
self.brain.onProgramScreen = True; self.brain.onHomeScreen = False
self.brain.onDeviceScreen = False; self.brain.BrainScreen.clear_screen()
#Run the program
self.brain.CodeEnviorment = prgm.loadContainer(self.brain)
self.brain.CodeEnviorment.threadedExecute()
return json.dumps({'success': True})
def index(self) -> str:
with open('data/static/index.html', 'r') as f: return f.read()
def indexJs(self) -> str:
with open('data/static/index.js', 'r') as f: return f.read()
def interactBrain(self, functionString):
try:
threading.Thread(self.brain.__getattribute__(functionString)).start()
except AttributeError as e:
return json.dumps({'success': False, 'error': str(e)})
return json.dumps({'success': True})
def listDevices(self) -> dict:
devices = {'devices': []}
for device in self.brain.virtualDevices:
devices['devices'].append({
'name': device._name,
'type': device._type,
'id': device._id,
})
return json.dumps(devices)
def callAttribute(self, deviceId, attribute):
for device in self.brain.virtualDevices:
if device._id == deviceId:
threading.Thread(target=device._attributes[attribute]['value']).start()
return json.dumps({'success': True})
def listDeviceAttributes(self, deviceId: str) -> dict:
attributes = {'attributes': {}}
for device in self.brain.virtualDevices:
if device._id == deviceId:
for attribute in device._attributes:
attributeName = attribute
attribute = device._attributes[attribute]
if attribute['type'] != 'callback':
attributes['attributes'][attributeName] = attribute
else:
attributes['attributes'][attributeName] = {'type': attribute['type'], 'value': '(callback)'}
return json.dumps(attributes)
def setDeviceAttribute(self, deviceId, attribute, value):
for device in self.brain.virtualDevices:
if device._id == deviceId:
if device._attributes[attribute]['type'] == 'bool':
device._attributes[attribute]['value'] = value.lower().strip() == 'true'
elif device._attributes[attribute]['type'] == 'int':
device._attributes[attribute]['value'] = int(value)
elif device._attributes[attribute]['type'] == 'float':
device._attributes[attribute]['value'] = float(value)
elif device._attributes[attribute]['type'] == 'tuple':
device._attributes[attribute]['value'] = tuple(value)
elif device._attributes[attribute]['type'] == 'str':
device._attributes[attribute]['value'] = str(value)
return json.dumps({'success': True})
return json.dumps({'success': False})