-
Notifications
You must be signed in to change notification settings - Fork 0
/
uwss.py
64 lines (59 loc) · 2.09 KB
/
uwss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import network, socket, ussl, time, wss_helper, sys, os, struct
# import sys
# import os
# import struct
# this project contains code modified from https://github.com/micropython/webrepl
class wss:
    """Minimal WebSocket framing layer on top of a TLS-wrapped socket.

    Pass the constructor a newly established socket, before sending any
    data on it; afterwards talk to the object returned by this class via
    ``write`` and ``read``.
    """

    def __init__(self, s):
        """Wrap socket *s* with ``ussl`` and run the client handshake on it."""
        self._s = s  # raw socket, kept next to the wrapped stream
        self.s = ussl.wrap_socket(s)  # , server_side=False, keyfile=None, certfile=None, cert_reqs=ussl.CERT_NONE, ca_certs=None)
        wss_helper.client_handshake(self.s)
        self.buf = b""  # unread payload bytes of the current frame

    def write(self, data):
        """Send *data* as one single-frame binary message.

        The length is encoded in 7 bits, 16 bits (marker 126) or 64 bits
        (marker 127), whichever is the smallest that fits.
        """
        # NOTE(review): frames are written without the mask bit or a masking
        # key. RFC 6455 asks clients to mask - confirm the peer accepts this.
        l = len(data)
        if l < 126:
            # TODO: hardcoded "binary" type
            hdr = struct.pack(">BB", 0x82, l)
        elif l <= 0xFFFF:
            hdr = struct.pack(">BBH", 0x82, 126, l)
        else:
            # A 16-bit field cannot hold this length; use the 64-bit form.
            hdr = struct.pack(">BBQ", 0x82, 127, l)
        self.s.write(hdr)
        self.s.write(data)

    def recvexactly(self, sz):
        """Read up to *sz* bytes, looping over short reads.

        Returns fewer than *sz* bytes only if the stream stops delivering
        data; callers compare the length of the result themselves.
        """
        res = b""
        while sz:
            data = self.s.read(sz)
            if not data:
                break
            res += data
            sz -= len(data)
        return res

    def read(self, size, text_ok=False):
        """Return exactly *size* bytes of frame payload.

        When no payload is buffered, frames are read until a binary frame
        (0x82) arrives, or a text frame (0x81) if *text_ok* is true. Other
        frames are reported with ``print`` and their payload is discarded.
        Leftover payload stays buffered for the next call.

        Raises AssertionError if the stream ends inside a frame or if fewer
        than *size* bytes are available from the buffered frame.
        """
        if not self.buf:
            while True:
                hdr = self.recvexactly(2)
                assert len(hdr) == 2
                fl, sz = struct.unpack(">BB", hdr)
                if sz == 126:
                    # 16-bit extended payload length
                    hdr = self.recvexactly(2)
                    assert len(hdr) == 2
                    (sz,) = struct.unpack(">H", hdr)
                elif sz == 127:
                    # 64-bit extended payload length
                    hdr = self.recvexactly(8)
                    assert len(hdr) == 8
                    (sz,) = struct.unpack(">Q", hdr)
                if fl == 0x82:
                    break
                if text_ok and fl == 0x81:
                    break
                print("Got unexpected websocket record of type %x, skipping it" % fl)
                while sz:
                    skip = self.s.read(sz)
                    if not skip:
                        # Stream ended mid-frame: stop here so the next
                        # header read fails its assert instead of spinning.
                        break
                    print("Skip data: {}".format(skip))
                    sz -= len(skip)
            data = self.recvexactly(sz)
            assert len(data) == sz
            self.buf = data
        d = self.buf[:size]
        self.buf = self.buf[size:]
        assert len(d) == size, len(d)
        return d

    def ioctl(self, req, val):
        """Accept only request 9 with value 2; anything else fails the assert.

        NOTE(review): the meaning of these two constants is not visible in
        this file - confirm against the code that calls ``ioctl``.
        """
        assert req == 9 and val == 2