Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Broadlink A2 #791

Merged
merged 4 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .hub import s3
from .light import lb1, lb2
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
from .sensor import a1, a2
from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b

SUPPORTED_TYPES = {
Expand Down Expand Up @@ -142,7 +142,10 @@
0x653C: ("RM4 pro", "Broadlink"),
},
a1: {
0x2714: ("e-Sensor", "Broadlink"),
0x2714: ("A1", "Broadlink"),
},
a2: {
0x4F60: ("A2", "Broadlink"),
},
mp1: {
0x4EB5: ("MP1-1K4S", "Broadlink"),
Expand Down
58 changes: 30 additions & 28 deletions broadlink/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,32 @@ class dooya2(Device):

TYPE = "DT360E-2"

def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
Expand Down Expand Up @@ -119,31 +120,32 @@ class wser(Device):

TYPE = "WSER"

def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
Expand All @@ -156,24 +158,24 @@ def get_position(self) -> int:

def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position

def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position

def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position

def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position
69 changes: 55 additions & 14 deletions broadlink/sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""Support for sensors."""
import struct

from . import exceptions as e
from .device import Device

Expand Down Expand Up @@ -29,19 +27,62 @@ def check_sensors(self) -> dict:
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
data = self.decrypt(resp[0x38:])

return {
"temperature": data[0x04] + data[0x05] / 10.0,
"humidity": data[0x06] + data[0x07] / 10.0,
"light": data[0x08],
"air_quality": data[0x0A],
"noise": data[0x0C],
}


class a2(Device):
"""Controls a Broadlink A2."""

TYPE = "A2"

temperature = struct.unpack("<bb", data[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
humidity = data[0x2] + data[0x3] / 10.0
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8

packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8

resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload

def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
data = self._send(1)

return {
"temperature": temperature,
"humidity": humidity,
"light": data[0x4],
"air_quality": data[0x6],
"noise": data[0x8],
"temperature": data[0x13] * 256 + data[0x14],
"humidity": data[0x15] * 256 + data[0x16],
"pm10": data[0x0D] * 256 + data[0x0E],
"pm2_5": data[0x0F] * 256 + data[0x10],
"pm1": data[0x11] * 256 + data[0x12],
}
Loading