From 0b6813bcd56ba6d6466ad8afcd1ebabfe4c6b922 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 14:49:17 +0200 Subject: [PATCH 01/11] Setup flake8 for codestyle checks and mask flase-positives --- .flake8 | 16 ++++++++++++++++ test/test_examples.py | 20 ++++++++++---------- 2 files changed, 26 insertions(+), 10 deletions(-) create mode 100644 .flake8 diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..b0efa707 --- /dev/null +++ b/.flake8 @@ -0,0 +1,16 @@ +[flake8] +exclude= + .cache, + .git, + dist, + docs, + htmlcov, + node_modules, + venv, + venv*, + .mypy_cache, + test/assets +max-line-length=120 +per-file-ignores= + */__init__.py:F401 + example/*:F403,F405 diff --git a/test/test_examples.py b/test/test_examples.py index 261c76aa..262c16f0 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -13,41 +13,41 @@ async def shutdown() -> None: class BasicTest(unittest.TestCase): def test_ui_showcase(self) -> None: shc.supervisor.event_loop.create_task(shutdown()) - import example.ui_showcase # type: ignore + import example.ui_showcase # type: ignore # noqa: F401 shc.main() def test_ui_logging_showcase(self) -> None: shc.supervisor.event_loop.create_task(shutdown()) - import example.ui_logging_showcase # type: ignore + import example.ui_logging_showcase # type: ignore # noqa: F401 shc.main() def test_server_client_example(self) -> None: shc.supervisor.event_loop.create_task(shutdown()) # The examples should actually be able to coexist in a single SHC instance - import example.server_client.server # type: ignore - import example.server_client.client # type: ignore + import example.server_client.server # type: ignore # noqa: F401 + import example.server_client.client # type: ignore # noqa: F401 shc.main() def test_tasmota_led_example(self) -> None: - import example.tasmota_led_ir_with_ui # type: ignore + import example.tasmota_led_ir_with_ui # type: ignore # noqa: F401 def test_telegram_example(self) -> None: - import example.telegram # type: ignore + import example.telegram # type: ignore # noqa: F401 def test_pulseaudio_sink_example(self) -> None: - import example.pulseaudio_sink # type: ignore + import example.pulseaudio_sink # type: ignore # noqa: F401 def test_knx_specifics_example(self) -> None: - import example.knx_specifics # type: ignore + import example.knx_specifics # type: ignore # noqa: F401 def test_custom_ui_widet_example(self) -> None: - import example.custom_ui_widget.main # type: ignore + import example.custom_ui_widget.main # type: ignore # noqa: F401 # TODO add selenium test for actual position of the indicator def test_sun_position_weather_forecast_example(self) -> None: - import example.sun_position_weather_forecast # type: ignore + import example.sun_position_weather_forecast # type: ignore # noqa: F401 @classmethod def setUpClass(cls) -> None: From 699688cf13990d7e75083b91b81c2469f73ddbb9 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 14:57:47 +0200 Subject: [PATCH 02/11] ci: Use flake8 instead of pycodestyle --- .github/workflows/build.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 86f6528d..d429b084 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -57,14 +57,14 @@ jobs: - name: Install Python dependencies run: | python -m pip install --upgrade pip - pip install pycodestyle mypy + pip install flake8 mypy pip install -r requirements.txt - name: Check typing with MyPy run: | mypy - - name: Check code style with PyCodestyle + - name: Check code style with flake8 run: | - pycodestyle --max-line-length 120 shc/ test/ example/ + flake8 package: runs-on: ubuntu-latest From 046966477f469c9f9c7800435a1b28a4549ab03d Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 15:08:15 +0200 Subject: [PATCH 03/11] flake8: Add complexity checks with max-complecity=15 --- .flake8 | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.flake8 b/.flake8 index b0efa707..19073e02 100644 --- a/.flake8 +++ b/.flake8 @@ -10,7 +10,8 @@ exclude= venv*, .mypy_cache, test/assets -max-line-length=120 per-file-ignores= */__init__.py:F401 example/*:F403,F405 +max-line-length=120 +max-complexity=15 From b663850c9b545c1ab7f996285cfdf40e3b7d2b89 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 15:09:05 +0200 Subject: [PATCH 04/11] readme: Add information about mypy and flake8 --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index ea3ef8a7..52074592 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,8 @@ Please, consult the [documentation](https://smarthomeconnect.readthedocs.io/en/l If you want to help with the development of *Smart Home Connect*, your Pull Requests are always appreciated. +### Development setup + Setting up a dev environment for SHC is simple: Clone the git repository and install the development dependencies, listed in `requirements.txt` (+ the `python-rtmidi` module if you want to run the MIDI tests). These include all dependencies of smarthomeconnect with all extras: @@ -214,6 +216,9 @@ pip3 install python-rtmidi ``` You may want to use a virtual environment to avoid messing up your Python packages. + +### Web UI Frontend Assets + Additionally, you'll need NodeJS and NPM on your machine for downloading and packing the web UI frontend asset files. Use the following commands to download all frontend dependencies from NPM and package them into `/shc/web/static` (using Parcel.js): ```bash @@ -225,6 +230,9 @@ When working on the web UI source files themselves (which are located in `web_ui npx parcel web_ui_src/main.js --dist-dir shc/web/static/pack --public-url ./ ``` + +### Tests and Code Style + Please make sure that all the unittests are passing, when submitting a Pull Request: ```bash python3 -m unittest @@ -236,4 +244,10 @@ To check it, you may want to determine it locally, using the `coverage` tool: ```bash coverage run -m unittest coverage html +# open htmlcov/index.html ``` + +We also enforce static type correctness with MyPy and Python codestyle rules with flake8. +To run the static type checks and codestyle checks locally, simply install MyPy and flake8 and execute the `mypy` and `flake8` commands in the shc project repository. + +All these checks are also performed by the GitHub Actions CI for Pull Requests and the master branch. From 7056bf8409d554367585bb2caaea75f60d9dc35b Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 15:26:16 +0200 Subject: [PATCH 05/11] Fix unused imports --- example/ui_showcase.py | 1 - shc/base.py | 2 +- shc/expressions.py | 2 +- shc/interfaces/midi.py | 2 +- shc/interfaces/mqtt.py | 4 ++-- shc/interfaces/pulse.py | 4 ++-- shc/interfaces/shc_client.py | 1 - shc/interfaces/tasmota.py | 1 - shc/interfaces/telegram.py | 2 +- shc/log/file_persistence.py | 3 +-- shc/misc.py | 2 +- shc/supervisor.py | 3 +-- shc/util/check_shc.py | 3 +-- shc/variables.py | 2 +- shc/web/log_widgets.py | 2 -- shc/web/widgets.py | 2 +- test/interfaces/test_midi.py | 3 +-- test/interfaces/test_shc_client.py | 3 +-- test/interfaces/test_tasmota.py | 1 - test/test_interfaces_stub.py | 6 +++--- test/test_timer.py | 4 ++-- test/test_variables.py | 2 +- test/test_web.py | 4 ++-- 23 files changed, 24 insertions(+), 35 deletions(-) diff --git a/example/ui_showcase.py b/example/ui_showcase.py index 5a6f30fa..219a6cea 100644 --- a/example/ui_showcase.py +++ b/example/ui_showcase.py @@ -20,7 +20,6 @@ """ import random import enum -from pathlib import Path import markupsafe diff --git a/shc/base.py b/shc/base.py index 3ca3c52e..81e43420 100644 --- a/shc/base.py +++ b/shc/base.py @@ -16,7 +16,7 @@ import inspect import logging import random -from typing import Generic, List, Any, Tuple, Callable, Optional, Type, TypeVar, Awaitable, Union, Dict, Set +from typing import Generic, List, Any, Tuple, Callable, Optional, Type, TypeVar, Awaitable, Union, Dict from . import conversion diff --git a/shc/expressions.py b/shc/expressions.py index 0bb2e46f..73dce9aa 100644 --- a/shc/expressions.py +++ b/shc/expressions.py @@ -20,7 +20,7 @@ from . import conversion from .base import Readable, Subscribable, T, Connectable, Writable, S, LogicHandler, UninitializedError -from .datatypes import RangeFloat1, RangeUInt8, HSVFloat1, RGBFloat1, RGBUInt8 +from .datatypes import RangeFloat1 class ExpressionBuilder(Connectable[T], metaclass=abc.ABCMeta): diff --git a/shc/interfaces/midi.py b/shc/interfaces/midi.py index 2fd60513..6dc90911 100644 --- a/shc/interfaces/midi.py +++ b/shc/interfaces/midi.py @@ -16,7 +16,7 @@ import mido -from ..base import Subscribable, Writable, T +from ..base import Subscribable, Writable from ..datatypes import RangeUInt8 from ..supervisor import stop, AbstractInterface diff --git a/shc/interfaces/mqtt.py b/shc/interfaces/mqtt.py index 5833aa6e..05cd75dc 100644 --- a/shc/interfaces/mqtt.py +++ b/shc/interfaces/mqtt.py @@ -14,9 +14,9 @@ import itertools import json import logging -from typing import List, Any, Generic, Type, Callable, Awaitable, Optional, Tuple, Union, Dict, Deque +from typing import List, Any, Generic, Type, Callable, Awaitable, Optional, Union, Dict, Deque -from paho.mqtt.client import MQTTMessage, MQTTv311 +from paho.mqtt.client import MQTTMessage from asyncio_mqtt import Client, MqttError, ProtocolVersion from paho.mqtt.matcher import MQTTMatcher diff --git a/shc/interfaces/pulse.py b/shc/interfaces/pulse.py index f32f73c0..72cf70a3 100644 --- a/shc/interfaces/pulse.py +++ b/shc/interfaces/pulse.py @@ -18,11 +18,11 @@ if TYPE_CHECKING: from pulsectl import ( - PulseEventInfo, PulseSinkInfo, PulseSourceInfo, PulseServerInfo, PulseVolumeInfo) + PulseEventInfo, PulseSinkInfo, PulseSourceInfo, PulseServerInfo) from pulsectl_asyncio import PulseAsync import shc.conversion -from shc.base import Connectable, Subscribable, Readable, T, UninitializedError, Writable +from shc.base import Subscribable, Readable, T, UninitializedError, Writable from shc.datatypes import RangeFloat1, Balance from shc.interfaces._helper import SupervisedClientInterface diff --git a/shc/interfaces/shc_client.py b/shc/interfaces/shc_client.py index b93ffed7..d2332b68 100644 --- a/shc/interfaces/shc_client.py +++ b/shc/interfaces/shc_client.py @@ -20,7 +20,6 @@ from ._helper import SupervisedClientInterface from ..base import T, Subscribable, Writable, Readable, UninitializedError, Reading from ..conversion import SHCJsonEncoder, from_json -from ..supervisor import register_interface, stop logger = logging.getLogger(__name__) diff --git a/shc/interfaces/tasmota.py b/shc/interfaces/tasmota.py index dd652f40..7a76f89b 100644 --- a/shc/interfaces/tasmota.py +++ b/shc/interfaces/tasmota.py @@ -16,7 +16,6 @@ import json import logging import re -import time from typing import List, Any, Dict, Deque, Generic, Union, Type, TypeVar, Tuple, cast, Optional, NamedTuple from paho.mqtt.client import MQTTMessage diff --git a/shc/interfaces/telegram.py b/shc/interfaces/telegram.py index f8553b5e..c343881f 100644 --- a/shc/interfaces/telegram.py +++ b/shc/interfaces/telegram.py @@ -2,7 +2,7 @@ import asyncio import logging import re -from typing import Generic, TypeVar, Set, Type, Optional, List, Pattern, Tuple, Dict, Any, Callable +from typing import Generic, TypeVar, Set, Type, Optional, List, Dict, Any, Callable import aiogram from aiogram.bot.api import TelegramAPIServer, TELEGRAM_PRODUCTION diff --git a/shc/log/file_persistence.py b/shc/log/file_persistence.py index b11f39bf..1527d29d 100644 --- a/shc/log/file_persistence.py +++ b/shc/log/file_persistence.py @@ -1,8 +1,7 @@ import asyncio -import io import json import logging -from typing import IO, Any, Dict, Tuple, Optional, Generic, List, Type +from typing import Any, Dict, Tuple, Optional, Generic, List, Type from pathlib import Path import aiofile diff --git a/shc/misc.py b/shc/misc.py index 395910a0..76c40afe 100644 --- a/shc/misc.py +++ b/shc/misc.py @@ -14,7 +14,7 @@ from typing import Generic, Type, List, Any, Optional, Callable, Dict, Awaitable from shc import conversion -from shc.base import Readable, Subscribable, Writable, handler, T, ConnectableWrapper, UninitializedError, Reading, S +from shc.base import Readable, Subscribable, Writable, T, ConnectableWrapper, UninitializedError, Reading, S from shc.datatypes import RangeFloat1, FadeStep from shc.expressions import ExpressionWrapper from shc.timer import Every diff --git a/shc/supervisor.py b/shc/supervisor.py index 3f5575d4..546885df 100644 --- a/shc/supervisor.py +++ b/shc/supervisor.py @@ -10,12 +10,11 @@ # specific language governing permissions and limitations under the License. import abc import asyncio -import collections import enum import functools import logging import signal -from typing import Set, NamedTuple, Dict, Any, Union, Iterable, Deque, Tuple +from typing import Set, NamedTuple, Iterable from .base import Readable from .timer import timer_supervisor diff --git a/shc/util/check_shc.py b/shc/util/check_shc.py index a2590d6a..14f79b16 100644 --- a/shc/util/check_shc.py +++ b/shc/util/check_shc.py @@ -21,11 +21,10 @@ import argparse import enum import json -import re import sys import urllib.request import urllib.error -from typing import NamedTuple, Optional, Tuple, NoReturn, List +from typing import NoReturn def main() -> None: diff --git a/shc/variables.py b/shc/variables.py index 98b5bef4..53f5fde5 100644 --- a/shc/variables.py +++ b/shc/variables.py @@ -12,7 +12,7 @@ import asyncio import logging import warnings -from typing import Generic, Type, Optional, List, Any, Union, Dict, NamedTuple +from typing import Generic, Type, Optional, List, Any, Union, Dict from .base import Writable, T, Readable, Subscribable, UninitializedError, Reading from .expressions import ExpressionWrapper diff --git a/shc/web/log_widgets.py b/shc/web/log_widgets.py index 15d2d884..04419b95 100644 --- a/shc/web/log_widgets.py +++ b/shc/web/log_widgets.py @@ -1,8 +1,6 @@ import datetime -from pathlib import Path from typing import Iterable, Optional, Generic, Union, Callable, NamedTuple, Tuple, List -import jinja2 from markupsafe import Markup from ..log.generic import PersistenceVariable, LoggingRawWebUIView, AggregationMethod, LoggingAggregatedWebUIView diff --git a/shc/web/widgets.py b/shc/web/widgets.py index 9767e60c..dfeb0488 100644 --- a/shc/web/widgets.py +++ b/shc/web/widgets.py @@ -1,4 +1,4 @@ -# Copyright 2020 Michael Thies +# Copyright 2020-2022 Michael Thies # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at diff --git a/test/interfaces/test_midi.py b/test/interfaces/test_midi.py index 42dfd400..c3e596aa 100644 --- a/test/interfaces/test_midi.py +++ b/test/interfaces/test_midi.py @@ -1,4 +1,3 @@ -import asyncio import time import unittest import unittest.mock @@ -7,7 +6,7 @@ import shc.interfaces.midi from shc.datatypes import RangeUInt8 -from .._helper import InterfaceThreadRunner, AsyncMock +from .._helper import InterfaceThreadRunner try: diff --git a/test/interfaces/test_shc_client.py b/test/interfaces/test_shc_client.py index 9da3c469..fa2e955e 100644 --- a/test/interfaces/test_shc_client.py +++ b/test/interfaces/test_shc_client.py @@ -9,7 +9,6 @@ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. import asyncio -import datetime import logging import unittest import unittest.mock @@ -21,7 +20,7 @@ import shc.web import shc.interfaces.shc_client from test._helper import ExampleReadable, InterfaceThreadRunner, ExampleWritable, ExampleSubscribable, async_test, \ - ClockMock, AsyncMock + AsyncMock class ExampleType(NamedTuple): diff --git a/test/interfaces/test_tasmota.py b/test/interfaces/test_tasmota.py index 36cd6757..cb11943c 100644 --- a/test/interfaces/test_tasmota.py +++ b/test/interfaces/test_tasmota.py @@ -1,5 +1,4 @@ import asyncio -import datetime import json import shutil import subprocess diff --git a/test/test_interfaces_stub.py b/test/test_interfaces_stub.py index 553b040f..517988de 100644 --- a/test/test_interfaces_stub.py +++ b/test/test_interfaces_stub.py @@ -3,10 +3,10 @@ class InterfaceImportTest(unittest.TestCase): def test_knx(self): - import shc.interfaces.knx + import shc.interfaces.knx # noqa: F401 def test_dmx(self): - import shc.interfaces.dmx + import shc.interfaces.dmx # noqa: F401 def test_midi(self): - import shc.interfaces.midi + import shc.interfaces.midi # noqa: F401 diff --git a/test/test_timer.py b/test/test_timer.py index 68f5058c..9f2193d1 100644 --- a/test/test_timer.py +++ b/test/test_timer.py @@ -6,8 +6,8 @@ from typing import Optional, List import shc.base -from shc import timer, base, datatypes -from ._helper import ClockMock, async_test, ExampleSubscribable, AsyncMock, ExampleWritable, ExampleReadable +from shc import timer, datatypes +from ._helper import ClockMock, async_test, ExampleSubscribable, ExampleWritable, ExampleReadable class LogarithmicSleepTest(unittest.TestCase): diff --git a/test/test_variables.py b/test/test_variables.py index 518da547..4a61341a 100644 --- a/test/test_variables.py +++ b/test/test_variables.py @@ -8,7 +8,7 @@ import mypy.api from shc import variables, base, expressions -from ._helper import async_test, ExampleReadable, ExampleWritable, AsyncMock +from ._helper import async_test, ExampleReadable, ExampleWritable class SimpleVariableTest(unittest.TestCase): diff --git a/test/test_web.py b/test/test_web.py index 6d3b55e4..d88b8b5d 100644 --- a/test/test_web.py +++ b/test/test_web.py @@ -24,8 +24,8 @@ import shc.web.widgets from shc.datatypes import RangeFloat1, RGBUInt8, RangeUInt8 from shc.interfaces._helper import ReadableStatusInterface -from shc.supervisor import AbstractInterface, InterfaceStatus, ServiceStatus -from ._helper import InterfaceThreadRunner, ExampleReadable, AsyncMock, async_test +from shc.supervisor import InterfaceStatus, ServiceStatus +from ._helper import InterfaceThreadRunner, ExampleReadable, async_test class StatusTestInterface(ReadableStatusInterface): From 6685caa2a8adf4df0eeb126076efef03030b717b Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sat, 6 May 2023 16:27:29 +0200 Subject: [PATCH 06/11] Fix unused variables and unnecessary f-strings --- shc/interfaces/shc_client.py | 2 +- shc/timer.py | 1 - test/interfaces/test_knx.py | 9 ++++---- test/interfaces/test_shc_client.py | 4 ++-- test/interfaces/test_tasmota.py | 6 ++--- test/interfaces/test_telegram.py | 31 ++++++++++++------------- test/test_misc.py | 2 +- test/test_timer.py | 36 +++++++++++++++--------------- test/test_variables.py | 4 ++-- test/test_web.py | 6 ++--- 10 files changed, 51 insertions(+), 50 deletions(-) diff --git a/shc/interfaces/shc_client.py b/shc/interfaces/shc_client.py index d2332b68..c7c29275 100644 --- a/shc/interfaces/shc_client.py +++ b/shc/interfaces/shc_client.py @@ -141,7 +141,7 @@ def _websocket_dispatch(self, msg: aiohttp.WSMessage) -> None: try: name = message["name"] - status = message["status"] + _status = message["status"] # noqa: F841 except KeyError: logger.warning("Websocket message from SHC server does not include 'name' and 'status' fields: %s", msg.data) diff --git a/shc/timer.py b/shc/timer.py index f49e6c09..62540aa7 100644 --- a/shc/timer.py +++ b/shc/timer.py @@ -617,7 +617,6 @@ async def __set_delayed(self, value: T, origin: List[Any]): await _logarithmic_sleep(datetime.datetime.now() + self.delay) except asyncio.CancelledError: return - changed = value != self._value logger.debug("Value %s for Delay %s is now active and published", value, self) self._value = value self._publish(value, origin) diff --git a/test/interfaces/test_knx.py b/test/interfaces/test_knx.py index 9de40d8e..099e4097 100644 --- a/test/interfaces/test_knx.py +++ b/test/interfaces/test_knx.py @@ -130,10 +130,11 @@ async def test_send(self) -> None: @async_test async def test_respond(self) -> None: - group_connector1 = self.interface.group(knx.KNXGAD(1, 2, 3), "1")\ - .connect(ExampleReadable(bool, True), read=True) - group_connector2 = self.interface.group(knx.KNXGAD(17, 5, 127), "10")\ - .connect(ExampleReadable(knxdclient.KNXTime, knxdclient.KNXTime(datetime.time(6, 0, 11), 5)), read=True) + _group_connector1 = self.interface.group(knx.KNXGAD(1, 2, 3), "1")\ + .connect(ExampleReadable(bool, True), read=True) # noqa: F841 + _group_connector2 = self.interface.group(knx.KNXGAD(17, 5, 127), "10")\ + .connect(ExampleReadable(knxdclient.KNXTime, knxdclient.KNXTime(datetime.time(6, 0, 11), 5)), # noqa: F841 + read=True) self.interface_runner.start() diff --git a/test/interfaces/test_shc_client.py b/test/interfaces/test_shc_client.py index fa2e955e..0f5d53ff 100644 --- a/test/interfaces/test_shc_client.py +++ b/test/interfaces/test_shc_client.py @@ -157,8 +157,8 @@ def test_reconnect(self) -> None: client_bar = self.client.object(ExampleType, 'bar') bar_target = ExampleWritable(ExampleType)\ .connect(client_bar) - client_foo = self.client.object(int, 'foo')\ - .connect(ExampleReadable(int, 56), read=True) + _client_foo = self.client.object(int, 'foo')\ + .connect(ExampleReadable(int, 56), read=True) # noqa: F841 client_status_target = ExampleWritable(shc.supervisor.InterfaceStatus)\ .connect(self.client.monitoring_connector()) diff --git a/test/interfaces/test_tasmota.py b/test/interfaces/test_tasmota.py index cb11943c..a616ed09 100644 --- a/test/interfaces/test_tasmota.py +++ b/test/interfaces/test_tasmota.py @@ -158,7 +158,7 @@ def construct_color(r, g, b, w): target_power._write.assert_called_once_with(False, unittest.mock.ANY) async with asyncio_mqtt.Client("localhost", 42883, client_id="some-other-client") as c: - await c.publish(f"cmnd/test-device/color", b'#aabbcc', retain=True) + await c.publish("cmnd/test-device/color", b'#aabbcc', retain=True) await asyncio.sleep(0.1) target_color._write.assert_called_with(construct_color(170, 187, 204, 0), [conn_color]) # rounding error @@ -220,7 +220,7 @@ async def test_sensor_ir(self) -> None: target_ir._write.assert_not_called() async with asyncio_mqtt.Client("localhost", 42883, client_id="some-other-client") as c: - await c.publish(f"tele/test-device/RESULT", + await c.publish("tele/test-device/RESULT", json.dumps({"Time": "1970-01-10T03:12:43", "IrReceived": {"Protocol": "NEC", "Bits": 32, "Data": "0x00F7609F"}}) .encode('ascii')) @@ -258,7 +258,7 @@ async def test_sensor_power(self) -> None: target_total._write.assert_not_called() async with asyncio_mqtt.Client("localhost", 42883, client_id="some-other-client") as c: - await c.publish(f"tele/test-device/SENSOR", + await c.publish("tele/test-device/SENSOR", json.dumps({"Time": "1970-01-01T00:49:50", "ENERGY": {"TotalStartTime": "1970-01-01T00:00:00", "Total": 0.012, "Yesterday": 0.000, "Today": 0.012, "Period": 2, "Power": 15, diff --git a/test/interfaces/test_telegram.py b/test/interfaces/test_telegram.py index 6d616bab..36b3b3cc 100644 --- a/test/interfaces/test_telegram.py +++ b/test/interfaces/test_telegram.py @@ -72,8 +72,8 @@ async def test_start(self) -> None: @async_test async def test_write(self) -> None: foo = self.client.on_off_connector("Foo", {'max', 'tim'}) - foo_target = ExampleWritable(bool)\ - .connect(foo) + _foo_target = ExampleWritable(bool)\ + .connect(foo) # noqa: F841 foobar = self.client.generic_connector(int, "Foobar", lambda x: str(x), lambda x: int(x), {'max', 'alice'}) foobar_target = ExampleWritable(int)\ .connect(foobar) @@ -171,11 +171,11 @@ async def test_write(self) -> None: @async_test async def test_inline_cancel(self) -> None: - foo = self.client.on_off_connector("Foo", {'max', 'tim'})\ - .connect(ExampleReadable(bool, False)) - foobar = self.client.generic_connector(int, "Foobar", lambda x: str(x), lambda x: int(x), {'max', 'alice'})\ + _foo = self.client.on_off_connector("Foo", {'max', 'tim'})\ + .connect(ExampleReadable(bool, False)) # noqa: F841 + _foobar = self.client.generic_connector(int, "Foobar", lambda x: str(x), lambda x: int(x), {'max', 'alice'})\ .connect(ExampleReadable(int, 42))\ - .connect(ExampleWritable(int)) + .connect(ExampleWritable(int)) # noqa: F841 await self.api_mock.start() self.client_runner.start() @@ -230,7 +230,7 @@ async def test_inline_cancel(self) -> None: @async_test async def test_auth_errors(self) -> None: - foo = self.client.on_off_connector("Foo", {'max', 'tim'}) + _foo = self.client.on_off_connector("Foo", {'max', 'tim'}) # noqa: F841 await self.api_mock.start() self.client_runner.start() @@ -322,15 +322,16 @@ async def test_write_on_off(self) -> None: @async_test async def test_read(self) -> None: - foo = self.client.str_connector("Foo", {'max'}, {'max', 'tim'})\ + _foo = self.client.str_connector("Foo", {'max'}, {'max', 'tim'})\ .connect(ExampleWritable(str))\ - .connect(ExampleReadable(str, "hello, world!")) - foobar = self.client.generic_connector(int, "Foobar", lambda x: str(x), lambda x: int(x), {'max', 'alice'}, - options=["0", "5", "15"])\ + .connect(ExampleReadable(str, "hello, world!")) # noqa: F841 + _foobar = self.client.generic_connector(int, "Foobar", # noqa: F841 + lambda x: str(x), lambda x: int(x), {'max', 'alice'}, + options=["0", "5", "15"])\ .connect(ExampleWritable(int))\ - .connect(ExampleReadable(int, 42)) - bar = self.client.generic_connector(str, "Bar", lambda x: x, lambda x: x, {'alice', 'max'}, {'alice'})\ - .connect(ExampleWritable(str)) + .connect(ExampleReadable(int, 42)) # noqa: F841 + _bar = self.client.generic_connector(str, "Bar", lambda x: x, lambda x: x, {'alice', 'max'}, {'alice'})\ + .connect(ExampleWritable(str)) # noqa: F841 await self.api_mock.start() self.client_runner.start() @@ -543,7 +544,7 @@ async def _any_method(self, request: aiohttp.web.Request) -> aiohttp.web.Respons def _create_send_message_response(self, data: Dict[str, JSON_TYPE]) -> Dict[str, JSON_TYPE]: if 'text' not in data: - raise aiohttp.web.HTTPNotImplemented(text=f"Only text messages are implemented in the Mock") + raise aiohttp.web.HTTPNotImplemented(text="Only text messages are implemented in the Mock") return { 'message_id': random.randint(0, 2**32), 'from': self.user_object, diff --git a/test/test_misc.py b/test/test_misc.py index d42603f4..b8b7f7a2 100644 --- a/test/test_misc.py +++ b/test/test_misc.py @@ -215,7 +215,7 @@ async def test_simple_concurrent_update(self) -> None: var2 = shc.Variable(int) _exchange = shc.misc.UpdateExchange(int) \ .connect(var1)\ - .connect(var2) + .connect(var2) # noqa: F841 await asyncio.gather(var1.write(42, []), var2.write(56, [])) await asyncio.sleep(0.1) diff --git a/test/test_timer.py b/test/test_timer.py index 9f2193d1..60cf267c 100644 --- a/test/test_timer.py +++ b/test/test_timer.py @@ -208,7 +208,7 @@ def _assert_datetime(self, expected: datetime.datetime, actual: Optional[datetim self.assertAlmostEqual(expected.astimezone(), actual, delta=datetime.timedelta(seconds=.1)) def test_simple_next(self) -> None: - with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)): once_timer = timer.At(hour=15, minute=7, second=17, millis=200) self._assert_datetime(datetime.datetime(2020, 1, 1, 15, 7, 17, 200000), once_timer._next_execution()) once_timer = timer.At(hour=15, minute=7, second=25) @@ -229,7 +229,7 @@ def test_simple_next(self) -> None: self._assert_datetime(datetime.datetime(2020, 4, 6, 0, 0, 0), once_timer._next_execution()) def test_spec_forms(self) -> None: - with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)): once_timer = timer.At(hour=timer.EveryNth(2)) self._assert_datetime(datetime.datetime(2020, 1, 1, 16, 0, 0), once_timer._next_execution()) once_timer = timer.At(hour=timer.EveryNth(6)) @@ -242,7 +242,7 @@ def test_spec_forms(self) -> None: self._assert_datetime(datetime.datetime(2020, 1, 1, 15, 7, 17), once_timer._next_execution()) def test_stepback(self) -> None: - with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2020, 1, 1, 15, 7, 17)): once_timer = timer.At(hour=None, minute=0) self._assert_datetime(datetime.datetime(2020, 1, 1, 16, 0, 0), once_timer._next_execution()) once_timer = timer.At(hour=15, minute=0) @@ -251,28 +251,28 @@ def test_stepback(self) -> None: self._assert_datetime(datetime.datetime(2021, 1, 1, 0, 5, 16), once_timer._next_execution()) def test_overflows(self) -> None: - with ClockMock(datetime.datetime(2020, 12, 31, 23, 59, 46)) as clock: + with ClockMock(datetime.datetime(2020, 12, 31, 23, 59, 46)): once_timer = timer.At(hour=None, minute=None, second=timer.EveryNth(15)) self._assert_datetime(datetime.datetime(2021, 1, 1, 0, 0, 0), once_timer._next_execution()) - with ClockMock(datetime.datetime(2019, 2, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2019, 2, 1, 15, 7, 17)): once_timer = timer.At(day=29) self._assert_datetime(datetime.datetime(2019, 3, 29, 0, 0, 0), once_timer._next_execution()) - with ClockMock(datetime.datetime(2020, 2, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2020, 2, 1, 15, 7, 17)): once_timer = timer.At(day=29) self._assert_datetime(datetime.datetime(2020, 2, 29, 0, 0, 0), once_timer._next_execution()) - with ClockMock(datetime.datetime(2020, 4, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2020, 4, 1, 15, 7, 17)): once_timer = timer.At(day=31) self._assert_datetime(datetime.datetime(2020, 5, 31, 0, 0, 0), once_timer._next_execution()) - with ClockMock(datetime.datetime(2019, 1, 1, 15, 7, 17)) as clock: + with ClockMock(datetime.datetime(2019, 1, 1, 15, 7, 17)): once_timer = timer.At(weeknum=53) self._assert_datetime(datetime.datetime(2020, 12, 28, 0, 0, 0), once_timer._next_execution()) - with ClockMock(datetime.datetime(2020, 1, 1, 0, 0, 0)) as clock: + with ClockMock(datetime.datetime(2020, 1, 1, 0, 0, 0)): once_timer = timer.At(year=2019) self.assertIsNone(once_timer._next_execution()) def test_exception(self) -> None: with self.assertRaises(ValueError): - once_timer = timer.At(day=[1, 5, 15], weeknum=timer.EveryNth(2)) + timer.At(day=[1, 5, 15], weeknum=timer.EveryNth(2)) class BoolTimerTest(unittest.TestCase): @@ -288,7 +288,7 @@ def save_time(*args): ton = timer.TOn(base, datetime.timedelta(seconds=42)) with unittest.mock.patch.object(ton, "_publish", new=Mock(side_effect=save_time)) as publish_mock: - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): self.assertFalse(await ton.read()) # False should not be forwarded, when value is already False @@ -340,7 +340,7 @@ def save_time(*args): toff = timer.TOff(base, datetime.timedelta(seconds=42)) with unittest.mock.patch.object(toff, "_publish", new=Mock(side_effect=save_time)) as publish_mock: - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): self.assertFalse(await toff.read()) # False should not be forwarded, when value is already False @@ -498,7 +498,7 @@ async def test_duration(self) -> None: timerswitch = timer.TimerSwitch([pub_on1, pub_on2], duration=datetime.timedelta(seconds=42)) with unittest.mock.patch.object(timerswitch, "_publish") as publish_mock: - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): self.assertFalse(await timerswitch.read()) await pub_on2.publish(None, [self]) @@ -550,7 +550,7 @@ def save_time(*args): self.assertIs(rate_limiter.type, int) with unittest.mock.patch.object(rate_limiter, "_publish", new=Mock(side_effect=save_time)) as publish_mock: - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): # First value should be forwarded immediately await base.publish(42, [self]) await asyncio.sleep(0.1) @@ -609,7 +609,7 @@ async def test_simple(self) -> None: with self.assertRaises(shc.base.UninitializedError): await ramp2.read() - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): await subscribable1.publish(datatypes.RangeUInt8(0), [self]) await subscribable2.publish(datatypes.RangeFloat1(0), [self]) await subscribable3.publish(BLACK, [self]) @@ -680,7 +680,7 @@ async def test_enable_ramp(self) -> None: ramp1 = timer.RGBHSVRamp(subscribable1, datetime.timedelta(seconds=1), max_frequency=2, enable_ramp=enable1) writable1 = ExampleWritable(datatypes.RGBUInt8).connect(ramp1) - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): # Ramping should work normal (as in the other test await subscribable1.publish(BLACK, [self]) writable1._write.assert_called_once_with(BLACK, [self, subscribable1, ramp1]) @@ -722,7 +722,7 @@ async def test_stateful_target(self) -> None: variable1 = shc.Variable(datatypes.RangeUInt8).connect(ramp1) writable1 = ExampleWritable(datatypes.RangeUInt8).connect(variable1) - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): await subscribable1.publish(datatypes.RangeUInt8(0), [self]) await asyncio.sleep(0.05) writable1._write.assert_called_once_with(datatypes.RangeUInt8(0), [self, subscribable1, ramp1, variable1]) @@ -759,7 +759,7 @@ async def test_fade_step_ramp(self) -> None: variable1 = shc.Variable(datatypes.RangeFloat1).connect(ramp1) writable1 = ExampleWritable(datatypes.RangeFloat1).connect(variable1) - with ClockMock(begin, actual_sleep=0.05) as clock: + with ClockMock(begin, actual_sleep=0.05): with self.assertLogs() as logs: await subscribable1.publish(datatypes.FadeStep(0.5), [self]) await asyncio.sleep(0.05) diff --git a/test/test_variables.py b/test/test_variables.py index 4a61341a..7604b014 100644 --- a/test/test_variables.py +++ b/test/test_variables.py @@ -180,9 +180,9 @@ async def test_recursive_field_writing_legacy(self): with warnings.catch_warnings(record=True): # There seems to be a bug, that record=False breaks the catching with self.assertRaises(AttributeError): - var_c = var.c + _var_c = var.c # noqa: F841 with self.assertRaises(AttributeError): - var_c = var.a.c + _var_c = var.a.c # noqa: F841 with self.assertRaises(base.UninitializedError): await var.a.a.write(21, [self]) diff --git a/test/test_web.py b/test/test_web.py index d88b8b5d..b69bc354 100644 --- a/test/test_web.py +++ b/test/test_web.py @@ -630,7 +630,7 @@ def tearDown(self) -> None: self.server_runner.stop() def test_rest_get(self) -> None: - api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) + _api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) # noqa: F841 self.server_runner.start() with self.assertRaises(urllib.error.HTTPError) as cm: @@ -819,7 +819,7 @@ def tearDown(self) -> None: @async_test async def test_errors(self) -> None: - api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) + _api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) # noqa: F841 self.server_runner.start() await self.start_websocket() @@ -869,7 +869,7 @@ async def test_errors(self) -> None: @async_test async def test_get(self) -> None: - api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) + _api_object = self.server.api(int, "the_api_object").connect(ExampleReadable(int, 42)) # noqa: F841 self.server_runner.start() await self.start_websocket() From 71ba6bffa64f3663115ba6e42de5bf112c4c5f51 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sun, 7 May 2023 14:06:48 +0200 Subject: [PATCH 07/11] interfaces._helper: Split up _supervise to reduce code complexity --- shc/interfaces/_helper.py | 216 ++++++++++++++++++++++++-------------- 1 file changed, 138 insertions(+), 78 deletions(-) diff --git a/shc/interfaces/_helper.py b/shc/interfaces/_helper.py index 305c19af..e3e82042 100644 --- a/shc/interfaces/_helper.py +++ b/shc/interfaces/_helper.py @@ -238,53 +238,15 @@ async def _supervise(self) -> None: sleep_interval = self.backoff_base self._status_connector.update_status(status=ServiceStatus.WARNING, message="Interface has not been started yet") + # Reconnect loop while True: exception = None - wait_stopping = asyncio.create_task(self._stopping.wait()) try: - # Connect - logger.debug("Running _connect for interface %s ...", self) - connect_task = asyncio.create_task(self._connect()) - # TODO timeout - done, _ = await asyncio.wait((connect_task, wait_stopping), return_when=asyncio.FIRST_COMPLETED) - if connect_task not in done: - logger.debug("Interface %s stopped before _connect finished", self) - connect_task.cancel() - connect_task.result() # raise exception if any - - # Start _run task and wait for _running - logger.debug("Starting _run task for interface %s ...", self) - run_task = asyncio.create_task(self._run()) - wait_running = asyncio.create_task(self._running.wait()) - # TODO timeout - logger.debug("Waiting for interface %s to report it is running ...", self) - done, _ = await asyncio.wait((wait_running, run_task), return_when=asyncio.FIRST_COMPLETED) - if wait_running not in done: - wait_running.cancel() - if run_task not in done: # This should not happen (without timeout) - await self._disconnect() - await run_task - raise RuntimeError("Run task stopped before _running has been set") - - # Subscribe - logger.debug("Starting _subscribe task for interface %s ...", self) - subscribe_task = asyncio.create_task(self._subscribe()) - # TODO timeout - done, _ = await asyncio.wait((subscribe_task, run_task), return_when=asyncio.FIRST_COMPLETED) - if subscribe_task not in done: - if run_task not in done: # This should not happen (without timeout) - await self._disconnect() - await run_task - raise RuntimeError("Run task stopped before _subscribe task finished") - subscribe_exception = subscribe_task.exception() - if subscribe_exception is not None: - await self._disconnect() - try: - await run_task - except Exception as e: - logger.debug("Ignoring Exception %s in run task of interface %s, during shutdown due to " - "exception in _subscribe task", repr(e), self) - raise subscribe_exception + await self.__do_connect() + + run_task = await self.__start_run_task() + + await self.__do_subscribe(run_task) logger.debug("Starting up interface %s completed", self) self._status_connector.update_status(status=ServiceStatus.OK, message="") @@ -299,44 +261,12 @@ async def _supervise(self) -> None: except Exception as e: exception = e pass - finally: - wait_stopping.cancel() self._running.clear() - # If we have not been started successfully yet, report startup as finished (if failsafe) or report startup - # error and quit - if not self._started.done(): - if self.failsafe_start: - self._started.set_result(None) - else: - logger.debug("Startup of interface %s has not been finished due to exception", self) - self._started.set_exception(exception if exception is not None else asyncio.CancelledError()) - await self._disconnect() - return - - # Return if we are stopping - if self._stopping.is_set(): - if exception: - logger.debug("Ignoring exception %s in interface %s while stopping", repr(exception), self) - return - - # Shut down SHC if no auto_reconnect shall be attempted - if not self.auto_reconnect: - if exception: - logger.critical("Error in interface %s:", exc_info=exception) - else: - logger.critical("Unexpected shutdown of interface %s", self) - asyncio.create_task(interface_failure(repr(self))) + lets_stop = await self.__handle_exception(exception) + if lets_stop: return - if exception: - logger.error("Error in interface %s. Attempting reconnect ...", self, exc_info=exception) - self._status_connector.update_status(status=ServiceStatus.CRITICAL, message=str(exception)) - else: - logger.error("Unexpected shutdown of interface %s. Attempting reconnect ...", self) - self._status_connector.update_status(status=ServiceStatus.CRITICAL, - message="Unexpected shutdown of interface") - # Sleep before reconnect logger.info("Waiting %s seconds before reconnect of interface %s ...", sleep_interval, self) wait_stopping = asyncio.create_task(self._stopping.wait()) @@ -348,3 +278,133 @@ async def _supervise(self) -> None: wait_stopping.cancel() sleep_interval *= self.backoff_exponent logger.info("Attempting reconnect of interface %s ...", self) + + async def __do_connect(self) -> None: + """ + 1st sub-step of _supervise(): Execute :meth:`_connect` and await its completion, unless self._stopping is set in + the meantime + + :raises: Any exception that is raised in _connect() (including `CancelledError`, when _stopping was set) + """ + wait_stopping = asyncio.create_task(self._stopping.wait()) + try: + logger.debug("Running _connect for interface %s ...", self) + connect_task = asyncio.create_task(self._connect()) + # TODO timeout + done, _ = await asyncio.wait((connect_task, wait_stopping), return_when=asyncio.FIRST_COMPLETED) + if connect_task not in done: + logger.debug("Interface %s stopped before _connect finished", self) + connect_task.cancel() + connect_task.result() # raise exception if any + finally: + wait_stopping.cancel() + + async def __start_run_task(self) -> asyncio.Task: + """ + 2nd sub-step of _supervise(): Start :meth:`_run` in a new Task and wait for self._running to be set + + :return: The new Task in which `_run()` is executed + :raises RuntimeError: when `_run()` exits unexpectedly (before `_running` is set) + """ + logger.debug("Starting _run task for interface %s ...", self) + run_task = asyncio.create_task(self._run()) + wait_running = asyncio.create_task(self._running.wait()) + # TODO timeout + logger.debug("Waiting for interface %s to report it is running ...", self) + done, _ = await asyncio.wait((wait_running, run_task), return_when=asyncio.FIRST_COMPLETED) + if wait_running not in done: + wait_running.cancel() + if run_task not in done: # This should not happen (without timeout) + await self._disconnect() + await run_task + # TODO report exception from run_task if any. + raise RuntimeError("Run task stopped before _running has been set") + return run_task + + async def __do_subscribe(self, run_task: asyncio.Task) -> None: + """ + 3rd sub-step of _supervise(): Execute :meth:`_subscribe` and await its completion + + In case of failure :meth:`_disconnect` is called and an exception is raised. In case of premature exit of the + `_run()` task, an exception is raised, as well. + + :param run_task: The Task in which :meth:`_run` is executed; used to supervise that it's running smoothly (not + returning or raising an exception) while waiting for `_subscribe()` to complete + :raises RuntimeError: when `_run()` exits unexpectedly (before `_running` is set) + """ + logger.debug("Starting _subscribe task for interface %s ...", self) + subscribe_task = asyncio.create_task(self._subscribe()) + # TODO timeout + done, _ = await asyncio.wait((subscribe_task, run_task), return_when=asyncio.FIRST_COMPLETED) + if subscribe_task not in done: + if run_task not in done: # This should not happen (without timeout) + await self._disconnect() + await run_task + # TODO cancel subscribe_task + # TODO report exception from run_task if any. + raise RuntimeError("Run task stopped before _subscribe task finished") + subscribe_exception = subscribe_task.exception() + if subscribe_exception is not None: + await self._disconnect() + try: + await run_task + except Exception as e: + logger.debug("Ignoring Exception %s in run task of interface %s, during shutdown due to " + "exception in _subscribe task", repr(e), self) + raise subscribe_exception + + async def __handle_exception(self, exception: Optional[Exception]) -> bool: + """ + Error-handling stage of _supervise(): Handle any exception that was raised during startup or operation + + - If `_started` is not yet been set (i.e. :meth:`start` is still awaiting startup): + - If not `failsafe_start`: + Resolve `_started` with an exception to make :meth:`start` raise the exception + - else: + Resolve `_started` without exception (to let `start()` return), log the exception and try a reconnect + asynchronously + - If `_stopping` is set: Short log of the exception (if any) and exit + - If not `auto_reconnect`: Log the exception, call :fun:`shc.supervisor.interface_failure` to terminate the SHC + application and exit + - Otherwise: Log the exception and try a reconnect. + + This method also takes care of updating the `_status_connector` when an error occurs. It should be updated + again, after successful reconnect, somewhere else. + + :param exception: The exception that was raised during starup or by the `_run()` method or None + :return: True if the reconnect_loop should be exited, False, if a reconnect should be tried + """ + # If we have not been started successfully yet, report startup as finished (if failsafe) or report startup + # error and quit + if not self._started.done(): + if self.failsafe_start: + self._started.set_result(None) + else: + logger.debug("Startup of interface %s has not been finished due to exception", self) + self._started.set_exception(exception if exception is not None else asyncio.CancelledError()) + await self._disconnect() + return True + + # Return if we are stopping + if self._stopping.is_set(): + if exception: + logger.debug("Ignoring exception %s in interface %s while stopping", repr(exception), self) + return True + + # Shut down SHC if no auto_reconnect shall be attempted + if not self.auto_reconnect: + if exception: + logger.critical("Error in interface %s:", exc_info=exception) + else: + logger.critical("Unexpected shutdown of interface %s", self) + asyncio.create_task(interface_failure(repr(self))) + return True + + if exception: + logger.error("Error in interface %s. Attempting reconnect ...", self, exc_info=exception) + self._status_connector.update_status(status=ServiceStatus.CRITICAL, message=str(exception)) + else: + logger.error("Unexpected shutdown of interface %s. Attempting reconnect ...", self) + self._status_connector.update_status(status=ServiceStatus.CRITICAL, + message="Unexpected shutdown of interface") + return False From 04a14d98de4b7307aad1bb4ec6039b13172d9c68 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sun, 7 May 2023 14:16:09 +0200 Subject: [PATCH 08/11] interfaces._helper: Improve reporting of exceptions from _run() method during startup --- shc/interfaces/_helper.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/shc/interfaces/_helper.py b/shc/interfaces/_helper.py index e3e82042..7f0d6ea4 100644 --- a/shc/interfaces/_helper.py +++ b/shc/interfaces/_helper.py @@ -305,6 +305,7 @@ async def __start_run_task(self) -> asyncio.Task: :return: The new Task in which `_run()` is executed :raises RuntimeError: when `_run()` exits unexpectedly (before `_running` is set) + :raises: Any exception that was raised by `_run()` while waiting for _running to be set """ logger.debug("Starting _run task for interface %s ...", self) run_task = asyncio.create_task(self._run()) @@ -317,7 +318,8 @@ async def __start_run_task(self) -> asyncio.Task: if run_task not in done: # This should not happen (without timeout) await self._disconnect() await run_task - # TODO report exception from run_task if any. + # Raise any exception from run task if any + run_task.result() raise RuntimeError("Run task stopped before _running has been set") return run_task @@ -330,19 +332,23 @@ async def __do_subscribe(self, run_task: asyncio.Task) -> None: :param run_task: The Task in which :meth:`_run` is executed; used to supervise that it's running smoothly (not returning or raising an exception) while waiting for `_subscribe()` to complete - :raises RuntimeError: when `_run()` exits unexpectedly (before `_running` is set) + :raises RuntimeError: when `_run()` exits unexpectedly or raises an exception (before `_running` is set) """ logger.debug("Starting _subscribe task for interface %s ...", self) subscribe_task = asyncio.create_task(self._subscribe()) # TODO timeout done, _ = await asyncio.wait((subscribe_task, run_task), return_when=asyncio.FIRST_COMPLETED) if subscribe_task not in done: + subscribe_task.cancel() if run_task not in done: # This should not happen (without timeout) await self._disconnect() await run_task - # TODO cancel subscribe_task - # TODO report exception from run_task if any. - raise RuntimeError("Run task stopped before _subscribe task finished") + run_exception = run_task.exception() + if run_exception: + raise RuntimeError("Run task raised an exception while awaiting _subscribe: %s", repr(run_exception)) \ + from run_exception + else: + raise RuntimeError("Run task stopped before _subscribe task finished") subscribe_exception = subscribe_task.exception() if subscribe_exception is not None: await self._disconnect() From bec1f0bc59f7573712a1cdb9a3bd67efed1e3493 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sun, 7 May 2023 16:04:02 +0200 Subject: [PATCH 09/11] pulse: Split up _dispatch_pulse_event() to reduce complexity --- shc/interfaces/pulse.py | 109 +++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 47 deletions(-) diff --git a/shc/interfaces/pulse.py b/shc/interfaces/pulse.py index 72cf70a3..2c42866a 100644 --- a/shc/interfaces/pulse.py +++ b/shc/interfaces/pulse.py @@ -291,31 +291,15 @@ async def _dispatch_pulse_event(self, event: "PulseEventInfo") -> None: if event.t is PulseEventTypeEnum.new: if event.facility is PulseEventFacilityEnum.sink: - data = await self.pulse.sink_info(event.index) - name = data.name - for connector in self.sink_connectors_by_name.get(name, []): - connector.change_id(event.index) - self.sink_connectors_by_id[event.index].append(connector) - connector.on_change(data, []) + await self.__on_event_new_sink(event.index) elif event.facility is PulseEventFacilityEnum.source: - data = await self.pulse.source_info(event.index) - name = data.name - for source_connector in self.source_connectors_by_name.get(name, []): - source_connector.change_id(event.index) - self.source_connectors_by_id[event.index].append(source_connector) - source_connector.on_change(data, []) + await self.__on_event_new_source(event.index) elif event.t is PulseEventTypeEnum.remove: if event.facility is PulseEventFacilityEnum.sink: - for connector in self.sink_connectors_by_id.get(event.index, []): - connector.change_id(None) - if event.index in self.sink_connectors_by_id: - del self.sink_connectors_by_id[event.index] + await self.__on_event_remove_sink(event.index) elif event.facility is PulseEventFacilityEnum.source: - for source_connector in self.source_connectors_by_id.get(event.index, []): - source_connector.change_id(None) - if event.index in self.source_connectors_by_id: - del self.source_connectors_by_id[event.index] + await self.__on_event_remove_source(event.index) elif event.t is PulseEventTypeEnum.change: # Check if the event has probably been caused by a value update from SHC. In this case, we should have the @@ -336,33 +320,64 @@ async def _dispatch_pulse_event(self, event: "PulseEventInfo") -> None: source_connector.on_change(data, origin) elif event.facility is PulseEventFacilityEnum.server: - # For server change events, we need to update our default_*_name connectors and possibly change the - # current_id of all the default_sink/source_* connectors (and update them with the current state of the - # new default sink/source) - server_info = await self.pulse.server_info() - self._default_sink_name_connector._update(server_info, origin) - self._default_source_name_connector._update(server_info, origin) - # Update default sink/source connectors - default_sink_data = await self.pulse.get_sink_by_name(server_info.default_sink_name) - default_source_data = await self.pulse.get_source_by_name(server_info.default_source_name) - for connector in self.sink_connectors_by_name.get(None, []): - if connector.current_id is not None: - try: - self.sink_connectors_by_id[connector.current_id].remove(connector) - except ValueError: - pass - connector.change_id(default_sink_data.index) - self.sink_connectors_by_id[default_sink_data.index].append(connector) - connector.on_change(default_sink_data, []) # should we set the original origin here? - for source_connector in self.source_connectors_by_name.get(None, []): - if source_connector.current_id is not None: - try: - self.source_connectors_by_id[source_connector.current_id].remove(source_connector) - except ValueError: - pass - source_connector.change_id(default_source_data.index) - self.source_connectors_by_id[default_source_data.index].append(source_connector) - source_connector.on_change(default_source_data, []) # should we set the original origin here? + await self.__on_event_server_change(origin) + + async def __on_event_new_sink(self, sink_index: int) -> None: + data = await self.pulse.sink_info(sink_index) + name = data.name + for connector in self.sink_connectors_by_name.get(name, []): + connector.change_id(sink_index) + self.sink_connectors_by_id[sink_index].append(connector) + connector.on_change(data, []) + + async def __on_event_new_source(self, source_index: int) -> None: + data = await self.pulse.source_info(source_index) + name = data.name + for source_connector in self.source_connectors_by_name.get(name, []): + source_connector.change_id(source_index) + self.source_connectors_by_id[source_index].append(source_connector) + source_connector.on_change(data, []) + + async def __on_event_remove_sink(self, sink_index: int) -> None: + for connector in self.sink_connectors_by_id.get(sink_index, []): + connector.change_id(None) + if sink_index in self.sink_connectors_by_id: + del self.sink_connectors_by_id[sink_index] + + async def __on_event_remove_source(self, source_index: int) -> None: + for source_connector in self.source_connectors_by_id.get(source_index, []): + source_connector.change_id(None) + if source_index in self.source_connectors_by_id: + del self.source_connectors_by_id[source_index] + + async def __on_event_server_change(self, origin: List[Any]) -> None: + # For server change events, we need to update our default_*_name connectors and possibly change the + # current_id of all the default_sink/source_* connectors (and update them with the current state of the + # new default sink/source) + server_info = await self.pulse.server_info() + self._default_sink_name_connector._update(server_info, origin) + self._default_source_name_connector._update(server_info, origin) + # Update default sink/source connectors + default_sink_data = await self.pulse.get_sink_by_name(server_info.default_sink_name) + default_source_data = await self.pulse.get_source_by_name(server_info.default_source_name) + for connector in self.sink_connectors_by_name.get(None, []): + if connector.current_id is not None: + try: + self.sink_connectors_by_id[connector.current_id].remove(connector) + except ValueError: + pass + connector.change_id(default_sink_data.index) + self.sink_connectors_by_id[default_sink_data.index].append(connector) + connector.on_change(default_sink_data, []) # should we set the original origin here? + for source_connector in self.source_connectors_by_name.get(None, []): + if source_connector.current_id is not None: + try: + self.source_connectors_by_id[source_connector.current_id].remove(source_connector) + except ValueError: + pass + source_connector.change_id(default_source_data.index) + self.source_connectors_by_id[default_source_data.index].append(source_connector) + source_connector.on_change(default_source_data, []) # should we set the original origin here? def _register_origin_callback(self, facility: str, index: int, origin: List[Any]) -> None: # Avoid infinite growing of event_origin dict's lists, by adding origins of events that will never be From 65d5f24d99a5d5b61c6fde66caee719427561192 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sun, 7 May 2023 19:06:43 +0200 Subject: [PATCH 10/11] web: Split up _api_websocket_dispatch() to reduce complexity --- shc/web/interface.py | 137 +++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 71 deletions(-) diff --git a/shc/web/interface.py b/shc/web/interface.py index 44ea0c22..5a880b76 100644 --- a/shc/web/interface.py +++ b/shc/web/interface.py @@ -353,7 +353,7 @@ async def _api_websocket_handler(self, request: aiohttp.web.Request) -> aiohttp. if msg.type == aiohttp.WSMsgType.TEXT: # It does not make sense to avoid the task creation here, since we would have to create a task in # any branch of the _api_websocket_dispatch() to asynchronously do writing to websockets then. - asyncio.create_task(self._api_websocket_dispatch(request, ws, msg)) + asyncio.create_task(self._api_websocket_dispatch_message(request, ws, msg)) elif msg.type == aiohttp.WSMsgType.ERROR: logger.info('API websocket connection closed with exception %s', ws.exception()) finally: @@ -366,8 +366,8 @@ async def _api_websocket_handler(self, request: aiohttp.web.Request) -> aiohttp. await obj.http_post(value, ws) return ws - async def _api_websocket_dispatch(self, request: aiohttp.web.Request, ws: aiohttp.web.WebSocketResponse, - msg: aiohttp.WSMessage) -> None: + async def _api_websocket_dispatch_message(self, request: aiohttp.web.Request, ws: aiohttp.web.WebSocketResponse, + msg: aiohttp.WSMessage) -> None: try: message = msg.json() except JSONDecodeError: @@ -398,68 +398,19 @@ async def _api_websocket_dispatch(self, request: aiohttp.web.Request, ws: aiohtt await ws.send_json(result) return - try: - # subscribe action - if action == "subscribe": - logger.debug("got websocket subscribe request for API object %s from %s", name, request.remote) - await obj.websocket_subscribe(ws, handle) - return - - # post action - elif action == "post": - value_exists = False - try: - value = message["value"] - value_exists = True - except KeyError: - result['status'] = 422 - result['error'] = "message does not include a 'value' field" - logger.warning("Websocket API POST message from %s without 'value' field: %s", request.remote, - message) - if value_exists: - logger.debug("got post request for API object %s via websocket from %s with value %s", - name, request.remote, value) - try: - await obj.http_post(value, ws) - except (ValueError, TypeError) as e: - logger.warning("Error while updating API object %s with value via websocket from %s (error was " - "%s): %s", name, request.remote, e, value) - result['status'] = 422 - result['error'] = "Could not use provided value to update API object: {}".format(e) - - # lastwill action - elif action == "lastwill": - value_exists = False - try: - value = message["value"] - value_exists = True - except KeyError: - result['status'] = 422 - result['error'] = "message does not include a 'value' field" - logger.warning("Websocket API LASTWILL message from %s without 'value' field: %s", request.remote, - message) - if value_exists: - logger.debug("got LASTWILL request for API object %s via websocket from %s with value %s", - name, request.remote, value) - try: - self._api_ws_last_will[ws] = (obj, obj._check_last_will(value)) - except (ValueError, TypeError) as e: - logger.warning("Error while setting last will of websocket client %s for API object %s (error" - "was %s): %s", name, request.remote, e, value) - result['status'] = 422 - result['error'] = "Could not use provided value to set last will: {}".format(e) - - # get action - elif action == "get": - logger.debug("got get request for API object %s via websocket from %s", name, request.remote) - value = (await obj.http_get())[1] - result['status'] = 200 if value is not None else 409 - result['value'] = value - - else: - logger.warning("Unknown websocket API action '%s', requested by %s", action, request.remote) + value = None + # 'post' and 'lastwill' actions require a 'value' field in the message + if action in ("post", "lastwill"): + try: + value = message["value"] + except KeyError: result['status'] = 422 - result['error'] = "Not a valid action: '{}'".format(action) + result['error'] = "message does not include a 'value' field" + logger.warning("Websocket API %s message from %s without 'value' field: %s", request.remote, action, + message) + + try: + result = await self.__api_websocket_handle_request(ws, action, handle, obj, request.remote or "?", value) except Exception as e: logger.error("Error while processing API websocket message from %s: %s", request.remote, message, exc_info=e) @@ -470,6 +421,54 @@ async def _api_websocket_dispatch(self, request: aiohttp.web.Request, ws: aiohtt logger.debug("Sending websocket response: %s", result) await ws.send_str(json.dumps(result, cls=SHCJsonEncoder)) + async def __api_websocket_handle_request(self, ws: aiohttp.web.WebSocketResponse, action: str, handle: Any, + api_object: "WebApiObject", client_ip: str, value: Any) -> Any: + result = {'status': 204, + 'name': api_object.name, + 'action': action, + 'handle': handle} + + if action == "get": + logger.debug("got get request for API object %s via websocket from %s", api_object.name, client_ip) + value = (await api_object.http_get())[1] + result['status'] = 200 if value is not None else 409 + result['value'] = value + + elif action == "subscribe": + logger.debug("got websocket subscribe request for API object %s from %s", api_object.name, client_ip) + value = await api_object.websocket_subscribe(ws) + if value is not None: + result['status'] = 200 + result['value'] = value + + elif action == "post": + logger.debug("got post request for API object %s vi a websocket from %s with value %s", + api_object.name, client_ip, value) + try: + await api_object.http_post(value, ws) + except (ValueError, TypeError) as e: + logger.warning("Error while updating API object %s with value via websocket from %s (error was " + "%s): %s", api_object.name, client_ip, e, value) + result['status'] = 422 + result['error'] = "Could not use provided value to update API object: {}".format(e) + + elif action == "lastwill": + logger.debug("got LASTWILL request for API object %s via websocket from %s with value %s", + api_object.name, client_ip, value) + try: + self._api_ws_last_will[ws] = (api_object, api_object._check_last_will(value)) + except (ValueError, TypeError) as e: + logger.warning("Error while setting last will of websocket client %s for API object %s (error" + "was %s): %s", api_object.name, client_ip, e, value) + result['status'] = 422 + result['error'] = "Could not use provided value to set last will: {}".format(e) + + else: + logger.warning("Unknown websocket API action '%s', requested by %s", action, client_ip) + result['status'] = 422 + result['error'] = "Not a valid action: '{}'".format(action) + return result + async def _api_get_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: try: api_object = self._api_objects[request.match_info['name']] @@ -983,20 +982,16 @@ async def _publish_http(self, value: T, skip_websocket: Optional[object] = None) await asyncio.gather(*(ws.send_str(data) for ws in self.subscribed_websockets if ws is not skip_websocket)) - async def websocket_subscribe(self, ws: aiohttp.web.WebSocketResponse, handle: Any) -> None: + async def websocket_subscribe(self, ws: aiohttp.web.WebSocketResponse) -> Optional[T]: self.subscribed_websockets.add(ws) current_value = await self._from_provider() - if current_value is not None: - data = {'status': 200, 'action': 'subscribe', 'name': self.name, 'value': current_value, 'handle': handle} - else: - data = {'status': 204, 'action': 'subscribe', 'name': self.name, 'handle': handle} - await ws.send_str(json.dumps(data, cls=SHCJsonEncoder)) + return current_value def websocket_close(self, ws: aiohttp.web.WebSocketResponse) -> None: self.subscribed_websockets.discard(ws) async def http_get(self, wait: bool = False, timeout: float = 30, etag_match: Optional[str] = None - ) -> Tuple[bool, Any, str]: + ) -> Tuple[bool, Optional[T], str]: """ Get the current value or await a new value. From 7cc682b6266239a3678d8d3b372559daafb387f0 Mon Sep 17 00:00:00 2001 From: Michael Thies Date: Sun, 7 May 2023 20:07:24 +0200 Subject: [PATCH 11/11] log: Ignore high complexity of retrieve_aggregated_log() for now This method will be by the ongoing rework of the data logging code, so let's avoid serious merge conflicts here. --- shc/log/generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shc/log/generic.py b/shc/log/generic.py index cf20827f..67404379 100644 --- a/shc/log/generic.py +++ b/shc/log/generic.py @@ -66,7 +66,7 @@ async def retrieve_log(self, start_time: datetime.datetime, end_time: datetime.d """ pass - async def retrieve_aggregated_log(self, start_time: datetime.datetime, end_time: datetime.datetime, + async def retrieve_aggregated_log(self, start_time: datetime.datetime, end_time: datetime.datetime, # noqa: C901 aggregation_method: AggregationMethod, aggregation_interval: datetime.timedelta ) -> List[Tuple[datetime.datetime, float]]: data = await self.retrieve_log(start_time, end_time, include_previous=True)