Skip to content

Commit

Permalink
Updated galaxy api, implemented play time tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonardFiedrowicz committed Sep 17, 2019
1 parent 413d94f commit 4e745df
Show file tree
Hide file tree
Showing 13 changed files with 597 additions and 468 deletions.
2 changes: 1 addition & 1 deletion galaxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
__path__: str = __import__('pkgutil').extend_path(__path__, __name__)
20 changes: 12 additions & 8 deletions galaxy/api/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ class Platform(Enum):
NintendoDs = "nds"
Nintendo3Ds = "3ds"
PathOfExile = "pathofexile"
Twitch = "twitch"
Minecraft = "minecraft"
GameSessions = "gamesessions"
Nuuvem = "nuuvem"
FXStore = "fxstore"
IndieGala = "indiegala"
Playfire = "playfire"
Oculus = "oculus"
Test = "test"


class Feature(Enum):
"""Possible features that can be implemented by an integration.
Expand All @@ -98,6 +108,8 @@ class Feature(Enum):
ImportUsers = "ImportUsers"
VerifyGame = "VerifyGame"
ImportFriends = "ImportFriends"
ShutdownPlatformClient = "ShutdownPlatformClient"
LaunchPlatformClient = "LaunchPlatformClient"


class LicenseType(Enum):
Expand All @@ -116,11 +128,3 @@ class LocalGameState(Flag):
None_ = 0
Installed = 1
Running = 2


class PresenceState(Enum):
""""Possible states that a user can be in."""
Unknown = "Unknown"
Online = "online"
Offline = "offline"
Away = "away"
2 changes: 1 addition & 1 deletion galaxy/api/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from galaxy.api.jsonrpc import ApplicationError, UnknownError

UnknownError = UnknownError
assert UnknownError

class AuthenticationRequired(ApplicationError):
def __init__(self, data=None):
Expand Down
51 changes: 28 additions & 23 deletions galaxy/api/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json

from galaxy.reader import StreamLineReader
from galaxy.task_manager import TaskManager

class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
Expand Down Expand Up @@ -52,7 +53,8 @@ def __init__(self, data=None):
super().__init__(0, "Unknown error", data)

Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None])
Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"])
Method = namedtuple("Method", ["callback", "signature", "immediate", "sensitive_params"])


def anonymise_sensitive_params(params, sensitive_params):
anomized_data = "****"
Expand All @@ -74,9 +76,9 @@ def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._task_manager = TaskManager("jsonrpc server")

def register_method(self, name, callback, internal, sensitive_params=False):
def register_method(self, name, callback, immediate, sensitive_params=False):
"""
Register method
Expand All @@ -86,9 +88,9 @@ def register_method(self, name, callback, internal, sensitive_params=False):
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
self._methods[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)

def register_notification(self, name, callback, internal, sensitive_params=False):
def register_notification(self, name, callback, immediate, sensitive_params=False):
"""
Register notification
Expand All @@ -98,10 +100,7 @@ def register_notification(self, name, callback, internal, sensitive_params=False
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)

def register_eof(self, callback):
self._eof_listeners.append(callback)
self._notifications[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)

async def run(self):
while self._active:
Expand All @@ -118,14 +117,16 @@ async def run(self):
self._handle_input(data)
await asyncio.sleep(0) # To not starve task queue

def stop(self):
def close(self):
logging.info("Closing JSON-RPC server - not more messages will be read")
self._active = False

async def wait_closed(self):
await self._task_manager.wait()

def _eof(self):
logging.info("Received EOF")
self.stop()
for listener in self._eof_listeners:
listener()
self.close()

def _handle_input(self, data):
try:
Expand All @@ -145,20 +146,19 @@ def _handle_notification(self, request):
logging.error("Received unknown notification: %s", request.method)
return

callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)

try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())

if internal:
# internal requests are handled immediately
if immediate:
callback(*bound_args.args, **bound_args.kwargs)
else:
try:
asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs))
self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method)
except Exception:
logging.exception("Unexpected exception raised in notification handler")

Expand All @@ -169,16 +169,15 @@ def _handle_request(self, request):
self._send_error(request.id, MethodNotFound())
return

callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)

try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())

if internal:
# internal requests are handled immediately
if immediate:
response = callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, response)
else:
Expand All @@ -190,11 +189,13 @@ async def handle():
self._send_error(request.id, MethodNotFound())
except JsonRpcError as error:
self._send_error(request.id, error)
except asyncio.CancelledError:
self._send_error(request.id, Aborted())
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))

asyncio.create_task(handle())
self._task_manager.create_task(handle(), request.method)

@staticmethod
def _parse_request(data):
Expand All @@ -215,7 +216,7 @@ def _send(self, data):
logging.debug("Sending data: %s", line)
data = (line + "\n").encode("utf-8")
self._writer.write(data)
asyncio.create_task(self._writer.drain())
self._task_manager.create_task(self._writer.drain(), "drain")
except TypeError as error:
logging.error(str(error))

Expand Down Expand Up @@ -255,6 +256,7 @@ def __init__(self, writer, encoder=json.JSONEncoder()):
self._writer = writer
self._encoder = encoder
self._methods = {}
self._task_manager = TaskManager("notification client")

def notify(self, method, params, sensitive_params=False):
"""
Expand All @@ -273,13 +275,16 @@ def notify(self, method, params, sensitive_params=False):
self._log(method, params, sensitive_params)
self._send(notification)

async def close(self):
await self._task_manager.wait()

def _send(self, data):
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
self._writer.write(data)
asyncio.create_task(self._writer.drain())
self._task_manager.create_task(self._writer.drain(), "drain")
except TypeError as error:
logging.error("Failed to parse outgoing message: %s", str(error))

Expand Down
Loading

0 comments on commit 4e745df

Please sign in to comment.