-
Notifications
You must be signed in to change notification settings - Fork 0
/
sample-rx.py
65 lines (51 loc) · 1.89 KB
/
sample-rx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import sys
sys.path.append(".")
sys.path.append("build")
import socket
from nfiniity_cube_radio_pb2 import *
class CubeEvk:
def __init__(self, cube_ip):
self.cube_port_tx = 33210
self.cube_port_rx = 33211
self.cube_ip = cube_ip
self.sock_tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_rx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_rx.bind(("127.0.0.1", self.cube_port_rx))
def send_data(self, data):
msg = LinkLayerTransmission()
msg.source = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
msg.destination = bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
msg.priority = BEST_EFFORT
msg.payload = data
cmd = CommandRequest()
cmd.linklayer_tx.CopyFrom(msg)
self.sock_tx.sendto(cmd.SerializeToString(), (self.cube_ip, self.cube_port_tx))
def run_reception(self):
while True:
data, address = self.sock_rx.recvfrom(1024)
gossip = GossipMessage()
gossip.ParseFromString(data)
match gossip.WhichOneof("kind"):
case "cbr":
# got CBR; use this for your DCC
# cbr.busy and cbr.total
cbr = gossip.cbr
case "linklayer_rx":
# got a packet
print(f"Payload: {gossip.linklayer_rx.payload.decode()}")
print(f"Power: {gossip.linklayer_rx.power_cbm / 10} dbm")
def close(self):
self.sock_tx.close()
self.sock_rx.close()
def main():
cube_ip = "127.0.0.1"
cube = CubeEvk(cube_ip)
try:
# the evk needs one inital packet from its host
# in order to know where to forward received packages
cube.send_data(b"init")
cube.run_reception()
finally:
cube.close()
if __name__ == "__main__":
main()