Skip to content

Commit

Permalink
First functional
Browse files Browse the repository at this point in the history
  • Loading branch information
felipengeletrica committed Apr 13, 2024
1 parent eda33e8 commit 60edf8b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 30 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ If you are not running "pybluez", install the following dependencies:
```bash
sudo apt-get install bluetooth libbluetooth-dev
```
For scan devices:

bluetoothctl

10 changes: 7 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
pyserial==3.4
pynmeagps==1.0.20
pynmea2==1.19.0
setuptools==57.5.0
PyBluez==0.22
setuptools==58
git+https://github.com/pybluez/pybluez.git#egg=pybluez
RPi.GPIO==0.7.1
keyboard==0.13.5
bluepy
pyserial
bluepy
pygobject
dbus-python
bleak

65 changes: 40 additions & 25 deletions src/BLEConnector.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
import threading
from bluepy.btle import Scanner, DefaultDelegate
from bluepy.btle import UUID, Peripheral, DefaultDelegate


# UUIDs
SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b"
CHARACTERISTIC_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"


class DelegateBT(DefaultDelegate):
def __init__(self, params):
DefaultDelegate.__init__(self)

def handleNotification(self, cHandle, data):
print("Received data: {}".format(data.decode()))


class BLEConnector(threading.Thread):
def __init__(self, device_name):
self.device_name = device_name
self.service_uuid = None
self.characteristic_uuid = None
def run(self, service_uuid, characteristic_uuid):
self.service_uuid = service_uuid
self.characteristic_uuid = characteristic_uuid
self.service_uuid = SERVICE_UUID
self.characteristic_uuid = CHARACTERISTIC_UUID
self.mac_address = None

def run(self, mac_address):
self.mac_address = mac_address
self._process = p = threading.Thread(target=self._process)
p.daemon = True
p.start()

def _process(self):

while True:
# Connect to the BLE device
print("MAC: " + self.mac_address)
peripheral = Peripheral(self.mac_address)
peripheral.setDelegate(DelegateBT(None))

# Find the service and characteristic
service = peripheral.getServiceByUUID(UUID(self.service_uuid))
characteristic = service.getCharacteristics(UUID(self.characteristic_uuid))[0]

# Enable notifications
peripheral.writeCharacteristic(characteristic.valHandle + 1, b"\x01\x00", withResponse=True)

try:
device = self.scan_devices()
if not device:
continue
try:
device.connect()
services = device.getServices()
for service in services:
if service.uuid == self.service_uuid:
characteristics = service.getCharacteristics()
for char in characteristics:
if char.uuid == self.characteristic_uuid:
data = char.read()
print(data)
continue
except Exception as e:
return None, str(e)

except Exception as e:
return None, str(e)
while True:
if peripheral.waitForNotifications(1.0):
continue
finally:
peripheral.disconnect()


3 changes: 1 addition & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def init_data_instances(datajson):
)
)
devs[index].run(
service_uuid = devices[index]['service_uuid'],
characteristic_uuid = devices[index]['characteristic_uuid'],
mac_address=devices[index]['mac-address']
)
else:
Exception("Invalid device")
Expand Down

0 comments on commit 60edf8b

Please sign in to comment.