diff --git a/addons/mqtt/mqtt.gd b/addons/mqtt/mqtt.gd index 1c01bb4..3b9a2fe 100644 --- a/addons/mqtt/mqtt.gd +++ b/addons/mqtt/mqtt.gd @@ -80,22 +80,35 @@ func receiveintobuffer(): if sslsocket != null: var sslsocketstatus = sslsocket.get_status() if sslsocketstatus == StreamPeerTLS.STATUS_CONNECTED or sslsocketstatus == StreamPeerTLS.STATUS_HANDSHAKING: - sslsocket.poll() + var E = sslsocket.poll() + if E != 0: + printerr("Socket poll error: ", E) + return E var n = sslsocket.get_available_bytes() + if n == -1: + printerr("get_available_bytes returned -1") + return FAILED if n != 0: + assert (n > 0) var sv = sslsocket.get_data(n) assert (sv[0] == 0) # error code receivedbuffer.append_array(sv[1]) elif socket != null and socket.get_status() == StreamPeerTCP.STATUS_CONNECTED: - socket.poll() + var E = socket.poll() + if E != 0: + printerr("Socket poll error: ", E) + return E var n = socket.get_available_bytes() + if n == -1: + printerr("get_available_bytes returned -1") + return FAILED if n != 0: + assert (n > 0) var sv = socket.get_data(n) assert (sv[0] == 0) # error code receivedbuffer.append_array(sv[1]) - elif websocket != null: websocket.poll() while websocket.get_available_packet_count() != 0: @@ -197,51 +210,57 @@ func set_user_pass(suser, spswd): self.user = null self.pswd = null + +static func encoderemaininglength(pkt, sz): + assert(sz < 2097152) + var i = 1 + while sz > 0x7f: + pkt[i] = (sz & 0x7f) | 0x80 + sz >>= 7 + i += 1 + if i + 1 > len(pkt): + pkt.append(0x00); + pkt[i] = sz + +static func encodeshortint(pkt, n): + assert (n >= 0 and n < 65536) + pkt.append((n >> 8) & 0xFF) + pkt.append(n & 0xFF) + +static func encodevarstr(pkt, bs): + encodeshortint(pkt, len(bs)) + pkt.append_array(bs) + func firstmessagetoserver(): var clean_session = true - var msg = PackedByteArray() - msg.append(CP_CONNECT); - msg.append(0x00); - msg.append(0x00); - msg.append(0x04); - msg.append_array("MQTT".to_ascii_buffer()); - msg.append(0x04); - msg.append(0x02); - msg.append(0x00); - msg.append(0x3C); - - msg[1] = 10 + 2 + len(self.client_id) - msg[9] = (1<<1) if clean_session else 0 - if self.user != null: - msg[1] += 2 + len(self.user) + 2 + len(self.pswd) - msg[9] |= 0xC0 - if self.keepalive: - assert(self.keepalive < 65536) - msg[10] |= self.keepalive >> 8 - msg[11] |= self.keepalive & 0x00FF - if self.lw_topic: - msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) - msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 - msg[9] |= 1<<5 if self.lw_retain else 0 - - msg.append(len(self.client_id) >> 8) - msg.append(self.client_id.length() & 0xFF) - msg.append_array(self.client_id.to_ascii_buffer()) + var pkt = PackedByteArray() + pkt.append(CP_CONNECT); + pkt.append(0x00); + var sz = 10 + (2+len(self.client_id)) + \ + (2+len(self.user)+2+len(self.pswd) if self.user != null else 0) + \ + (2+len(self.lw_topic)+2+len(self.lw_msg) if self.lw_topic else 0) + encoderemaininglength(pkt, sz) + var remstartpos = len(pkt) + print("MQTT".to_ascii_buffer()) + encodevarstr(pkt, [0x4D, 0x51, 0x54, 0x54]); # "MQTT".to_ascii_buffer() + var protocollevel = 0x04 # MQTT v3.1.1 + var connectflags = (0xC0 if self.user != null else 0) | \ + (0x20 if self.lw_retain else 0) | \ + (self.lw_qos << 3) | \ + (0x04 if self.lw_topic else 0) | \ + (0x02 if clean_session else 0) + pkt.append(protocollevel); + pkt.append(connectflags); + encodeshortint(pkt, self.keepalive) + encodevarstr(pkt, self.client_id.to_ascii_buffer()) if self.lw_topic: - msg.append(len(self.lw_topic) >> 8) - msg.append(len(self.lw_topic) & 0xFF) - msg.append_array(self.lw_topic) - msg.append(len(self.lw_msg) >> 8) - msg.append(len(self.lw_msg) & 0xFF) - msg.append_array(self.lw_msg) + encodevarstr(pkt, self.lw_topic) + encodevarstr(pkt, self.lw_msg) if self.user != null: - msg.append(len(self.user) >> 8) - msg.append(len(self.user) & 0xFF) - msg.append_array(self.user) - msg.append(len(self.pswd) >> 8) - msg.append(len(self.pswd) & 0xFF) - msg.append_array(self.pswd) - return msg + encodevarstr(pkt, self.user) + encodevarstr(pkt, self.pswd) + assert (len(pkt) - remstartpos == sz) + return pkt func cleanupsockets(retval=false): if verbose_level: @@ -264,7 +283,7 @@ func connect_to_broker(brokerurl): assert (brokerconnectmode == BCM_NOCONNECTION) var brokermatch = regexbrokerurl.search(brokerurl) if brokermatch == null: - printerr("ERROR: unrecognized brokerurl pattern: ", brokerurl, " must be tcp,ssh,ws, or wss://") + print("ERROR: unrecognized brokerurl pattern:", brokerurl) return cleanupsockets(false) var brokercomponents = brokermatch.strings var brokerprotocol = brokercomponents[1] @@ -285,7 +304,7 @@ func connect_to_broker(brokerurl): print("Connecting to websocketurl: ", websocketurl) var E = websocket.connect_to_url(websocketurl) if E != 0: - printerr("ERROR: websocketclient.connect_to_url Err: ", E) + print("ERROR: websocketclient.connect_to_url Err: ", E) return cleanupsockets(false) print("Websocket get_requested_url ", websocket.get_requested_url()) brokerconnectmode = BCM_WAITING_WEBSOCKET_CONNECTION @@ -296,7 +315,7 @@ func connect_to_broker(brokerurl): print("Connecting to %s:%s" % [brokerserver, brokerport]) var E = socket.connect_to_host(brokerserver, brokerport) if E != 0: - printerr("ERROR: socketclient.connect_to_url Err: ", E) + print("ERROR: socketclient.connect_to_url Err: ", E) return cleanupsockets(false) if isssl: brokerconnectmode = BCM_WAITING_SSL_SOCKET_CONNECTION @@ -313,6 +332,7 @@ func disconnect_from_server(): emit_signal("broker_disconnected") cleanupsockets() + func publish(stopic, smsg, retain=false, qos=0): var msg = smsg.to_ascii_buffer() if not binarymessages else smsg var topic = stopic.to_ascii_buffer() @@ -320,29 +340,15 @@ func publish(stopic, smsg, retain=false, qos=0): var pkt = PackedByteArray() pkt.append(CP_PUBLISH | (2 if qos else 0) | (1 if retain else 0)); pkt.append(0x00); - - var sz = 2 + len(topic) + len(msg) - if qos > 0: - sz += 2 - assert(sz < 2097152) - var i = 1 - while sz > 0x7f: - pkt[i] = (sz & 0x7f) | 0x80 - sz >>= 7 - i += 1 - if i + 1 > len(pkt): - pkt.append(0x00); - pkt[i] = sz - - pkt.append(len(topic) >> 8) - pkt.append(len(topic) & 0xFF) - pkt.append_array(topic) - + var sz = 2 + len(topic) + len(msg) + (2 if qos > 0 else 0) + encoderemaininglength(pkt, sz) + var remstartpos = len(pkt) + encodevarstr(pkt, topic) if qos > 0: pid += 1 - pkt.append(pid >> 8) - pkt.append(pid & 0xFF) + encodeshortint(pkt, pid) pkt.append_array(msg) + assert (len(pkt) - remstartpos == sz) senddata(pkt) if verbose_level >= 2: print("CP_PUBLISH%s%s topic=%s msg=%s" % [ "[%d]"%pid if qos else "", " " if retain else "", stopic, smsg]) @@ -351,19 +357,19 @@ func publish(stopic, smsg, retain=false, qos=0): func subscribe(stopic, qos=0): pid += 1 var topic = stopic.to_ascii_buffer() - var length = 2 + 2 + len(topic) + 1 - var msg = PackedByteArray() - msg.append(CP_SUBSCRIBE); - msg.append(length) - msg.append(pid >> 8) - msg.append(pid & 0xFF) - msg.append(len(topic) >> 8) - msg.append(len(topic) & 0xFF) - msg.append_array(topic) - msg.append(qos); + var sz = 2 + 2 + len(topic) + 1 + var pkt = PackedByteArray() + pkt.append(CP_SUBSCRIBE); + pkt.append(0x00); + encoderemaininglength(pkt, sz) + var remstartpos = len(pkt) + encodeshortint(pkt, pid) + encodevarstr(pkt, topic) + pkt.append(qos); + assert (len(pkt) - remstartpos == sz) if verbose_level: print("SUBSCRIBE[%d] topic=%s" % [pid, stopic]) - senddata(msg) + senddata(pkt) func pingreq(): if verbose_level >= 2: @@ -373,19 +379,18 @@ func pingreq(): func unsubscribe(stopic): pid += 1 var topic = stopic.to_ascii_buffer() - var length = 2 + 2 + len(topic) - var msg = PackedByteArray() - msg.append(CP_UNSUBSCRIBE); - msg.append(length) - msg.append(pid >> 8) - msg.append(pid & 0xFF) - msg.append(len(topic) >> 8) - msg.append(len(topic) & 0xFF) - msg.append_array(topic) + var sz = 2 + 2 + len(topic) + var pkt = PackedByteArray() + pkt.append(CP_UNSUBSCRIBE); + pkt.append(0x00) + encoderemaininglength(pkt, sz) + var remstartpos = len(pkt) + encodeshortint(pkt, pid) + encodevarstr(pkt, topic) if verbose_level: print("UNSUBSCRIBE[%d] topic=%s" % [pid, stopic]) - senddata(msg) - + assert (len(pkt) - remstartpos == sz) + senddata(pkt) func wait_msg(): var n = receivedbuffer.size()