Skip to content

Commit

Permalink
debug the first message variable length code
Browse files Browse the repository at this point in the history
  • Loading branch information
goatchurchprime committed Nov 13, 2024
1 parent b827142 commit 8d23de4
Showing 1 changed file with 94 additions and 89 deletions.
183 changes: 94 additions & 89 deletions addons/mqtt/mqtt.gd
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,35 @@ func receiveintobuffer():
if sslsocket != null:
var sslsocketstatus = sslsocket.get_status()
if sslsocketstatus == StreamPeerTLS.STATUS_CONNECTED or sslsocketstatus == StreamPeerTLS.STATUS_HANDSHAKING:
sslsocket.poll()
var E = sslsocket.poll()
if E != 0:
printerr("Socket poll error: ", E)
return E
var n = sslsocket.get_available_bytes()
if n == -1:
printerr("get_available_bytes returned -1")
return FAILED
if n != 0:
assert (n > 0)
var sv = sslsocket.get_data(n)
assert (sv[0] == 0) # error code
receivedbuffer.append_array(sv[1])

elif socket != null and socket.get_status() == StreamPeerTCP.STATUS_CONNECTED:
socket.poll()
var E = socket.poll()
if E != 0:
printerr("Socket poll error: ", E)
return E
var n = socket.get_available_bytes()
if n == -1:
printerr("get_available_bytes returned -1")
return FAILED
if n != 0:
assert (n > 0)
var sv = socket.get_data(n)
assert (sv[0] == 0) # error code
receivedbuffer.append_array(sv[1])


elif websocket != null:
websocket.poll()
while websocket.get_available_packet_count() != 0:
Expand Down Expand Up @@ -197,51 +210,57 @@ func set_user_pass(suser, spswd):
self.user = null
self.pswd = null


static func encoderemaininglength(pkt, sz):
assert(sz < 2097152)
var i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
if i + 1 > len(pkt):
pkt.append(0x00);
pkt[i] = sz

static func encodeshortint(pkt, n):
assert (n >= 0 and n < 65536)
pkt.append((n >> 8) & 0xFF)
pkt.append(n & 0xFF)

static func encodevarstr(pkt, bs):
encodeshortint(pkt, len(bs))
pkt.append_array(bs)

func firstmessagetoserver():
var clean_session = true
var msg = PackedByteArray()
msg.append(CP_CONNECT);
msg.append(0x00);
msg.append(0x00);
msg.append(0x04);
msg.append_array("MQTT".to_ascii_buffer());
msg.append(0x04);
msg.append(0x02);
msg.append(0x00);
msg.append(0x3C);

msg[1] = 10 + 2 + len(self.client_id)
msg[9] = (1<<1) if clean_session else 0
if self.user != null:
msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
msg[9] |= 0xC0
if self.keepalive:
assert(self.keepalive < 65536)
msg[10] |= self.keepalive >> 8
msg[11] |= self.keepalive & 0x00FF
if self.lw_topic:
msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[9] |= 1<<5 if self.lw_retain else 0

msg.append(len(self.client_id) >> 8)
msg.append(self.client_id.length() & 0xFF)
msg.append_array(self.client_id.to_ascii_buffer())
var pkt = PackedByteArray()
pkt.append(CP_CONNECT);
pkt.append(0x00);
var sz = 10 + (2+len(self.client_id)) + \
(2+len(self.user)+2+len(self.pswd) if self.user != null else 0) + \
(2+len(self.lw_topic)+2+len(self.lw_msg) if self.lw_topic else 0)
encoderemaininglength(pkt, sz)
var remstartpos = len(pkt)
print("MQTT".to_ascii_buffer())
encodevarstr(pkt, [0x4D, 0x51, 0x54, 0x54]); # "MQTT".to_ascii_buffer()
var protocollevel = 0x04 # MQTT v3.1.1
var connectflags = (0xC0 if self.user != null else 0) | \
(0x20 if self.lw_retain else 0) | \
(self.lw_qos << 3) | \
(0x04 if self.lw_topic else 0) | \
(0x02 if clean_session else 0)
pkt.append(protocollevel);
pkt.append(connectflags);
encodeshortint(pkt, self.keepalive)
encodevarstr(pkt, self.client_id.to_ascii_buffer())
if self.lw_topic:
msg.append(len(self.lw_topic) >> 8)
msg.append(len(self.lw_topic) & 0xFF)
msg.append_array(self.lw_topic)
msg.append(len(self.lw_msg) >> 8)
msg.append(len(self.lw_msg) & 0xFF)
msg.append_array(self.lw_msg)
encodevarstr(pkt, self.lw_topic)
encodevarstr(pkt, self.lw_msg)
if self.user != null:
msg.append(len(self.user) >> 8)
msg.append(len(self.user) & 0xFF)
msg.append_array(self.user)
msg.append(len(self.pswd) >> 8)
msg.append(len(self.pswd) & 0xFF)
msg.append_array(self.pswd)
return msg
encodevarstr(pkt, self.user)
encodevarstr(pkt, self.pswd)
assert (len(pkt) - remstartpos == sz)
return pkt

func cleanupsockets(retval=false):
if verbose_level:
Expand All @@ -264,7 +283,7 @@ func connect_to_broker(brokerurl):
assert (brokerconnectmode == BCM_NOCONNECTION)
var brokermatch = regexbrokerurl.search(brokerurl)
if brokermatch == null:
printerr("ERROR: unrecognized brokerurl pattern: ", brokerurl, " must be tcp,ssh,ws, or wss://")
print("ERROR: unrecognized brokerurl pattern:", brokerurl)
return cleanupsockets(false)
var brokercomponents = brokermatch.strings
var brokerprotocol = brokercomponents[1]
Expand All @@ -285,7 +304,7 @@ func connect_to_broker(brokerurl):
print("Connecting to websocketurl: ", websocketurl)
var E = websocket.connect_to_url(websocketurl)
if E != 0:
printerr("ERROR: websocketclient.connect_to_url Err: ", E)
print("ERROR: websocketclient.connect_to_url Err: ", E)
return cleanupsockets(false)
print("Websocket get_requested_url ", websocket.get_requested_url())
brokerconnectmode = BCM_WAITING_WEBSOCKET_CONNECTION
Expand All @@ -296,7 +315,7 @@ func connect_to_broker(brokerurl):
print("Connecting to %s:%s" % [brokerserver, brokerport])
var E = socket.connect_to_host(brokerserver, brokerport)
if E != 0:
printerr("ERROR: socketclient.connect_to_url Err: ", E)
print("ERROR: socketclient.connect_to_url Err: ", E)
return cleanupsockets(false)
if isssl:
brokerconnectmode = BCM_WAITING_SSL_SOCKET_CONNECTION
Expand All @@ -313,36 +332,23 @@ func disconnect_from_server():
emit_signal("broker_disconnected")
cleanupsockets()


func publish(stopic, smsg, retain=false, qos=0):
var msg = smsg.to_ascii_buffer() if not binarymessages else smsg
var topic = stopic.to_ascii_buffer()

var pkt = PackedByteArray()
pkt.append(CP_PUBLISH | (2 if qos else 0) | (1 if retain else 0));
pkt.append(0x00);

var sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert(sz < 2097152)
var i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
if i + 1 > len(pkt):
pkt.append(0x00);
pkt[i] = sz

pkt.append(len(topic) >> 8)
pkt.append(len(topic) & 0xFF)
pkt.append_array(topic)

var sz = 2 + len(topic) + len(msg) + (2 if qos > 0 else 0)
encoderemaininglength(pkt, sz)
var remstartpos = len(pkt)
encodevarstr(pkt, topic)
if qos > 0:
pid += 1
pkt.append(pid >> 8)
pkt.append(pid & 0xFF)
encodeshortint(pkt, pid)
pkt.append_array(msg)
assert (len(pkt) - remstartpos == sz)
senddata(pkt)
if verbose_level >= 2:
print("CP_PUBLISH%s%s topic=%s msg=%s" % [ "[%d]"%pid if qos else "", " <retain>" if retain else "", stopic, smsg])
Expand All @@ -351,19 +357,19 @@ func publish(stopic, smsg, retain=false, qos=0):
func subscribe(stopic, qos=0):
pid += 1
var topic = stopic.to_ascii_buffer()
var length = 2 + 2 + len(topic) + 1
var msg = PackedByteArray()
msg.append(CP_SUBSCRIBE);
msg.append(length)
msg.append(pid >> 8)
msg.append(pid & 0xFF)
msg.append(len(topic) >> 8)
msg.append(len(topic) & 0xFF)
msg.append_array(topic)
msg.append(qos);
var sz = 2 + 2 + len(topic) + 1
var pkt = PackedByteArray()
pkt.append(CP_SUBSCRIBE);
pkt.append(0x00);
encoderemaininglength(pkt, sz)
var remstartpos = len(pkt)
encodeshortint(pkt, pid)
encodevarstr(pkt, topic)
pkt.append(qos);
assert (len(pkt) - remstartpos == sz)
if verbose_level:
print("SUBSCRIBE[%d] topic=%s" % [pid, stopic])
senddata(msg)
senddata(pkt)

func pingreq():
if verbose_level >= 2:
Expand All @@ -373,19 +379,18 @@ func pingreq():
func unsubscribe(stopic):
pid += 1
var topic = stopic.to_ascii_buffer()
var length = 2 + 2 + len(topic)
var msg = PackedByteArray()
msg.append(CP_UNSUBSCRIBE);
msg.append(length)
msg.append(pid >> 8)
msg.append(pid & 0xFF)
msg.append(len(topic) >> 8)
msg.append(len(topic) & 0xFF)
msg.append_array(topic)
var sz = 2 + 2 + len(topic)
var pkt = PackedByteArray()
pkt.append(CP_UNSUBSCRIBE);
pkt.append(0x00)
encoderemaininglength(pkt, sz)
var remstartpos = len(pkt)
encodeshortint(pkt, pid)
encodevarstr(pkt, topic)
if verbose_level:
print("UNSUBSCRIBE[%d] topic=%s" % [pid, stopic])
senddata(msg)

assert (len(pkt) - remstartpos == sz)
senddata(pkt)

func wait_msg():
var n = receivedbuffer.size()
Expand Down

0 comments on commit 8d23de4

Please sign in to comment.