-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathpong_bot.py
executable file
·452 lines (390 loc) · 22.2 KB
/
pong_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
#!/usr/bin/env python3
# Meshtastic Autoresponder PONG Bot
# K7MHI Kelly Keeton 2024
try:
from pubsub import pub
except ImportError:
print(f"Important dependencies are not met, try install.sh\n\n Did you mean to './launch.sh pong' using a virtual environment.")
exit(1)
import asyncio
import time # for sleep, get some when you can :)
import random
from modules.log import *
from modules.system import *
# Global Variables
# When True, onReceive() logs the receiving interface's attributes and the
# raw packet at debug level before processing it.
DEBUGpacket = False # Debug print the packet rx
def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM):
    """Run the first bot command found in ``message`` and return its reply text.

    The message is lower-cased and split on single spaces. Every word that
    exactly equals a key of the command table is a candidate; the candidate
    that occurs earliest in the message (by word position) is executed.

    Args:
        message: Raw text of the received message.
        snr, rssi, hop: Signal details, forwarded to handle_ping().
        pkiStatus: Accepted for interface compatibility; not used here.
        message_from_id: Node number of the sender.
        channel_number: Channel the message arrived on.
        deviceID: Number of the local device that received the message.
        isDM: True when the message was a direct message to the bot.

    Returns:
        The reply produced by the selected handler, or a default refusal
        string when no command word is present.
    """
    bot_response = "I'm sorry, I'm afraid I can't do that."

    # Several keywords share the same handler; name the call once.
    def ping():
        return handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number)

    def last_heard():
        return handle_lheard(message, message_from_id, deviceID, isDM)

    command_handler = {
        # Command List processes system.trap_list. system.messageTrap() sends any commands to here
        "ack": ping,
        "cmd": lambda: handle_cmd(message, message_from_id, deviceID),
        "cq": ping,
        "cqcq": ping,
        "cqcqcq": ping,
        "lheard": last_heard,
        # handle_motd() reads the module-level MOTD itself, so only the message is passed
        "motd": lambda: handle_motd(message),
        "ping": ping,
        "pong": lambda: "🏓PING!!🛜",
        # was a nested lambda, which returned a function object instead of the reply text
        "sitrep": last_heard,
        "sysinfo": lambda: sysinfo(message, message_from_id, deviceID),
        "test": ping,
        "testing": ping,
    }

    # Rank commands by the position of the matching *word*; a substring offset
    # could point inside an unrelated earlier word (e.g. "xpong ping pong").
    words = message.lower().split(' ')
    cmds = sorted(
        ({'cmd': key, 'index': words.index(key)} for key in command_handler if key in words),
        key=lambda found: found['index'],
    )

    if cmds:
        logger.debug(f"System: Bot detected Commands:{cmds}")
        # run the first command after sorting
        bot_response = command_handler[cmds[0]['cmd']]()

    # wait a responseDelay to avoid message collision from lora-ack
    time.sleep(responseDelay)
    return bot_response
def handle_cmd(message, message_from_id, deviceID):
    """Return the command list, or a hint when "cmd" is followed by a known command.

    "cmd" is used instead of "help" because the word "help" may be used for
    real emergencies on a Meshtastic mesh.

    Args:
        message: Raw text of the received message.
        message_from_id: Unused; kept for a uniform handler signature.
        deviceID: Unused; kept for a uniform handler signature.

    Returns:
        A short hint when the second space-separated word is in trap_list,
        otherwise the module's help_message.
    """
    parts = message.split(" ")
    # More than one part means the message contained at least one space.
    asked_about_command = len(parts) > 1 and parts[1] in trap_list
    if not asked_about_command:
        return help_message
    return "🤖 just use the commands directly in chat"
def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number):
    """Build the reply for ping-style commands (ping, test, ack, cq) and manage auto-ping.

    Args:
        message_from_id: Node number of the sender.
        deviceID: Number of the local device that received the message.
        message: Raw text of the received message.
        hop: Hop description; the exact value "Direct" switches the reply to
            SNR/RSSI, any other value is appended as text.
        snr, rssi: Signal readings reported for direct messages.
        isDM: True when the message was a direct message to the bot.
        channel_number: Channel the message arrived on (stored with auto-ping requests).

    Returns:
        The reply text.

    Side effects:
        May add an entry to, or remove the sender's entries from, multiPingList.
    """
    if "?" in message and isDM:
        return message.split("?")[0].title() + " command returns SNR and RSSI, or hopcount from your message. Try adding e.g. @place or #tag"

    lowered = message.lower()
    ping_type = ''  # label stored with an auto-ping request ('type' key)

    if "ping" in lowered:
        msg = "🏓PONG\n"
        ping_type = "🏓PING"
    elif "test" in lowered:  # also covers "testing"
        msg = random.choice(["🎙Testing 1,2,3\n", "🎙Testing\n",
                             "🎙Testing, testing\n",
                             "🎙Ah-wun, ah-two...\n", "🎙Is this thing on?\n",
                             "🎙Roger that!\n"])
        ping_type = "🎙TEST"
    elif "ack" in lowered:
        msg = random.choice(["✋ACK-ACK!\n", "✋Ack to you!\n"])
        ping_type = "✋ACK"
    elif "cq" in lowered:  # also covers "cqcq" and "cqcqcq"
        # Look the node number up by device number so any configured device
        # works; previously only devices 1 and 2 were handled and any other
        # deviceID left the name unbound.
        my_node_num = globals().get(f'myNodeNum{deviceID}', 0)
        myname = get_name_from_number(my_node_num, 'short', deviceID)
        msg = f"QSP QSL OM DE {myname} K\n"
    else:
        msg = "🔊 Can you hear me now?"

    if hop == "Direct":
        msg += f"SNR:{snr} RSSI:{rssi}"
    else:
        # str() keeps a non-string hop value from raising on concatenation
        msg += str(hop)

    # Echo the first @place or #tag back (an @place wins over a #tag)
    for marker in ("@", "#"):
        if marker in message:
            tag = f" {marker}" + message.split(marker)[1]
            msg += tag
            ping_type += tag
            break

    # check for multi ping request
    if " " in message:
        # if stop multi ping
        if "stop" in lowered:
            remaining = [entry for entry in multiPingList
                         if entry.get('message_from_id') != message_from_id]
            if len(remaining) != len(multiPingList):
                # Replace the contents in place: popping while indexing over the
                # original length raised IndexError, and in-place assignment
                # keeps the list object shared with the rest of the program.
                multiPingList[:] = remaining
                msg = "🛑 auto-ping"

        # if 3 or more entries (2 or more active), throttle the multi-ping for congestion
        if len(multiPingList) > 2:
            msg = "🚫⛔️ auto-ping, service busy. ⏳Try again soon."
            pingCount = -1
        else:
            # set inital pingCount
            try:
                pingCount = int(message.split(" ")[1])
            except (ValueError, IndexError):
                # second word is not a number: not an auto-ping request
                pingCount = -1
            else:
                if pingCount in (123, 1234):
                    # "test 123" / "test 1234" are mic checks, not counts
                    pingCount = 1
                elif not autoPingInChannel and not isDM:
                    # no autoping in channels
                    pingCount = 1
                # cap at 50 (the old "> 51" test let exactly 51 through)
                pingCount = min(pingCount, 50)

        if pingCount > 1:
            multiPingList.append({'message_from_id': message_from_id, 'count': pingCount + 1, 'type': ping_type, 'deviceID': deviceID, 'channel_number': channel_number, 'startCount': pingCount})
            if ping_type == "🎙TEST":
                msg = f"🛜Initalizing BufferTest, using chunks of about {int(maxBuffer // pingCount)}, max length {maxBuffer} in {pingCount} messages"
            else:
                msg = f"🚦Initalizing {pingCount} auto-ping"

    # if not a DM add the username to the beginning of msg
    if not isDM and not useDMForResponse:
        msg = "@" + get_name_from_number(message_from_id, 'short', deviceID) + " " + msg

    return msg
def handle_motd(message, current_motd=None):
    """Return the message of the day, or replace it when the message contains "$".

    Everything after the first "$" (with trailing whitespace stripped)
    becomes the new MOTD, so the text itself may contain further "$" characters.

    Args:
        message: Raw text of the received message.
        current_motd: Optional and ignored. Accepted so that callers passing
            the current MOTD as a second argument do not raise TypeError; the
            module-level MOTD is always the value read and written.

    Returns:
        A confirmation string when the MOTD was changed, otherwise the current MOTD.
    """
    global MOTD
    if "$" in message:
        # maxsplit=1 keeps any later "$" as part of the MOTD text
        MOTD = message.split("$", 1)[1].rstrip()
        return "MOTD Set to: " + MOTD
    return MOTD
def sysinfo(message, message_from_id, deviceID):
    """Return system information, or a one-line description when the message contains "?".

    Args:
        message: Raw text of the received message.
        message_from_id: Node number of the sender, forwarded to get_sysinfo().
        deviceID: Number of the local device, forwarded to get_sysinfo().
    """
    wants_description = "?" in message
    if not wants_description:
        return get_sysinfo(message_from_id, deviceID)
    return "sysinfo command returns system information."
def handle_lheard(message, nodeid, deviceID, isDM):
    """Build the "last heard" report, optionally followed by the bot's recent users.

    Args:
        message: Raw text of the received message.
        nodeid: Node number of the sender, forwarded to handle_history().
        deviceID: Number of the local device, forwarded to handle_history().
        isDM: True when the message was a direct message to the bot.

    Returns:
        A one-line description when a DM contains "?", otherwise the report text.
    """
    if isDM and "?" in message:
        command_word = message.split("?")[0].title()
        return command_word + " command returns a list of the nodes that have been heard recently"

    # display last heard nodes
    report = "Last Heard\n" + str(get_node_list(1))

    # show last users of the bot with the cmdHistory list
    recent_users = handle_history(message, nodeid, deviceID, isDM, lheard=True)
    if not recent_users:
        # nothing follows the node list, so drop its final character
        # (presumably a trailing newline - depends on get_node_list output)
        return report[:-1]

    return report + f'LastSeen\n{recent_users}'
def _resolve_rx_node(interface, rxType):
    """Return the configured device number (1-9) that owns ``interface``, or 0 if none matches."""
    settings = globals()
    attrs = interface.__dict__

    if rxType == 'SerialInterface':
        dev_path = attrs.get('devPath', 'unknown')

        def is_match(i):
            return settings[f'port{i}'] in dev_path
    elif rxType == 'TCPInterface':
        host = attrs.get('hostname', 'unknown')

        def is_match(i):
            return settings[f'hostname{i}'] in host and settings[f'interface{i}_type'] == 'tcp'
    elif rxType == 'BLEInterface':
        def is_match(i):
            return settings[f'interface{i}_type'] == 'ble'
    else:
        return 0

    # Device 1 is always checked; devices 2-9 only when multiple_interface is set.
    last_device = 9 if multiple_interface else 1
    for i in range(1, last_device + 1):
        if is_match(i):
            return i
    return 0


def _describe_hops(packet):
    """Return the hop description string ("Direct", "MQTT" or "<n> hops") for a packet."""
    hop_away = packet.get('hopsAway') or 0
    hop_limit = packet.get('hopLimit') or 0
    hop_start = packet.get('hopStart') or 0

    # An explicit hopsAway count wins; previously it left the description unset.
    if hop_away > 0:
        return f"{hop_away} hops"
    if hop_start == hop_limit:
        return "Direct"
    if hop_start == 0 and hop_limit > 0:
        # no start count but a remaining limit is labelled MQTT by this bot
        return "MQTT"
    return f"{hop_start - hop_limit} hops"


def onReceive(packet, interface):
    """Handle one received packet (subscribed to 'meshtastic.receive' in start_rx()).

    Text messages are answered through auto_response() when they contain a
    trap word, stored in msg_history otherwise, and optionally repeated on
    the other enabled devices. Every other packet goes to consumeMetadata().

    Args:
        packet: Dict describing the packet. Keys read here: 'from', 'to',
            'channel', 'decoded' ('portnum', 'payload'), 'rxSnr', 'rxRssi',
            'publicKey', 'pkiEncrypted', 'hopsAway', 'hopLimit', 'hopStart'.
        interface: Interface object the packet arrived on; its class name and
            attributes select the local device number.

    A KeyError while processing (for example a missing 'from' or 'to') is
    logged and the packet is dropped.
    """
    rxType = type(interface).__name__
    snr, rssi = 0, 0
    pkiStatus = (False, 'ABC')
    isDM = False

    if DEBUGpacket:
        # Join every attribute; the old loop kept only the last one and left
        # the variable unbound when the interface had no attributes.
        intDebug = "".join(f"{item}\n" for item in interface.__dict__.items())
        logger.debug(f"System: Packet Received on {rxType} Interface\n {intDebug} \n END of interface \n")
        # Debug print the packet for debugging
        logger.debug(f"Packet Received\n {packet} \n END of packet \n")

    # set the value for the incomming interface
    rxNode = _resolve_rx_node(interface, rxType)

    # check if the packet has a channel flag use it
    channel_number = packet.get('channel') or 0

    try:
        # read inside the try so a packet without 'from' is logged, not raised
        message_from_id = packet['from']

        if 'decoded' not in packet or packet['decoded']['portnum'] != 'TEXT_MESSAGE_APP':
            # Evaluate non TEXT_MESSAGE_APP packets
            consumeMetadata(packet, rxNode)
            return

        # handle TEXT_MESSAGE_APP
        message_string = packet['decoded']['payload'].decode('utf-8')

        def sender(name_type='long'):
            # name of the sending node as known to the receiving device
            return get_name_from_number(message_from_id, name_type, rxNode)

        def respond(destination):
            # destination is a node number for a DM, or 0 to reply on the channel
            reply = auto_response(message_string, snr, rssi, hop, pkiStatus, message_from_id, channel_number, rxNode, isDM)
            send_message(reply, channel_number, destination, rxNode)

        # check if the packet is from us
        if message_from_id == myNodeNum1 or message_from_id == myNodeNum2:
            logger.warning(f"System: Packet from self {message_from_id} loop or traffic replay deteted")

        # get the signal strength and snr if available
        if packet.get('rxSnr') or packet.get('rxRssi'):
            snr = packet.get('rxSnr', 0)
            rssi = packet.get('rxRssi', 0)

        # check if the packet has a publicKey flag use it
        if packet.get('publicKey'):
            pkiStatus = (packet.get('pkiEncrypted', False), packet.get('publicKey', 'ABC'))

        hop = _describe_hops(packet)

        if help_message in message_string or welcome_message in message_string or "CMD?:" in message_string:
            # ignore help and welcome messages
            logger.warning(f"Got Own Welcome/Help header. From: {sender()}")
            return

        # If the packet is a DM (Direct Message) respond to it, otherwise validate its a message for us on the channel
        if packet['to'] == myNodeNum1 or packet['to'] == myNodeNum2:
            # message is DM to us
            isDM = True
            # an empty or whitespace-only message has no first word to test
            words = message_string.split()

            # check if the message contains a trap word, DMs are always responded to
            if (messageTrap(message_string) and not llm_enabled) or (words and messageTrap(words[0])):
                logger.info(f"Device:{rxNode} Channel: {channel_number} " + CustomFormatter.green + "Received DM: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +
                            "From: " + CustomFormatter.white + f"{sender()}")
                # respond with DM
                respond(message_from_id)
            else:
                logger.warning(f"Device:{rxNode} Ignoring DM: {message_string} From: {sender()}")
                send_message(welcome_message, channel_number, message_from_id, rxNode)
                time.sleep(responseDelay)

            # log the message to the message log
            msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {sender()} | " + message_string.replace('\n', '-nl-'))
        elif messageTrap(message_string):
            # message is on a channel and contains a command
            if ignoreDefaultChannel and channel_number == publicChannel:
                logger.debug(f"System: ignoreDefaultChannel CMD:{message_string} From: {sender('short')}")
            else:
                # message is for bot to respond to
                logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "ReceivedChannel: " + CustomFormatter.white + f"{message_string} " + CustomFormatter.purple +
                            "From: " + CustomFormatter.white + f"{sender()}")
                if useDMForResponse:
                    # respond to channel message via direct message
                    respond(message_from_id)
                elif channel_number == publicChannel and antiSpam:
                    # warning user spamming default channel
                    logger.warning(f"System: AntiSpam protection, sending DM to: {sender()}")
                    # respond to channel message via direct message
                    respond(message_from_id)
                else:
                    # respond to channel message on the channel itself
                    respond(0)
        else:
            # message is not for bot to respond to
            # ignore the message but add it to the message history list
            time_format = "%Y-%m-%d %H:%M:%S" if zuluTime else "%Y-%m-%d %I:%M:%S%p"
            timestamp = datetime.now().strftime(time_format)

            # keep at most storeFlimit entries, dropping the oldest first
            if msg_history and len(msg_history) >= storeFlimit:
                msg_history.pop(0)
            msg_history.append((sender(), message_string, channel_number, timestamp, rxNode))

            # print the message to the log and sdout
            # (sender() passes the name type and device like every other lookup here)
            logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +
                        f" {message_string} " + CustomFormatter.purple + "From:" + CustomFormatter.white + f" {sender()}")
            if log_messages_to_file:
                msgLogger.info(f"Device:{rxNode} Channel:{channel_number} | {sender()} | " + message_string.replace('\n', '-nl-'))

            # repeat the message on the other device
            if repeater_enabled and multiple_interface:
                # wait a responseDelay to avoid message collision from lora-ack.
                time.sleep(responseDelay)
                rMsg = (f"{message_string} From:{sender('short')}")
                # if channel found in the repeater list repeat the message
                if str(channel_number) in repeater_channels:
                    for i in range(1, 10):
                        if globals().get(f'interface{i}_enabled', False) and i != rxNode:
                            logger.debug(f"Repeating message on Device{i} Channel:{channel_number}")
                            send_message(rMsg, channel_number, 0, i)
                            time.sleep(responseDelay)
    except KeyError as e:
        logger.critical(f"System: Error processing packet: {e} Device:{rxNode}")
        logger.debug(f"System: Error Packet = {packet}")
async def start_rx():
    """Subscribe the packet handlers, log the active configuration, then idle forever.

    Subscribes onReceive and onDisconnect to the meshtastic pubsub topics,
    logs one line per enabled device and per enabled feature, starts the
    broadcast scheduler when it is enabled, and finally sleeps in a loop.
    """
    print(CustomFormatter.bold_white + "\nMeshtastic Autoresponder Bot CTL+C to exit\n" + CustomFormatter.reset)

    # Start the receive subscriber using pubsub via meshtastic library
    pub.subscribe(onReceive, 'meshtastic.receive')
    pub.subscribe(onDisconnect, 'meshtastic.connection.lost')

    # Report every enabled device; settings are looked up by device number.
    settings = globals()
    for device in range(1, 10):
        if not settings.get(f'interface{device}_enabled', False):
            continue
        node_num = settings.get(f'myNodeNum{device}', 0)
        long_name = get_name_from_number(node_num, 'long', device)
        short_name = get_name_from_number(node_num, 'short', device)
        logger.info(f"System: Autoresponder Started for Device{device} {long_name},"
                    f"{short_name}. NodeID: {node_num}, {decimal_to_hex(node_num)}")

    # Report the enabled features
    if log_messages_to_file:
        logger.debug("System: Logging Messages to disk")
    if syslog_to_file:
        logger.debug("System: Logging System Logs to disk")
    if solar_conditions_enabled:
        logger.debug("System: Celestial Telemetry Enabled")
    if motd_enabled:
        logger.debug(f"System: MOTD Enabled using {MOTD}")
    if sentry_enabled:
        logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}")
    if store_forward_enabled:
        logger.debug(f"System: Store and Forward Enabled using limit: {storeFlimit}")
    if useDMForResponse:
        logger.debug("System: Respond by DM only")
    if repeater_enabled and multiple_interface:
        logger.debug(f"System: Repeater Enabled for Channels: {repeater_channels}")
    if file_monitor_enabled:
        logger.debug(f"System: File Monitor Enabled for {file_monitor_file_path}, broadcasting to channels: {file_monitor_broadcastCh}")
    if read_news_enabled:
        logger.debug(f"System: File Monitor News Reader Enabled for {news_file_path}")

    if scheduler_enabled:
        # Examples of using the scheduler, Times here are in 24hr format
        # https://schedule.readthedocs.io/en/stable/

        # Reminder Scheduler is enabled every Monday at noon send a log message
        def log_reminder():
            return logger.info("System: Scheduled Broadcast Reminder")

        schedule.every().monday.at("12:00").do(log_reminder)
        logger.debug("System: Starting the broadcast scheduler")
        await BroadcastScheduler()

    # Keep the coroutine alive; packets arrive through the pubsub callbacks.
    while True:
        await asyncio.sleep(0.5)
# Hello World
async def main():
    """Start the receiver, the watchdog and (if enabled) the file watcher, then wait on them.

    The receiver and watchdog tasks are awaited together first; the file
    watcher task, when enabled, is only awaited after both have finished.
    """
    rx_task = asyncio.create_task(start_rx())
    watchdog_task = asyncio.create_task(watchdog())

    file_watch_task = None
    if file_monitor_enabled:
        file_watch_task = asyncio.create_task(handleFileWatcher())

    await asyncio.gather(rx_task, watchdog_task)

    if file_monitor_enabled:
        await asyncio.gather(file_watch_task)

    await asyncio.sleep(0.01)
# Script entry point: run the bot until interrupted, then clean up.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # CTRL+C: hand over to the exit handler
        exit_handler()
# EOF