diff --git a/pylgtv/__init__.py b/pylgtv/__init__.py new file mode 100644 index 0000000..22083da --- /dev/null +++ b/pylgtv/__init__.py @@ -0,0 +1,2 @@ +from .webos_client import WebOsClient +from .webos_client import PyLGTVPairException diff --git a/pylgtv/endpoints.py b/pylgtv/endpoints.py new file mode 100644 index 0000000..fea138b --- /dev/null +++ b/pylgtv/endpoints.py @@ -0,0 +1,38 @@ +EP_GET_SERVICES = "api/getServiceList" +EP_SET_MUTE = "audio/setMute" +EP_GET_AUDIO_STATUS = "audio/getStatus" +EP_GET_VOLUME = "audio/getVolume" +EP_SET_VOLUME = "audio/setVolume" +EP_VOLUME_UP = "audio/volumeUp" +EP_VOLUME_DOWN= "audio/volumeDown" +EP_GET_CURRENT_APP_INFO = "com.webos.applicationManager/getForegroundAppInfo" +EP_LAUNCH_APP = "com.webos.applicationManager/launch" +EP_GET_APPS = "com.webos.applicationManager/listLaunchPoints" +EP_GET_APP_STATUS = "com.webos.service.appstatus/getAppStatus" +EP_SEND_ENTER = "com.webos.service.ime/sendEnterKey" +EP_SEND_DELETE = "com.webos.service.ime/deleteCharacters" +EP_3D_ON = "com.webos.service.tv.display/set3DOn" +EP_3D_OFF = "com.webos.service.tv.display/set3DOff" +EP_GET_SOFTWARE_INFO = "com.webos.service.update/getCurrentSWInformation" +EP_MEDIA_PLAY = "media.controls/play" +EP_MEDIA_STOP = "media.controls/stop" +EP_MEDIA_PAUSE = "media.controls/pause" +EP_MEDIA_REWIND = "media.controls/rewind" +EP_MEDIA_FAST_FORWARD = "media.controls/fastForward" +EP_MEDIA_CLOSE = "media.viewer/close" +EP_POWER_OFF = "system/turnOff" +EP_POWER_ON = "system/turnOn" +EP_SHOW_MESSAGE = "system.notifications/createToast" +EP_LAUNCHER_CLOSE = "system.launcher/close" +EP_GET_APP_STATE = "system.launcher/getAppState" +EP_LAUNCH = "system.launcher/launch" +EP_OPEN = "system.launcher/open" +EP_TV_CHANNEL_DOWN = "tv/channelDown" +EP_TV_CHANNEL_UP = "tv/channelUp" +EP_GET_TV_CHANNELS = "tv/getChannelList" +EP_GET_CHANNEL_INFO = "tv/getChannelProgramInfo" +EP_GET_CURRENT_CHANNEL = "tv/getCurrentChannel" +EP_GET_INPUTS = "tv/getExternalInputList" +EP_SET_CHANNEL = "tv/openChannel" +EP_SET_INPUT = "tv/switchInput" +EP_CLOSE_WEB_APP = "webapp/closeWebApp" diff --git a/pylgtv/handshake.json b/pylgtv/handshake.json new file mode 100644 index 0000000..27bbf6c --- /dev/null +++ b/pylgtv/handshake.json @@ -0,0 +1,74 @@ +{ + "type": "register", + "id": "register_0", + "payload": { + "forcePairing": false, + "pairingType": "PROMPT", + "manifest": { + "manifestVersion": 1, + "appVersion": "1.1", + "signed": { + "created": "20140509", + "appId": "com.lge.test", + "vendorId": "com.lge", + "localizedAppNames": { + "": "LG Remote App", + "ko-KR": "리모컨 앱", + "zxx-XX": "ЛГ Rэмotэ AПП" + }, + "localizedVendorNames": { + "": "LG Electronics" + }, + "permissions": [ + "TEST_SECURE", + "CONTROL_INPUT_TEXT", + "CONTROL_MOUSE_AND_KEYBOARD", + "READ_INSTALLED_APPS", + "READ_LGE_SDX", + "READ_NOTIFICATIONS", + "SEARCH", + "WRITE_SETTINGS", + "WRITE_NOTIFICATION_ALERT", + "CONTROL_POWER", + "READ_CURRENT_CHANNEL", + "READ_RUNNING_APPS", + "READ_UPDATE_INFO", + "UPDATE_FROM_REMOTE_APP", + "READ_LGE_TV_INPUT_EVENTS", + "READ_TV_CURRENT_TIME" + ], + "serial": "2f930e2d2cfe083771f68e4fe7bb07" + }, + "permissions": [ + "LAUNCH", + "LAUNCH_WEBAPP", + "APP_TO_APP", + "CLOSE", + "TEST_OPEN", + "TEST_PROTECTED", + "CONTROL_AUDIO", + "CONTROL_DISPLAY", + "CONTROL_INPUT_JOYSTICK", + "CONTROL_INPUT_MEDIA_RECORDING", + "CONTROL_INPUT_MEDIA_PLAYBACK", + "CONTROL_INPUT_TV", + "CONTROL_POWER", + "READ_APP_STATUS", + "READ_CURRENT_CHANNEL", + "READ_INPUT_DEVICE_LIST", + "READ_NETWORK_STATE", + "READ_RUNNING_APPS", + "READ_TV_CHANNEL_LIST", + "WRITE_NOTIFICATION_TOAST", + "READ_POWER_STATE", + "READ_COUNTRY_INFO" + ], + "signatures": [ + { + "signatureVersion": 1, + "signature": "eyJhbGdvcml0aG0iOiJSU0EtU0hBMjU2Iiwia2V5SWQiOiJ0ZXN0LXNpZ25pbmctY2VydCIsInNpZ25hdHVyZVZlcnNpb24iOjF9.hrVRgjCwXVvE2OOSpDZ58hR+59aFNwYDyjQgKk3auukd7pcegmE2CzPCa0bJ0ZsRAcKkCTJrWo5iDzNhMBWRyaMOv5zWSrthlf7G128qvIlpMT0YNY+n/FaOHE73uLrS/g7swl3/qH/BGFG2Hu4RlL48eb3lLKqTt2xKHdCs6Cd4RMfJPYnzgvI4BNrFUKsjkcu+WD4OO2A27Pq1n50cMchmcaXadJhGrOqH5YmHdOCj5NSHzJYrsW0HPlpuAx/ECMeIZYDh6RMqaFM2DXzdKX9NmmyqzJ3o/0lkk/N97gfVRLW5hA29yeAwaCViZNCP8iC9aO0q9fQojoa7NQnAtw==" + } + ] + } + } +} \ No newline at end of file diff --git a/pylgtv/webos_client.py b/pylgtv/webos_client.py new file mode 100644 index 0000000..1460f84 --- /dev/null +++ b/pylgtv/webos_client.py @@ -0,0 +1,473 @@ +import asyncio +import base64 +import codecs +import json +import os +import websockets +import logging + +logger = logging.getLogger(__name__) + +from .endpoints import * + +KEY_FILE_NAME = '.pylgtv' +USER_HOME = 'HOME' +HANDSHAKE_FILE_NAME = 'handshake.json' + + +class PyLGTVPairException(Exception): + def __init__(self, id, message): + self.id = id + self.message = message + + +class WebOsClient(object): + def __init__(self, ip, key_file_path=None, timeout_connect=2): + """Initialize the client.""" + self.ip = ip + self.port = 3000 + self.key_file_path = key_file_path + self.client_key = None + self.web_socket = None + self.command_count = 0 + self.last_response = None + self.timeout_connect = timeout_connect + + self.load_key_file() + + @staticmethod + def _get_key_file_path(): + """Return the key file path.""" + if os.getenv(USER_HOME) is not None and os.access(os.getenv(USER_HOME), + os.W_OK): + return os.path.join(os.getenv(USER_HOME), KEY_FILE_NAME) + + return os.path.join(os.getcwd(), KEY_FILE_NAME) + + def load_key_file(self): + """Try to load the client key for the current ip.""" + self.client_key = None + if self.key_file_path: + key_file_path = self.key_file_path + else: + key_file_path = self._get_key_file_path() + key_dict = {} + + logger.debug('load keyfile from %s', key_file_path); + + if os.path.isfile(key_file_path): + with open(key_file_path, 'r') as f: + raw_data = f.read() + if raw_data: + key_dict = json.loads(raw_data) + + logger.debug('getting client_key for %s from %s', self.ip, key_file_path); + if self.ip in key_dict: + self.client_key = key_dict[self.ip] + + def save_key_file(self): + """Save the current client key.""" + if self.client_key is None: + return + + if self.key_file_path: + key_file_path = self.key_file_path + else: + key_file_path = self._get_key_file_path() + + logger.debug('save keyfile to %s', key_file_path); + + with open(key_file_path, 'w+') as f: + raw_data = f.read() + key_dict = {} + + if raw_data: + key_dict = json.loads(raw_data) + + key_dict[self.ip] = self.client_key + + f.write(json.dumps(key_dict)) + + @asyncio.coroutine + def _send_register_payload(self, websocket): + """Send the register payload.""" + file = os.path.join(os.path.dirname(__file__), HANDSHAKE_FILE_NAME) + + data = codecs.open(file, 'r', 'utf-8') + raw_handshake = data.read() + + handshake = json.loads(raw_handshake) + handshake['payload']['client-key'] = self.client_key + + yield from websocket.send(json.dumps(handshake)) + raw_response = yield from websocket.recv() + response = json.loads(raw_response) + + if response['type'] == 'response' and \ + response['payload']['pairingType'] == 'PROMPT': + raw_response = yield from websocket.recv() + response = json.loads(raw_response) + if response['type'] == 'registered': + self.client_key = response['payload']['client-key'] + self.save_key_file() + + def is_registered(self): + """Paired with the tv.""" + return self.client_key is not None + + @asyncio.coroutine + def _register(self): + """Register wrapper.""" + logger.debug('register on %s', "ws://{}:{}".format(self.ip, self.port)); + try: + websocket = yield from websockets.connect( + "ws://{}:{}".format(self.ip, self.port), timeout=self.timeout_connect) + + except: + logger.error('register failed to connect to %s', "ws://{}:{}".format(self.ip, self.port)); + return False + + logger.debug('register websocket connected to %s', "ws://{}:{}".format(self.ip, self.port)); + + try: + yield from self._send_register_payload(websocket) + + finally: + logger.debug('close register connection to %s', "ws://{}:{}".format(self.ip, self.port)); + yield from websocket.close() + + def register(self): + """Pair client with tv.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._register()) + + @asyncio.coroutine + def _command(self, msg): + """Send a command to the tv.""" + logger.debug('send command to %s', "ws://{}:{}".format(self.ip, self.port)); + try: + websocket = yield from websockets.connect( + "ws://{}:{}".format(self.ip, self.port), timeout=self.timeout_connect) + except: + logger.debug('command failed to connect to %s', "ws://{}:{}".format(self.ip, self.port)); + return False + + logger.debug('command websocket connected to %s', "ws://{}:{}".format(self.ip, self.port)); + + try: + yield from self._send_register_payload(websocket) + + if not self.client_key: + raise PyLGTVPairException("Unable to pair") + + yield from websocket.send(json.dumps(msg)) + + if msg['type'] == 'request': + raw_response = yield from websocket.recv() + self.last_response = json.loads(raw_response) + + finally: + logger.debug('close command connection to %s', "ws://{}:{}".format(self.ip, self.port)); + yield from websocket.close() + + def command(self, request_type, uri, payload): + """Build and send a command.""" + self.command_count += 1 + + if payload is None: + payload = {} + + message = { + 'id': "{}_{}".format(type, self.command_count), + 'type': request_type, + 'uri': "ssap://{}".format(uri), + 'payload': payload, + } + + self.last_response = None + + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(asyncio.wait_for(self._command(message), self.timeout_connect, loop=loop)) + finally: + loop.close() + + def request(self, uri, payload=None): + """Send a request.""" + self.command('request', uri, payload) + + def send_message(self, message, icon_path=None): + """Show a floating message.""" + icon_encoded_string = '' + icon_extension = '' + + if icon_path is not None: + icon_extension = os.path.splitext(icon_path)[1][1:] + with open(icon_path, 'rb') as icon_file: + icon_encoded_string = base64.b64encode(icon_file.read()).decode('ascii') + + self.request(EP_SHOW_MESSAGE, { + 'message': message, + 'iconData': icon_encoded_string, + 'iconExtension': icon_extension + }) + + # Apps + def get_apps(self): + """Return all apps.""" + self.request(EP_GET_APPS) + return {} if self.last_response is None else self.last_response.get('payload').get('launchPoints') + + def get_current_app(self): + """Get the current app id.""" + self.request(EP_GET_CURRENT_APP_INFO) + return None if self.last_response is None else self.last_response.get('payload').get('appId') + + def launch_app(self, app): + """Launch an app.""" + self.command('request', EP_LAUNCH, { + 'id': app + }) + + def launch_app_with_params(self, app, params): + """Launch an app with parameters.""" + self.request(EP_LAUNCH, { + 'id': app, + 'params': params + }) + + def launch_app_with_content_id(self, app, contentId): + """Launch an app with contentId.""" + self.request(EP_LAUNCH, { + 'id': app, + 'contentId': contentId + }) + + def close_app(self, app): + """Close the current app.""" + self.request(EP_LAUNCHER_CLOSE, { + 'id': app + }) + + # Services + def get_services(self): + """Get all services.""" + self.request(EP_GET_SERVICES) + return {} if self.last_response is None else self.last_response.get('payload').get('services') + + def get_software_info(self): + """Return the current software status.""" + self.request(EP_GET_SOFTWARE_INFO) + return {} if self.last_response is None else self.last_response.get('payload') + + def power_off(self): + """Play media.""" + self.request(EP_POWER_OFF) + + def power_on(self): + """Play media.""" + self.request(EP_POWER_ON) + + # 3D Mode + def turn_3d_on(self): + """Turn 3D on.""" + self.request(EP_3D_ON) + + def turn_3d_off(self): + """Turn 3D off.""" + self.request(EP_3D_OFF) + + # Inputs + def get_inputs(self): + """Get all inputs.""" + self.request(EP_GET_INPUTS) + return {} if self.last_response is None else self.last_response.get('payload').get('devices') + + def get_input(self): + """Get current input.""" + return self.get_current_app() + + def set_input(self, input): + """Set the current input.""" + self.request(EP_SET_INPUT, { + 'inputId': input + }) + + # Audio + def get_audio_status(self): + """Get the current audio status""" + self.request(EP_GET_AUDIO_STATUS) + return {} if self.last_response is None else self.last_response.get('payload') + + def get_muted(self): + """Get mute status.""" + return self.get_audio_status().get('mute') + + def set_mute(self, mute): + """Set mute.""" + self.request(EP_SET_MUTE, { + 'mute': mute + }) + + def get_volume(self): + """Get the current volume.""" + self.request(EP_GET_VOLUME) + return 0 if self.last_response is None else self.last_response.get('payload').get('volume') + + def set_volume(self, volume): + """Set volume.""" + volume = max(0, volume) + self.request(EP_SET_VOLUME, { + 'volume': volume + }) + + def volume_up(self): + """Volume up.""" + self.request(EP_VOLUME_UP) + + def volume_down(self): + """Volume down.""" + self.request(EP_VOLUME_DOWN) + + # TV Channel + def channel_up(self): + """Channel up.""" + self.request(EP_TV_CHANNEL_UP) + + def channel_down(self): + """Channel down.""" + self.request(EP_TV_CHANNEL_DOWN) + + def get_channels(self): + """Get all tv channels.""" + self.request(EP_GET_TV_CHANNELS) + return {} if self.last_response is None else self.last_response.get('payload').get('channelList') + + def get_current_channel(self): + """Get the current tv channel.""" + self.request(EP_GET_CURRENT_CHANNEL) + return {} if self.last_response is None else self.last_response.get('payload') + + def get_channel_info(self): + """Get the current channel info.""" + self.request(EP_GET_CHANNEL_INFO) + return {} if self.last_response is None else self.last_response.get('payload') + + def set_channel(self, channel): + """Set the current channel.""" + self.request(EP_SET_CHANNEL, { + 'channelId': channel + }) + + # Media control + def play(self): + """Play media.""" + self.request(EP_MEDIA_PLAY) + + def pause(self): + """Pause media.""" + self.request(EP_MEDIA_PAUSE) + + def stop(self): + """Stop media.""" + self.request(EP_MEDIA_STOP) + + def close(self): + """Close media.""" + self.request(EP_MEDIA_CLOSE) + + def rewind(self): + """Rewind media.""" + self.request(EP_MEDIA_REWIND) + + def fast_forward(self): + """Fast Forward media.""" + self.request(EP_MEDIA_FAST_FORWARD) + + # Keys + def send_enter_key(self): + """Send enter key.""" + self.request(EP_SEND_ENTER) + + def send_delete_key(self): + """Send delete key.""" + self.request(EP_SEND_DELETE) + + # Web + def open_url(self, url): + """Open URL.""" + self.request(EP_OPEN, { + 'target': url + }) + + def close_web(self): + """Close web app.""" + self.request(EP_CLOSE_WEB_APP) + + # Mouse controls + @asyncio.coroutine + def _input_command(self, msg, ws=None): + """Send an input command to the tv.""" + logger.debug('send command to %s', "{}".format(ws)); + try: + mouse_sock = yield from websockets.connect(ws, timeout=self.timeout_connect) + except: + logger.debug('command failed to connect to %s', "{}".format(ws)); + return False + + logger.debug('command websocket connected to %s', "{}".format(ws)); + + try: + yield from mouse_sock.send(msg) + + finally: + logger.debug('close command connection to %s', "{}".format(ws)); + yield from mouse_sock.close() + + def input_command(self, payload): + """Retrieve mouse websocket and send an input command.""" + #generate input socket + self.request(uri="com.webos.service.networkinput/getPointerInputSocket") + res = self.last_response + sock_path = res.get("payload").get("socketPath") + + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(asyncio.wait_for(self._input_command(payload, sock_path), self.timeout_connect, loop=loop)) + finally: + loop.close() + + def send_button(self, button): + #possible buttons LEFT, RIGHT, DOWN, UP, HOME, BACK, ENTER, DASH, INFO, + # 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, ASTERISK, CC, EXIT, MUTE, RED, GREEN, + # BLUE, VOLUMEUP, VOLUMEDOWN, CHANNELUP, CHANNELDOWN + """Send button command.""" + button = str(button).upper() + payload = 'type:button' + '\n' 'name:' + button + '\n\n' + self.input_command(payload) + + def send_click(self): + """Send click command.""" + payload = 'type:click\n\n' + self.input_command(payload) + + def send_scroll(self, dx='0', dy='0', down='0'): + #scroll wheel, ex. dy=1 scroll up 1 dy=-1 scroll down 1 + """Send scroll command.""" + dx = str(dx) + dy = str(dy) + down = str(down) + payload = 'type:scroll' + '\n' 'dx:' + dx + '\n' 'dy:' + dy + '\ndown:' + down + '\n\n' + self.input_command(payload) + + def send_move(self, dx='0', dy='0', down='0'): + #not sure how this works on TV, doesn't seem to be absolute or relative + """Send move command.""" + dx = str(dx) + dy = str(dy) + down = str(down) + payload = 'type:move' + '\n' 'dx:' + dx + '\n' 'dy:' + dy + '\ndown:' + down + '\n\n' + self.input_command(payload)