-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
224 lines (187 loc) · 7.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import sys
from optparse import OptionParser
import requests
import json
import socket
import subprocess
import time
import websocket
import json
import socket
import os
import sys
import logging
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, jsonify
from flask_cors import CORS
# Module logger. It is disabled below, so the logger.exception(...) calls in
# this file emit nothing.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
logger.disabled = True
# Flask application that serves the control page and the JSON API.
app = Flask(__name__)
# CORS: every route pattern ("*") accepts requests from every origin ("*").
CORS(app, resources={r"*": {"origins": "*"}})
# Shared ElectronRemoteDebugger instance; stays None until init() assigns it.
erb = None
# Most recent payload POSTed to /api/song; returned as-is on GET.
song_data = {}
@app.route('/')
def index():
    """Render the HTML page used to control the player."""
    page = render_template('index.html')
    return page
@app.route('/send_command', methods=['POST'])
def send_command():
    """Forward a command from the JSON request body to the player.

    Expects a JSON object with a non-empty string under ``"command"``.

    Returns:
        ``{"result": ...}`` with whatever ``handle_command`` returned, or
        ``{"error": "No command provided"}`` with HTTP status 400 when the
        body is missing, is not a JSON object, or carries no usable command.
    """
    # NOTE(security): the command is passed to handle_command, which
    # evaluates it as JavaScript inside the controlled application. Any
    # client that can reach this endpoint can run arbitrary script there.
    #
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so those requests take the same 400 path as an absent
    # command.
    payload = request.get_json(silent=True)
    print(payload)
    command = payload.get('command') if isinstance(payload, dict) else None
    # handle_command concatenates the command into a string, so anything
    # other than a non-empty str is rejected here rather than crashing there.
    if not isinstance(command, str) or not command:
        return jsonify({'error': 'No command provided'}), 400
    result = handle_command(command)
    print(f"Command '{command}' sent. Result: {result}")
    return jsonify({'result': result})
@app.route('/api/song', methods=['GET', 'POST'])
def current_song():
    """Read or replace the currently stored song data.

    GET returns the stored ``song_data`` as JSON. POST replaces it with the
    request's JSON body and returns a status message.
    """
    global song_data
    if request.method != 'POST':
        # GET: hand back whatever was stored last.
        return jsonify(song_data)
    # POST: overwrite the stored song with the submitted payload.
    song_data = request.json
    return jsonify({'status': 'Song updated'})
@app.route('/api/playlist', methods=['GET', 'POST'])
def playlist():
    """Endpoint for reading or updating the playlist.

    GET currently returns an empty JSON object (placeholder). POST parses the
    JSON body but does not store it yet, and returns a status message.
    """
    if request.method != 'POST':
        # Placeholder until real playlist retrieval exists.
        current = {}
        return jsonify(current)
    # The body is parsed here but not processed or kept anywhere yet.
    incoming = request.json
    return jsonify({'status': 'Playlist updated'})
@app.route('/static/<path:path>')
def send_static(path):
    """Serve the file at *path* from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, path)
def handle_command(command):
    """Evaluate *command* as JavaScript in every debuggable window.

    Each window reported by ``erb.windows()`` receives the expression through
    ``erb.eval``. A failure in one window is logged and does not stop the
    remaining windows from being tried.

    Args:
        command: JavaScript source text to evaluate.

    Returns:
        A list with one entry per window, in the order ``erb.windows()``
        reported them: the value returned by ``erb.eval``, or ``None`` when
        evaluation in that window raised.
    """
    print("Received Command: " + command)
    results = []
    for w in erb.windows():
        print(w)
        try:
            results.append(erb.eval(w, command))
        except Exception as e:
            logger.exception(e)
            results.append(None)
        finally:
            # windows() builds a fresh websocket wrapper on every call, so
            # release this one once the command has been sent. Only close a
            # wrapper that actually connected (its .ws is set).
            ws = w.get('ws')
            if ws is not None and getattr(ws, 'ws', None) is not None:
                ws.close()
    return results
class LazyWebsocket(object):
    """WebSocket wrapper that connects only when first used.

    Creating an instance stores the URL only. The connection is opened by the
    first ``send`` or ``recv`` call and reused afterwards.
    """

    def __init__(self, url):
        self.url = url
        # Underlying connection; None until the first send/recv.
        self.ws = None

    def _connect(self):
        """Return the open connection, creating it on first use."""
        if self.ws is None:
            self.ws = websocket.create_connection(self.url)
        return self.ws

    def send(self, *args, **kwargs):
        """Send through the connection, connecting first if needed."""
        return self._connect().send(*args, **kwargs)

    def recv(self, *args, **kwargs):
        """Receive from the connection, connecting first if needed."""
        # Go through _connect() so that recv() before any send() does not
        # dereference a None connection.
        return self._connect().recv(*args, **kwargs)

    def sendrcv(self, msg):
        """Send *msg* and return the next message received."""
        self.send(msg)
        return self.recv()

    def close(self):
        """Close the connection if one was opened; otherwise do nothing."""
        if self.ws is not None:
            try:
                self.ws.close()
            finally:
                # Drop the handle so a later send/recv reconnects instead of
                # using a closed socket.
                self.ws = None
class ElectronRemoteDebugger(object):
    """Client for an application's remote-debugging HTTP/WebSocket endpoint.

    Lists the debuggable targets over HTTP and evaluates JavaScript in them
    by sending ``Runtime.evaluate`` messages over each target's websocket.
    """

    def __init__(self, host, port):
        # Kept as a dict because it is used directly for %-formatting the URL.
        self.params = {'host': host, 'port': port}

    def windows(self):
        """Return the targets that expose a websocket debugger URL.

        Fetches ``/json/list`` and keeps only entries that have a
        ``webSocketDebuggerUrl``. Each kept entry gains a ``'ws'`` key holding
        a not-yet-connected ``LazyWebsocket`` for that URL.
        """
        params = self.params.copy()
        # Current timestamp as a query argument (presumably a cache-buster).
        params.update({'ts': int(time.time())})
        ret = []
        for w in self.requests_get("http://%(host)s:%(port)s/json/list?t=%(ts)d" % params).json():
            url = w.get("webSocketDebuggerUrl")
            if not url:
                continue
            w['ws'] = LazyWebsocket(url)
            ret.append(w)
        return ret

    def requests_get(self, url, tries=5, delay=1):
        """GET *url*, retrying on connection errors.

        Args:
            url: Address to fetch.
            tries: Maximum number of attempts.
            delay: Seconds to wait between attempts.

        Returns:
            The response of the first attempt that does not raise a
            connection error.

        Raises:
            requests.exceptions.ConnectionError: the last connection error
                when every attempt failed.
            Exception: when ``tries`` allows no attempt at all.
        """
        last_exception = Exception("failed to request after %d tries." % tries)
        for attempt in range(tries):
            try:
                return requests.get(url)
            except requests.exceptions.ConnectionError as ce:
                # Remember the failure and retry; the endpoint may still be
                # starting up.
                last_exception = ce
                # No point sleeping after the final attempt.
                if attempt < tries - 1:
                    time.sleep(delay)
        raise last_exception

    def sendrcv(self, w, msg):
        """Send *msg* on window *w*'s websocket and return the reply."""
        return w['ws'].sendrcv(msg)

    def eval(self, w, expression):
        """Evaluate *expression* in window *w* via ``Runtime.evaluate``.

        Args:
            w: A window dict as returned by ``windows()`` (needs ``'ws'``).
            expression: JavaScript source text.

        Returns:
            The ``"result"`` member of the decoded reply, or the whole decoded
            reply when it has no ``"result"`` member.

        Raises:
            Exception: when the reply's result is flagged ``wasThrown``.
        """
        data = {'id': 1,
                'method': "Runtime.evaluate",
                'params': {'contextId': 1,
                           'doNotPauseOnExceptionsAndMuteConsole': False,
                           'expression': expression,
                           'generatePreview': False,
                           'includeCommandLineAPI': True,
                           'objectGroup': 'console',
                           'returnByValue': False,
                           'userGesture': True}}
        ret = json.loads(w['ws'].sendrcv(json.dumps(data)))
        if "result" not in ret:
            return ret
        if ret['result'].get('wasThrown'):
            raise Exception(ret['result']['result'])
        return ret['result']

    @classmethod
    def execute(cls, path, port=None):
        """Launch *path* with remote debugging enabled and return a client.

        Args:
            path: Command used to start the application. It is run through
                the shell, so shell/environment expansion applies.
            port: Debugging port to use; when None a free port is picked.

        Returns:
            An instance connected to ``localhost`` on the chosen port.

        Raises:
            Exception: when the launched process has already exited shortly
                after being started.
        """
        if port is None:
            # Bind to port 0 so the OS assigns a free port, then release it
            # for the application to use.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind(('', 0))
                port = sock.getsockname()[1]

        cmd = "%s %s" % (path, "--remote-debugging-port=%d" % port)
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        time.sleep(0.5)
        if p.poll() is not None:
            raise Exception("Could not execute cmd (not found or already running?): %r" % cmd)

        # Wait (up to ~30 s) until the debugging port accepts connections.
        # connect_ex returns 0 on success, so that is the stop condition. A
        # fresh socket is used per attempt because a socket whose connect
        # failed cannot be reliably reused, and each probe is closed.
        for _ in range(30):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                if probe.connect_ex(('localhost', port)) == 0:
                    break
            time.sleep(1)
        return cls("localhost", port=port)
def init(target):
    """Launch *target* and run ``script.js`` once in each of its windows.

    Stores the debugger client in the module-level ``erb``, then keeps
    listing windows until every listed window id has been handled. A window
    is marked as handled even when reading or evaluating the script fails.
    """
    global erb
    erb = ElectronRemoteDebugger.execute(target, None)
    patched_ids = set()
    all_patched = False
    while not all_patched:
        for window in erb.windows():
            window_id = window.get('id')
            if window_id in patched_ids:
                continue
            try:
                with open("script.js", "r") as handle:
                    source = handle.read()
                erb.eval(window, source)
            except Exception as exc:
                logger.exception(exc)
            finally:
                # Each window is patched at most once, success or not.
                patched_ids.add(window_id)
        # Re-list the windows: stop only when none of them is unpatched.
        all_patched = all(
            candidate.get('id') in patched_ids for candidate in erb.windows()
        )
def run_server():
    """Start the Flask server on port 5000, listening on all interfaces."""
    listen_host = '0.0.0.0'
    listen_port = 5000
    app.run(host=listen_host, port=listen_port, debug=False)
if __name__ == "__main__":
    # Raw string: "\T" is not a valid escape sequence, so the non-raw form
    # triggers an invalid-escape warning. The resulting value is unchanged.
    # %LOCALAPPDATA% is left for the shell to expand when the command runs.
    target = r"%LOCALAPPDATA%\TIDAL\TIDAL.exe"
    # Terminate any running instance before relaunching it with the
    # remote-debugging flag.
    os.system("taskkill /F /im TIDAL.exe")
    init(target)
    run_server()
"""
TidalClearPlayQueue()
var footerPlayer = document.getElementById('footerPlayer');
var PlayButton = document.evaluate(".//button[@aria-label='Pause']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue || document.evaluate(".//button[@aria-label='Play']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
var NextButton = document.evaluate(".//button[@aria-label='Next']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
var PreviousButton = document.evaluate(".//button[@aria-label='Previous']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
var ShuffleButton = document.evaluate(".//button[@aria-label='Shuffle']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
var RepeatButton = document.evaluate(".//button[@aria-label='Repeat']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
var LikeButton = document.evaluate(".//button[@aria-label='Add to My Collection']", footerPlayer, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
"""