Skip to content

Commit

Permalink
Zigpy serial protocol (#177)
Browse files Browse the repository at this point in the history
* Migrate `Gateway`

* Migrate API and Application

* Bump minimum zigpy version

* Get rid of unnecessary probe tests

* Fix unit tests

* Increase test coverage

* Use correct data type when clearing buffer
  • Loading branch information
puddly authored Oct 28, 2024
1 parent 1b5da1d commit c2d2f5b
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 246 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"zigpy>=0.60.0",
"zigpy>=0.70.0",
]

[tool.setuptools.packages.find]
Expand Down Expand Up @@ -43,6 +43,7 @@ ignore_errors = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.flake8]
exclude = [".venv", ".git", ".tox", "docs", "venv", "bin", "lib", "deps", "build"]
Expand Down
9 changes: 0 additions & 9 deletions tests/async_mock.py

This file was deleted.

138 changes: 37 additions & 101 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
"""Tests for API."""

import asyncio
from unittest import mock

import pytest
import serial
import zigpy.config
import zigpy.exceptions
import zigpy.types as t

from zigpy_xbee import api as xbee_api, types as xbee_t, uart
from zigpy_xbee import api as xbee_api, types as xbee_t
from zigpy_xbee.exceptions import ATCommandError, ATCommandException, InvalidCommand
from zigpy_xbee.zigbee.application import ControllerApplication

import tests.async_mock as mock

DEVICE_CONFIG = zigpy.config.SCHEMA_DEVICE(
{
zigpy.config.CONF_DEVICE_PATH: "/dev/null",
Expand All @@ -26,24 +24,49 @@
def api():
"""Sample XBee API fixture."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._uart = mock.MagicMock()
api._uart = mock.AsyncMock()
return api


async def test_connect(monkeypatch):
async def test_connect():
"""Test connect."""
api = xbee_api.XBee(DEVICE_CONFIG)
monkeypatch.setattr(uart, "connect", mock.AsyncMock())
await api.connect()
api._command = mock.AsyncMock(spec=api._command)

with mock.patch("zigpy_xbee.uart.connect"):
await api.connect()


async def test_connect_initial_timeout_success():
"""Test connect, initial command times out."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._at_command = mock.AsyncMock(side_effect=asyncio.TimeoutError)
api.init_api_mode = mock.AsyncMock(return_value=True)

with mock.patch("zigpy_xbee.uart.connect"):
await api.connect()


async def test_connect_initial_timeout_failure():
"""Test connect, initial command times out."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._at_command = mock.AsyncMock(side_effect=asyncio.TimeoutError)
api.init_api_mode = mock.AsyncMock(return_value=False)

with mock.patch("zigpy_xbee.uart.connect") as mock_connect:
with pytest.raises(zigpy.exceptions.APIException):
await api.connect()

assert mock_connect.return_value.disconnect.mock_calls == [mock.call()]

def test_close(api):

async def test_disconnect(api):
"""Test connection close."""
uart = api._uart
api.close()
await api.disconnect()

assert api._uart is None
assert uart.close.call_count == 1
assert uart.disconnect.call_count == 1


def test_commands():
Expand Down Expand Up @@ -599,97 +622,10 @@ def test_handle_many_to_one_rri(api):
api._handle_many_to_one_rri(ieee, nwk, 0)


@mock.patch.object(xbee_api.XBee, "_at_command", new_callable=mock.AsyncMock)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_success(mock_connect, mock_at_cmd):
"""Test device probing."""

res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=True)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_success_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
"""Test device probing."""

res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode")
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
@pytest.mark.parametrize(
"exception",
(asyncio.TimeoutError, serial.SerialException, zigpy.exceptions.APIException),
)
async def test_probe_fail(mock_connect, mock_at_cmd, mock_api_mode, exception):
"""Test device probing fails."""

mock_api_mode.side_effect = exception
mock_api_mode.reset_mock()
mock_at_cmd.reset_mock()
mock_connect.reset_mock()
res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is False
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=False)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_fail_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
"""Test device probing fails."""

mock_api_mode.reset_mock()
mock_at_cmd.reset_mock()
mock_connect.reset_mock()
res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is False
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "connect", return_value=mock.MagicMock())
async def test_xbee_new(conn_mck):
"""Test new class method."""
api = await xbee_api.XBee.new(mock.sentinel.application, DEVICE_CONFIG)
assert isinstance(api, xbee_api.XBee)
assert conn_mck.call_count == 1
assert conn_mck.await_count == 1


@mock.patch.object(xbee_api.XBee, "connect", return_value=mock.MagicMock())
async def test_connection_lost(conn_mck):
async def test_connection_lost(api):
"""Test `connection_lost` propagataion."""
api = await xbee_api.XBee.new(mock.sentinel.application, DEVICE_CONFIG)
await api.connect()

app = api._app = mock.MagicMock()
api.set_application(mock.AsyncMock())

err = RuntimeError()
api.connection_lost(err)

app.connection_lost.assert_called_once_with(err)
api._app.connection_lost.assert_called_once_with(err)
34 changes: 13 additions & 21 deletions tests/test_application.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for ControllerApplication."""

import asyncio
from unittest import mock

import pytest
import zigpy.config as config
Expand All @@ -15,8 +16,6 @@
import zigpy_xbee.types as xbee_t
from zigpy_xbee.zigbee import application

import tests.async_mock as mock

APP_CONFIG = {
config.CONF_DEVICE: {
config.CONF_DEVICE_PATH: "/dev/null",
Expand Down Expand Up @@ -374,13 +373,12 @@ def init_api_mode_mock():
api_mode = api_config_succeeds
return api_config_succeeds

with mock.patch("zigpy_xbee.api.XBee") as XBee_mock:
api_mock = mock.MagicMock()
api_mock._at_command = mock.AsyncMock(side_effect=_at_command_mock)
api_mock.init_api_mode = mock.AsyncMock(side_effect=init_api_mode_mock)

XBee_mock.new = mock.AsyncMock(return_value=api_mock)
api_mock = mock.MagicMock()
api_mock._at_command = mock.AsyncMock(side_effect=_at_command_mock)
api_mock.init_api_mode = mock.AsyncMock(side_effect=init_api_mode_mock)
api_mock.connect = mock.AsyncMock()

with mock.patch("zigpy_xbee.api.XBee", return_value=api_mock):
await app.connect()

app.form_network = mock.AsyncMock()
Expand Down Expand Up @@ -418,23 +416,17 @@ async def test_start_network(app):

async def test_start_network_no_api_mode(app):
"""Test start network when not in API mode."""
await _test_start_network(app, ai_status=0x00, api_mode=False)
assert app.state.node_info.nwk == 0x0000
assert app.state.node_info.ieee == t.EUI64(range(1, 9))
assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count >= 16
with pytest.raises(asyncio.TimeoutError):
await _test_start_network(app, ai_status=0x00, api_mode=False)


async def test_start_network_api_mode_config_fails(app):
"""Test start network when not when API config fails."""
with pytest.raises(zigpy.exceptions.ControllerException):
with pytest.raises(asyncio.TimeoutError):
await _test_start_network(
app, ai_status=0x00, api_mode=False, api_config_succeeds=False
)

assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count == 1


async def test_permit(app):
"""Test permit joins."""
Expand Down Expand Up @@ -559,11 +551,11 @@ async def test_force_remove(app):

async def test_shutdown(app):
"""Test application shutdown."""
mack_close = mock.MagicMock()
app._api.close = mack_close
await app.shutdown()
mock_disconnect = mock.AsyncMock()
app._api.disconnect = mock_disconnect
await app.disconnect()
assert app._api is None
assert mack_close.call_count == 1
assert mock_disconnect.call_count == 1


async def test_remote_at_cmd(app, device):
Expand Down
26 changes: 9 additions & 17 deletions tests/test_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ def test_command_mode_send(gw):
gw._transport.write.assert_called_once_with(data)


def test_close(gw):
async def test_disconnect(gw):
"""Test closing connection."""
gw.close()
assert gw._transport.close.call_count == 1
transport = gw._transport
asyncio.get_running_loop().call_soon(gw.connection_lost, None)
await gw.disconnect()
assert transport.close.call_count == 1


def test_data_received_chunk_frame(gw):
Expand Down Expand Up @@ -228,22 +230,12 @@ def test_unescape_underflow(gw):

def test_connection_lost_exc(gw):
"""Test cannection lost callback is called."""
gw._connected_future = asyncio.Future()

gw.connection_lost(ValueError())

conn_lost = gw._api.connection_lost
assert conn_lost.call_count == 1
assert isinstance(conn_lost.call_args[0][0], Exception)
assert gw._connected_future.done()
assert gw._connected_future.exception()
err = RuntimeError()
gw.connection_lost(err)
assert gw._api.connection_lost.mock_calls == [mock.call(err)]


def test_connection_closed(gw):
"""Test connection closed."""
gw._connected_future = asyncio.Future()
gw.connection_lost(None)

assert gw._api.connection_lost.call_count == 0
assert gw._connected_future.done()
assert gw._connected_future.result() is True
assert gw._api.connection_lost.mock_calls == [mock.call(None)]
Loading

0 comments on commit c2d2f5b

Please sign in to comment.