diff --git a/README.md b/README.md index e1b345d..1fd390f 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Asynchronous Python library that listens to Crownstone SSE events. ## Requirements * Python 3.7 or higher -* Aiohttp 3.6.2 +* Aiohttp 3.6.1 ## Standard installation cd to the project folder and run: @@ -39,13 +39,20 @@ from crownstone_sse.client import CrownstoneSSE from crownstone_sse.events.SwitchStateUpdateEvent import SwitchStateUpdateEvent from crownstone_sse.events.SystemEvent import SystemEvent from crownstone_sse.events.PresenceEvent import PresenceEvent +from crownstone_sse.events.AbilityChangeEvent import AbilityChangeEvent from crownstone_sse.const import ( EVENT_SYSTEM_STREAM_START, EVENT_SWITCH_STATE_UPDATE, EVENT_PRESENCE_ENTER_LOCATION, + EVENT_ABILITY_CHANGE_DIMMING, ) +import logging import time +# enable logging +logging.basicConfig(format='%(levelname)s :%(message)s', level=logging.DEBUG) + + def crownstone_update(event: SwitchStateUpdateEvent): print("Crownstone {} state changed to {}".format(event.cloud_id, event.switch_state)) @@ -58,6 +65,10 @@ def notify_presence_changed(event: PresenceEvent): print("User {} has entered location {}".format(event.user_id, event.location_id)) +def notify_ability_changed(event: AbilityChangeEvent): + print("Ability {} has been {}".format(event.ability_type, event.ability_enabled)) + + # Create a sse client instance. Pass your crownstone account information. # email and password are required for logging in again when an access token has expired. sse_client = CrownstoneSSE('email', 'password') @@ -71,10 +82,10 @@ sse_client.start() sse_client.add_event_listener(EVENT_SYSTEM_STREAM_START, notify_stream_start) sse_client.add_event_listener(EVENT_SWITCH_STATE_UPDATE, crownstone_update) sse_client.add_event_listener(EVENT_PRESENCE_ENTER_LOCATION, notify_presence_changed) +sse_client.add_event_listener(EVENT_ABILITY_CHANGE_DIMMING, notify_ability_changed) -# let the client run for 20 seconds (block) -time.sleep(20) - +# block for 120 seconds (let the client run for 120 second before stopping) +time.sleep(120) # stop the client sse_client.stop() ``` @@ -95,12 +106,13 @@ def callback(event: PresenceEvent): ``` ## Event types -Currently, there are 5 different event types: +Currently, there are 6 different event types: * System event * Command event * Data change event * Presence event * Switch state update event +* Ability change event ### System event A system event is represented as: @@ -143,6 +155,18 @@ A switch state update event is represented as: * unique_id * switch_state +### Ability change event +An ability change event is represented as: +#### Sphere +* sphere_id +#### Crownstone +* cloud_id +* unique_id +#### Ability +* ability_type +* ability_enabled +* ability_synced_to_crownstone + ## Testing To run the tests using tox install tox first by running: ```console diff --git a/crownstone_sse/__init__.py b/crownstone_sse/__init__.py index abed033..3ea0560 100644 --- a/crownstone_sse/__init__.py +++ b/crownstone_sse/__init__.py @@ -2,4 +2,4 @@ from crownstone_sse.client import CrownstoneSSE from crownstone_sse.util.eventbus import EventBus -__version__ = '1.1.2' +__version__ = '1.2.0' diff --git a/crownstone_sse/client.py b/crownstone_sse/client.py index dbb5d06..f074a7a 100644 --- a/crownstone_sse/client.py +++ b/crownstone_sse/client.py @@ -2,6 +2,7 @@ import logging import json import hashlib +import time from threading import Thread from aiohttp import ( ClientSession, @@ -10,27 +11,39 @@ ) from crownstone_sse.util.eventbus import EventBus from crownstone_sse.const import ( - EVENT_CLIENT_STOP, - EVENT_BASE_URL, - LOGIN_URL, + EVENT_BASE_URL, LOGIN_URL, RECONNECTION_TIME, + CONNECTION_TIMEOUT, + EVENT_SYSTEM, + EVENT_COMMAND, + EVENT_PRESENCE, + EVENT_DATA_CHANGE, + OPERATION, + EVENT_SWITCH_STATE_UPDATE, + EVENT_ABILITY_CHANGE, + EVENT_CLIENT_STOP, EVENT_SYSTEM_TOKEN_EXPIRED, EVENT_SYSTEM_NO_CONNECTION, - EVENT_SWITCH_STATE_UPDATE, system_events, presence_events, command_events, data_change_events, - operations + ability_change_events, + operations, + TYPE, SUBTYPE, ID, ERROR, CODE, DATA, UTF8, PING, + RUNNING, NOT_RUNNING, STOPPING, + LOGIN_FAILED, LOGIN_FAILED_EMAIL_NOT_VERIFIED ) from crownstone_sse.events.SystemEvent import SystemEvent from crownstone_sse.events.CommandEvent import CommandEvent from crownstone_sse.events.DataChangeEvent import DataChangeEvent from crownstone_sse.events.PresenceEvent import PresenceEvent from crownstone_sse.events.SwitchStateUpdateEvent import SwitchStateUpdateEvent +from crownstone_sse.events.AbilityChangeEvent import AbilityChangeEvent from crownstone_sse.exceptions import ( sse_exception_handler, CrownstoneSseException, + CrownstoneConnectionTimeout, ConnectError, AuthError, ) @@ -48,7 +61,8 @@ def __init__(self, email: str, password: str) -> None: self.loop.set_exception_handler(sse_exception_handler) self.websession: ClientSession = ClientSession(loop=self.loop, read_timeout=None) self.event_bus: EventBus = EventBus() - self.state = "not_running" + self.state = NOT_RUNNING + self.available = False self.stop_event: Optional[asyncio.Event] = None # Instance information self.access_token: Optional[str] = None @@ -57,6 +71,11 @@ def __init__(self, email: str, password: str) -> None: # Initialize thread super().__init__(target=self.run) + @property + def is_available(self) -> bool: + """Return if Crownstone SSE is available""" + return self.available + def run(self): """Start the SSE client""" try: @@ -67,8 +86,8 @@ def run(self): async def async_start(self) -> None: """start the SSE client in current OS thread.""" - if self.state != "not_running": - _LOGGER.warning("Crownstone SSE client is already running") + if self.state != NOT_RUNNING: + _LOGGER.debug("Crownstone SSE client is already running") if self.access_token is None: if not self.email or not self.password: @@ -80,6 +99,7 @@ async def async_start(self) -> None: self.stop_event = asyncio.Event() # Connect to the event server & start streaming + self.state = RUNNING await self.connect() def set_access_token(self, access_token: str): @@ -87,7 +107,7 @@ def set_access_token(self, access_token: str): async def login(self) -> None: """Login to Crownstone using email and password""" - shasum = hashlib.sha1(self.password.encode('utf-8')) + shasum = hashlib.sha1(self.password.encode(UTF8)) hashed_password = shasum.hexdigest() # Create JSON object with login credentials @@ -100,20 +120,20 @@ async def login(self) -> None: async with self.websession.post(LOGIN_URL, data=data) as result: data = await result.json() if result.status == 200: - self.access_token = data['id'] + self.access_token = data[ID] elif result.status == 401: - if 'error' in data: - error = data['error'] - if error['code'] == 'LOGIN_FAILED': + if ERROR in data: + error = data[ERROR] + if error[CODE] == LOGIN_FAILED: raise CrownstoneSseException(AuthError.AUTHENTICATION_ERROR, "Wrong email/password") - elif error['code'] == 'LOGIN_FAILED_EMAIL_NOT_VERIFIED': + elif error[CODE] == LOGIN_FAILED_EMAIL_NOT_VERIFIED: raise CrownstoneSseException(AuthError.EMAIL_NOT_VERIFIED, "Email not verified") else: raise CrownstoneSseException(AuthError.UNKNOWN_ERROR, "Unknown error occurred") except ClientConnectionError: raise CrownstoneSseException(ConnectError.CONNECTION_FAILED_NO_INTERNET, "No internet connection") - _LOGGER.info("Login successful") + _LOGGER.debug("Login successful") async def connect(self): """ @@ -124,40 +144,49 @@ async def connect(self): response.raise_for_status() await self.stream(response) except ClientConnectionError: - _LOGGER.warning('Internet connection lost. Reconnection in {} seconds'.format(RECONNECTION_TIME)) + _LOGGER.debug('Internet connection lost. Reconnection in {} seconds'.format(RECONNECTION_TIME)) await asyncio.sleep(RECONNECTION_TIME) + if self.state == NOT_RUNNING: + return await self.connect() async def stream(self, stream_response): """Start streaming""" # aiohttp StreamReader instance stream_reader = stream_response.content - # client is now running, and can be stopped - self.state = "running" + # client is now available for receiving events + self.available = True _LOGGER.info("Crownstone SSE Client has started.") try: + # start params line_in_bytes = b'' + start_time = time.perf_counter() while stream_response.status != 204: # no data # read the buffer of the stream chunk = stream_reader.read_nowait() for line in chunk.splitlines(True): line_in_bytes += line if line_in_bytes.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')): - line = line_in_bytes.decode('utf8') # string + line = line_in_bytes.decode(UTF8) # string line = line.rstrip('\n').rstrip('\r') # remove returns - if line.startswith('data:'): - line = line.lstrip('data:') + if line.startswith(PING): + # connection alive received, update start time + start_time = time.perf_counter() + + if line.startswith(DATA): + line = line.lstrip(DATA) data = json.loads(line) # type dict # check for access token expiration and login + restart client # no need to fire event for this first - if data['type'] == 'system' and data['subType'] == EVENT_SYSTEM_TOKEN_EXPIRED: + if data[TYPE] == EVENT_SYSTEM and data[SUBTYPE] == EVENT_SYSTEM_TOKEN_EXPIRED: await self.refresh_token() # check for no connection between the sse server and the crownstone cloud - # simply try to reconnect - if data['type'] == 'system' and data['subType'] == EVENT_SYSTEM_NO_CONNECTION: - await self.connect() + # log this issue + if data[TYPE] == EVENT_SYSTEM and data[SUBTYPE] == EVENT_SYSTEM_NO_CONNECTION: + _LOGGER.warning("No connection to Crownstone cloud, waiting for server to come back " + "online") # handle firing of events self.fire_events(data) @@ -168,49 +197,68 @@ async def stream(self, stream_response): if self.stop_event.is_set(): break + # a ping is sent every 30 seconds to notify the connection is alive + # if a ping event has not been sent after 40 seconds (10 second margin), raise connection error + if time.perf_counter() - start_time > CONNECTION_TIMEOUT: + raise CrownstoneConnectionTimeout(ConnectError.CONNECTION_TIMEOUT, "Connection to server timed out") + # let buffer fill itself with data await asyncio.sleep(0.05) + except CrownstoneConnectionTimeout: + # Internet connection was lost, try to reconnect + _LOGGER.warning("Connection to server timed out, trying to reconnect...") + self.available = False + await self.connect() + except ClientPayloadError: # Internet connection was lost, payload uncompleted. try to reconnect - # .connect() will handle further reconnection + # This exception only occurred on Windows. + self.available = False await self.connect() + except KeyboardInterrupt: # Ctrl + C pressed or other command that causes interrupt await self.async_stop() def fire_events(self, data) -> None: """Fire event based on the data""" - if data['type'] == 'system': + if data[TYPE] == EVENT_SYSTEM: for system_event in system_events: - if data['subType'] == system_event: + if data[SUBTYPE] == system_event: event = SystemEvent(data) self.event_bus.fire(system_event, event) - if data['type'] == 'command': + if data[TYPE] == EVENT_COMMAND: for command_event in command_events: - if data['subType'] == command_event: + if data[SUBTYPE] == command_event: event = CommandEvent(data) self.event_bus.fire(command_event, event) - if data['type'] == EVENT_SWITCH_STATE_UPDATE: + if data[TYPE] == EVENT_SWITCH_STATE_UPDATE: event = SwitchStateUpdateEvent(data) self.event_bus.fire(EVENT_SWITCH_STATE_UPDATE, event) - if data['type'] == 'dataChange': + if data[TYPE] == EVENT_DATA_CHANGE: for data_change_event in data_change_events: - if data['subType'] == data_change_event: + if data[SUBTYPE] == data_change_event: for operation in operations: - if data['operation'] == operation: + if data[OPERATION] == operation: event = DataChangeEvent(data, data_change_event, operation) self.event_bus.fire(data_change_event, event) - if data['type'] == 'presence': + if data[TYPE] == EVENT_PRESENCE: for presence_event in presence_events: - if data['subType'] == presence_event: + if data[SUBTYPE] == presence_event: event = PresenceEvent(data, presence_event) self.event_bus.fire(presence_event, event) + if data[TYPE] == EVENT_ABILITY_CHANGE: + for ability_change_event in ability_change_events: + if data[SUBTYPE] == ability_change_event: + event = AbilityChangeEvent(data, ability_change_event) + self.event_bus.fire(ability_change_event, event) + def add_event_listener(self, event_type, callback): self.event_bus.add_event_listener(event_type=event_type, event_listener=callback) @@ -222,25 +270,26 @@ async def refresh_token(self) -> None: def stop(self) -> None: """Stop the Crownstone SSE client from an other thread.""" # ignore if not running - if self.state == 'not_running': + if self.state == NOT_RUNNING: return - self.loop.create_task(self.async_stop()) + asyncio.run_coroutine_threadsafe(self.async_stop(), self.loop) async def async_stop(self) -> None: """Stop Crownstone SSE client from within the loop.""" # ignore if not running or already stopping - if self.state == "not_running": + if self.state == NOT_RUNNING: return - if self.state == "stopping": - _LOGGER.warning("stop already called") + if self.state == STOPPING: + _LOGGER.debug("stop already called") return - self.state = "stopping" + self.state = STOPPING + self.available = False self.event_bus.fire(EVENT_CLIENT_STOP) # for callback # Close the ClientSession await self.websession.close() - self.state = "not_running" + self.state = NOT_RUNNING self.stop_event.set() _LOGGER.info("Crownstone SSE client stopped.") diff --git a/crownstone_sse/const.py b/crownstone_sse/const.py index 44286ca..86332fe 100644 --- a/crownstone_sse/const.py +++ b/crownstone_sse/const.py @@ -7,6 +7,7 @@ # SSE client EVENT_CLIENT_STOP = "client_stop" RECONNECTION_TIME = 30 +CONNECTION_TIMEOUT = 40 # SSE System events EVENT_SYSTEM = "system" @@ -24,6 +25,7 @@ EVENT_DATA_CHANGE_LOCATIONS = "locations" # dataChange operations +OPERATION = "operation" OPERATION_CREATE = "create" OPERATION_DELETE = "delete" OPERATION_UPDATE = "update" @@ -36,11 +38,39 @@ EVENT_COMMAND_SWITCH_CROWNSTONE = "switchCrownstone" # SSE presence events +EVENT_PRESENCE = "presence" EVENT_PRESENCE_ENTER_SPHERE = "enterSphere" EVENT_PRESENCE_EXIT_SPHERE = "exitSphere" EVENT_PRESENCE_ENTER_LOCATION = "enterLocation" EVENT_PRESENCE_EXIT_LOCATION = "exitLocation" +# SSE abilityChange events +EVENT_ABILITY_CHANGE = "abilityChange" +EVENT_ABILITY_CHANGE_DIMMING = "dimming" +EVENT_ABILITY_CHANGE_SWITCHCRAFT = "switchcraft" +EVENT_ABILITY_CHANGE_TAP_TO_TOGGLE = "tapToToggle" + +# syntax +TYPE = "type" +SUBTYPE = "subType" +ID = "id" +ERROR = "error" +CODE = "code" +UTF8 = "utf-8" + +# SSE data prefix +DATA = "data:" +PING = ":ping" + +# errors +LOGIN_FAILED = "LOGIN_FAILED" +LOGIN_FAILED_EMAIL_NOT_VERIFIED = "LOGIN_FAILED_EMAIL_NOT_VERIFIED" + +# SSE states +RUNNING = "running" +NOT_RUNNING = "not_running" +STOPPING = "stopping" + # lists for iteration system_events = [ EVENT_SYSTEM_TOKEN_EXPIRED, @@ -64,6 +94,12 @@ EVENT_DATA_CHANGE_LOCATIONS, ] +ability_change_events = [ + EVENT_ABILITY_CHANGE_DIMMING, + EVENT_ABILITY_CHANGE_SWITCHCRAFT, + EVENT_ABILITY_CHANGE_TAP_TO_TOGGLE +] + command_events = [ EVENT_COMMAND_SWITCH_CROWNSTONE ] diff --git a/crownstone_sse/events/AbilityChangeEvent.py b/crownstone_sse/events/AbilityChangeEvent.py new file mode 100644 index 0000000..eb28270 --- /dev/null +++ b/crownstone_sse/events/AbilityChangeEvent.py @@ -0,0 +1,31 @@ +class AbilityChangeEvent: + """Event that indicates an ability change""" + + def __init__(self, data, type) -> None: + """Init event""" + self.data = data + self.type = type + + @property + def sphere_id(self) -> str: + return self.data['sphere']['id'] + + @property + def cloud_id(self) -> str: + return self.data['stone']['id'] + + @property + def unique_id(self) -> str: + return self.data['stone']['uid'] + + @property + def ability_type(self) -> str: + return self.data['ability']['type'] + + @property + def ability_enabled(self) -> bool: + return self.data['ability']['enabled'] + + @property + def ability_synced_to_crownstone(self) -> bool: + return self.data['ability']['syncedToCrownstone'] diff --git a/crownstone_sse/exceptions.py b/crownstone_sse/exceptions.py index f52c54f..8f23dfc 100644 --- a/crownstone_sse/exceptions.py +++ b/crownstone_sse/exceptions.py @@ -8,6 +8,7 @@ class ConnectError(Enum): CONNECTION_FAILED_NO_INTERNET = 'CONNECTION_FAILED_NO_INTERNET' + CONNECTION_TIMEOUT = 'CONNECTION_TIMEOUT' class AuthError(Enum): @@ -25,6 +26,15 @@ def __init__(self, type, message=None): self.message = message +class CrownstoneConnectionTimeout(Exception): + type = None + message = None + + def __init__(self, type, message=None): + self.type = type + self.message = message + + def sse_exception_handler(_: Any, context: Dict) -> None: """Handle all exceptions inside the client.""" kwargs = {} diff --git a/examples/example.py b/examples/example.py index 85aa665..90b3902 100644 --- a/examples/example.py +++ b/examples/example.py @@ -2,19 +2,25 @@ Example receiving Crownstone SSE events and creating callbacks for the received data. Created by Ricardo Steijn. -Last update on 14-5-2020 +Last update on 2-7-2020 """ from crownstone_sse.client import CrownstoneSSE from crownstone_sse.events.SwitchStateUpdateEvent import SwitchStateUpdateEvent from crownstone_sse.events.SystemEvent import SystemEvent from crownstone_sse.events.PresenceEvent import PresenceEvent +from crownstone_sse.events.AbilityChangeEvent import AbilityChangeEvent from crownstone_sse.const import ( EVENT_SYSTEM_STREAM_START, EVENT_SWITCH_STATE_UPDATE, EVENT_PRESENCE_ENTER_LOCATION, + EVENT_ABILITY_CHANGE_DIMMING, ) +import logging import time +# enable logging +logging.basicConfig(format='%(levelname)s :%(message)s', level=logging.DEBUG) + def crownstone_update(event: SwitchStateUpdateEvent): print("Crownstone {} state changed to {}".format(event.cloud_id, event.switch_state)) @@ -28,6 +34,10 @@ def notify_presence_changed(event: PresenceEvent): print("User {} has entered location {}".format(event.user_id, event.location_id)) +def notify_ability_changed(event: AbilityChangeEvent): + print("Ability {} has been {}".format(event.ability_type, event.ability_enabled)) + + # Create a sse client instance. Pass your crownstone account information. # email and password are required for logging in again when an access token has expired. sse_client = CrownstoneSSE('email', 'password') @@ -41,8 +51,9 @@ def notify_presence_changed(event: PresenceEvent): sse_client.add_event_listener(EVENT_SYSTEM_STREAM_START, notify_stream_start) sse_client.add_event_listener(EVENT_SWITCH_STATE_UPDATE, crownstone_update) sse_client.add_event_listener(EVENT_PRESENCE_ENTER_LOCATION, notify_presence_changed) +sse_client.add_event_listener(EVENT_ABILITY_CHANGE_DIMMING, notify_ability_changed) -# block for 20 seconds (let the client run for 20 second before stopping) -time.sleep(20) +# block for 120 seconds (let the client run for 120 second before stopping) +time.sleep(120) # stop the client -sse_client.stop() \ No newline at end of file +sse_client.stop() diff --git a/setup.py b/setup.py index ef7907c..4fb9290 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='crownstone-sse', - version='1.1.2', + version='1.2.0', url='https://github.com/crownstone/crownstone-lib-python-sse', author='Crownstone B.V.', long_description=long_description, diff --git a/tests/test_client.py b/tests/test_client.py index 0c340bb..1a7453a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -118,14 +118,6 @@ async def test_stream(self, mock_stream): # test refresh call refresh_mock.assert_called_once() - # mock no connection to cloud - mock_stream.return_value = no_connection_data - with asynctest.patch.object(CrownstoneSSE, 'connect') as connect_mock: - await self.sse_client.stream(mock_stream_response) - - # test reconnection - connect_mock.assert_called_once() - # mock connect function mock_stream.side_effect = aiohttp.ClientPayloadError('test') with asynctest.patch.object(CrownstoneSSE, 'connect') as connect_mock: