Skip to content

Commit

Permalink
Added AbilityChangeEvent, fixed reconnection issue on Linux, created …
Browse files Browse the repository at this point in the history
…more constants.
  • Loading branch information
RicArch97 committed Jul 2, 2020
1 parent 03f83ee commit 7268375
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 62 deletions.
34 changes: 29 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Asynchronous Python library that listens to Crownstone SSE events.

## Requirements
* Python 3.7 or higher
* Aiohttp 3.6.2
* Aiohttp 3.6.1

## Standard installation
cd to the project folder and run:
Expand Down Expand Up @@ -39,13 +39,20 @@ from crownstone_sse.client import CrownstoneSSE
from crownstone_sse.events.SwitchStateUpdateEvent import SwitchStateUpdateEvent
from crownstone_sse.events.SystemEvent import SystemEvent
from crownstone_sse.events.PresenceEvent import PresenceEvent
from crownstone_sse.events.AbilityChangeEvent import AbilityChangeEvent
from crownstone_sse.const import (
EVENT_SYSTEM_STREAM_START,
EVENT_SWITCH_STATE_UPDATE,
EVENT_PRESENCE_ENTER_LOCATION,
EVENT_ABILITY_CHANGE_DIMMING,
)
import logging
import time

# enable logging
logging.basicConfig(format='%(levelname)s :%(message)s', level=logging.DEBUG)


def crownstone_update(event: SwitchStateUpdateEvent):
print("Crownstone {} state changed to {}".format(event.cloud_id, event.switch_state))

Expand All @@ -58,6 +65,10 @@ def notify_presence_changed(event: PresenceEvent):
print("User {} has entered location {}".format(event.user_id, event.location_id))


def notify_ability_changed(event: AbilityChangeEvent):
print("Ability {} has been {}".format(event.ability_type, event.ability_enabled))


# Create a sse client instance. Pass your crownstone account information.
# email and password are required for logging in again when an access token has expired.
sse_client = CrownstoneSSE('email', 'password')
Expand All @@ -71,10 +82,10 @@ sse_client.start()
sse_client.add_event_listener(EVENT_SYSTEM_STREAM_START, notify_stream_start)
sse_client.add_event_listener(EVENT_SWITCH_STATE_UPDATE, crownstone_update)
sse_client.add_event_listener(EVENT_PRESENCE_ENTER_LOCATION, notify_presence_changed)
sse_client.add_event_listener(EVENT_ABILITY_CHANGE_DIMMING, notify_ability_changed)

# let the client run for 20 seconds (block)
time.sleep(20)

# block for 120 seconds (let the client run for 120 second before stopping)
time.sleep(120)
# stop the client
sse_client.stop()
```
Expand All @@ -95,12 +106,13 @@ def callback(event: PresenceEvent):
```

## Event types
Currently, there are 5 different event types:
Currently, there are 6 different event types:
* System event
* Command event
* Data change event
* Presence event
* Switch state update event
* Ability change event

### System event
A system event is represented as:
Expand Down Expand Up @@ -143,6 +155,18 @@ A switch state update event is represented as:
* unique_id
* switch_state

### Ability change event
An ability change event is represented as:
#### Sphere
* sphere_id
#### Crownstone
* cloud_id
* unique_id
#### Ability
* ability_type
* ability_enabled
* ability_synced_to_crownstone

## Testing
To run the tests using tox install tox first by running:
```console
Expand Down
2 changes: 1 addition & 1 deletion crownstone_sse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from crownstone_sse.client import CrownstoneSSE
from crownstone_sse.util.eventbus import EventBus

__version__ = '1.1.2'
__version__ = '1.2.0'
135 changes: 92 additions & 43 deletions crownstone_sse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import json
import hashlib
import time
from threading import Thread
from aiohttp import (
ClientSession,
Expand All @@ -10,27 +11,39 @@
)
from crownstone_sse.util.eventbus import EventBus
from crownstone_sse.const import (
EVENT_CLIENT_STOP,
EVENT_BASE_URL,
LOGIN_URL,
EVENT_BASE_URL, LOGIN_URL,
RECONNECTION_TIME,
CONNECTION_TIMEOUT,
EVENT_SYSTEM,
EVENT_COMMAND,
EVENT_PRESENCE,
EVENT_DATA_CHANGE,
OPERATION,
EVENT_SWITCH_STATE_UPDATE,
EVENT_ABILITY_CHANGE,
EVENT_CLIENT_STOP,
EVENT_SYSTEM_TOKEN_EXPIRED,
EVENT_SYSTEM_NO_CONNECTION,
EVENT_SWITCH_STATE_UPDATE,
system_events,
presence_events,
command_events,
data_change_events,
operations
ability_change_events,
operations,
TYPE, SUBTYPE, ID, ERROR, CODE, DATA, UTF8, PING,
RUNNING, NOT_RUNNING, STOPPING,
LOGIN_FAILED, LOGIN_FAILED_EMAIL_NOT_VERIFIED
)
from crownstone_sse.events.SystemEvent import SystemEvent
from crownstone_sse.events.CommandEvent import CommandEvent
from crownstone_sse.events.DataChangeEvent import DataChangeEvent
from crownstone_sse.events.PresenceEvent import PresenceEvent
from crownstone_sse.events.SwitchStateUpdateEvent import SwitchStateUpdateEvent
from crownstone_sse.events.AbilityChangeEvent import AbilityChangeEvent
from crownstone_sse.exceptions import (
sse_exception_handler,
CrownstoneSseException,
CrownstoneConnectionTimeout,
ConnectError,
AuthError,
)
Expand All @@ -48,7 +61,8 @@ def __init__(self, email: str, password: str) -> None:
self.loop.set_exception_handler(sse_exception_handler)
self.websession: ClientSession = ClientSession(loop=self.loop, read_timeout=None)
self.event_bus: EventBus = EventBus()
self.state = "not_running"
self.state = NOT_RUNNING
self.available = False
self.stop_event: Optional[asyncio.Event] = None
# Instance information
self.access_token: Optional[str] = None
Expand All @@ -57,6 +71,11 @@ def __init__(self, email: str, password: str) -> None:
# Initialize thread
super().__init__(target=self.run)

@property
def is_available(self) -> bool:
"""Return if Crownstone SSE is available"""
return self.available

def run(self):
"""Start the SSE client"""
try:
Expand All @@ -67,8 +86,8 @@ def run(self):

async def async_start(self) -> None:
"""start the SSE client in current OS thread."""
if self.state != "not_running":
_LOGGER.warning("Crownstone SSE client is already running")
if self.state != NOT_RUNNING:
_LOGGER.debug("Crownstone SSE client is already running")

if self.access_token is None:
if not self.email or not self.password:
Expand All @@ -80,14 +99,15 @@ async def async_start(self) -> None:
self.stop_event = asyncio.Event()

# Connect to the event server & start streaming
self.state = RUNNING
await self.connect()

def set_access_token(self, access_token: str):
self.access_token = access_token

async def login(self) -> None:
"""Login to Crownstone using email and password"""
shasum = hashlib.sha1(self.password.encode('utf-8'))
shasum = hashlib.sha1(self.password.encode(UTF8))
hashed_password = shasum.hexdigest()

# Create JSON object with login credentials
Expand All @@ -100,20 +120,20 @@ async def login(self) -> None:
async with self.websession.post(LOGIN_URL, data=data) as result:
data = await result.json()
if result.status == 200:
self.access_token = data['id']
self.access_token = data[ID]
elif result.status == 401:
if 'error' in data:
error = data['error']
if error['code'] == 'LOGIN_FAILED':
if ERROR in data:
error = data[ERROR]
if error[CODE] == LOGIN_FAILED:
raise CrownstoneSseException(AuthError.AUTHENTICATION_ERROR, "Wrong email/password")
elif error['code'] == 'LOGIN_FAILED_EMAIL_NOT_VERIFIED':
elif error[CODE] == LOGIN_FAILED_EMAIL_NOT_VERIFIED:
raise CrownstoneSseException(AuthError.EMAIL_NOT_VERIFIED, "Email not verified")
else:
raise CrownstoneSseException(AuthError.UNKNOWN_ERROR, "Unknown error occurred")
except ClientConnectionError:
raise CrownstoneSseException(ConnectError.CONNECTION_FAILED_NO_INTERNET, "No internet connection")

_LOGGER.info("Login successful")
_LOGGER.debug("Login successful")

async def connect(self):
"""
Expand All @@ -124,40 +144,49 @@ async def connect(self):
response.raise_for_status()
await self.stream(response)
except ClientConnectionError:
_LOGGER.warning('Internet connection lost. Reconnection in {} seconds'.format(RECONNECTION_TIME))
_LOGGER.debug('Internet connection lost. Reconnection in {} seconds'.format(RECONNECTION_TIME))
await asyncio.sleep(RECONNECTION_TIME)
if self.state == NOT_RUNNING:
return
await self.connect()

async def stream(self, stream_response):
"""Start streaming"""
# aiohttp StreamReader instance
stream_reader = stream_response.content
# client is now running, and can be stopped
self.state = "running"
# client is now available for receiving events
self.available = True
_LOGGER.info("Crownstone SSE Client has started.")

try:
# start params
line_in_bytes = b''
start_time = time.perf_counter()
while stream_response.status != 204: # no data
# read the buffer of the stream
chunk = stream_reader.read_nowait()
for line in chunk.splitlines(True):
line_in_bytes += line
if line_in_bytes.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')):
line = line_in_bytes.decode('utf8') # string
line = line_in_bytes.decode(UTF8) # string
line = line.rstrip('\n').rstrip('\r') # remove returns

if line.startswith('data:'):
line = line.lstrip('data:')
if line.startswith(PING):
# connection alive received, update start time
start_time = time.perf_counter()

if line.startswith(DATA):
line = line.lstrip(DATA)
data = json.loads(line) # type dict
# check for access token expiration and login + restart client
# no need to fire event for this first
if data['type'] == 'system' and data['subType'] == EVENT_SYSTEM_TOKEN_EXPIRED:
if data[TYPE] == EVENT_SYSTEM and data[SUBTYPE] == EVENT_SYSTEM_TOKEN_EXPIRED:
await self.refresh_token()
# check for no connection between the sse server and the crownstone cloud
# simply try to reconnect
if data['type'] == 'system' and data['subType'] == EVENT_SYSTEM_NO_CONNECTION:
await self.connect()
# log this issue
if data[TYPE] == EVENT_SYSTEM and data[SUBTYPE] == EVENT_SYSTEM_NO_CONNECTION:
_LOGGER.warning("No connection to Crownstone cloud, waiting for server to come back "
"online")
# handle firing of events
self.fire_events(data)

Expand All @@ -168,49 +197,68 @@ async def stream(self, stream_response):
if self.stop_event.is_set():
break

# a ping is sent every 30 seconds to notify the connection is alive
# if a ping event has not been sent after 40 seconds (10 second margin), raise connection error
if time.perf_counter() - start_time > CONNECTION_TIMEOUT:
raise CrownstoneConnectionTimeout(ConnectError.CONNECTION_TIMEOUT, "Connection to server timed out")

# let buffer fill itself with data
await asyncio.sleep(0.05)

except CrownstoneConnectionTimeout:
# Internet connection was lost, try to reconnect
_LOGGER.warning("Connection to server timed out, trying to reconnect...")
self.available = False
await self.connect()

except ClientPayloadError:
# Internet connection was lost, payload uncompleted. try to reconnect
# .connect() will handle further reconnection
# This exception only occurred on Windows.
self.available = False
await self.connect()

except KeyboardInterrupt:
# Ctrl + C pressed or other command that causes interrupt
await self.async_stop()

def fire_events(self, data) -> None:
"""Fire event based on the data"""
if data['type'] == 'system':
if data[TYPE] == EVENT_SYSTEM:
for system_event in system_events:
if data['subType'] == system_event:
if data[SUBTYPE] == system_event:
event = SystemEvent(data)
self.event_bus.fire(system_event, event)

if data['type'] == 'command':
if data[TYPE] == EVENT_COMMAND:
for command_event in command_events:
if data['subType'] == command_event:
if data[SUBTYPE] == command_event:
event = CommandEvent(data)
self.event_bus.fire(command_event, event)

if data['type'] == EVENT_SWITCH_STATE_UPDATE:
if data[TYPE] == EVENT_SWITCH_STATE_UPDATE:
event = SwitchStateUpdateEvent(data)
self.event_bus.fire(EVENT_SWITCH_STATE_UPDATE, event)

if data['type'] == 'dataChange':
if data[TYPE] == EVENT_DATA_CHANGE:
for data_change_event in data_change_events:
if data['subType'] == data_change_event:
if data[SUBTYPE] == data_change_event:
for operation in operations:
if data['operation'] == operation:
if data[OPERATION] == operation:
event = DataChangeEvent(data, data_change_event, operation)
self.event_bus.fire(data_change_event, event)

if data['type'] == 'presence':
if data[TYPE] == EVENT_PRESENCE:
for presence_event in presence_events:
if data['subType'] == presence_event:
if data[SUBTYPE] == presence_event:
event = PresenceEvent(data, presence_event)
self.event_bus.fire(presence_event, event)

if data[TYPE] == EVENT_ABILITY_CHANGE:
for ability_change_event in ability_change_events:
if data[SUBTYPE] == ability_change_event:
event = AbilityChangeEvent(data, ability_change_event)
self.event_bus.fire(ability_change_event, event)

def add_event_listener(self, event_type, callback):
self.event_bus.add_event_listener(event_type=event_type, event_listener=callback)

Expand All @@ -222,25 +270,26 @@ async def refresh_token(self) -> None:
def stop(self) -> None:
"""Stop the Crownstone SSE client from an other thread."""
# ignore if not running
if self.state == 'not_running':
if self.state == NOT_RUNNING:
return
self.loop.create_task(self.async_stop())
asyncio.run_coroutine_threadsafe(self.async_stop(), self.loop)

async def async_stop(self) -> None:
"""Stop Crownstone SSE client from within the loop."""
# ignore if not running or already stopping
if self.state == "not_running":
if self.state == NOT_RUNNING:
return
if self.state == "stopping":
_LOGGER.warning("stop already called")
if self.state == STOPPING:
_LOGGER.debug("stop already called")
return

self.state = "stopping"
self.state = STOPPING
self.available = False
self.event_bus.fire(EVENT_CLIENT_STOP) # for callback

# Close the ClientSession
await self.websession.close()

self.state = "not_running"
self.state = NOT_RUNNING
self.stop_event.set()
_LOGGER.info("Crownstone SSE client stopped.")
Loading

0 comments on commit 7268375

Please sign in to comment.