-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
193 lines (174 loc) · 7.26 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import json
import threading
import fastapi
import os
from datetime import datetime
from netifaces import interfaces, ifaddresses, AF_INET
import requests
from urllib.parse import quote
import base64
import time
import math
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from fastapi import Body, FastAPI, Header, Path, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.encoders import jsonable_encoder
from jinja2 import Template
from typing import Any, AnyStr, Optional, Union, List, Dict
from mfrc522 import SimpleMFRC522
import RPi.GPIO as GPIO
# GPIO.setwarnings(False)
# Type alias for request bodies accepted as raw JSON: either a list or a dict.
JSONBody = Union[List[Any], Dict[AnyStr, Any]]
def get_temporary_token(client_id, client_secret, code, redirect_uri):
    """Exchange a Spotify authorization code for a token and cache it on disk.

    Posts the code to the Spotify token endpoint using HTTP Basic
    authentication built from the client credentials, adds an absolute
    ``expires_at`` timestamp (whole seconds since the epoch) to the decoded
    response, and writes the result to ``.cache`` in the working directory.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.
        code: Authorization code received on the OAuth callback.
        redirect_uri: Redirect URI that was used for the authorization request.

    Returns:
        The decoded token response as a dict, including ``expires_at``.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        requests.Timeout: If the endpoint does not answer in time.
    """
    credentials = base64.b64encode(f'{client_id}:{client_secret}'.encode()).decode()
    response = requests.post(
        'https://accounts.spotify.com/api/token',
        data={
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri
        },
        headers={
            'Authorization': 'Basic ' + credentials,
            'Content-Type': 'application/x-www-form-urlencoded'
        },
        # Without a timeout a stalled connection would block the request handler forever.
        timeout=10,
    )
    # Surface HTTP errors directly instead of failing later with a KeyError
    # on the 'expires_in' field that error responses do not carry.
    response.raise_for_status()
    token = json.loads(response.text)
    token['expires_at'] = math.floor(time.time()) + token['expires_in']
    # Context manager guarantees the cache file is flushed and closed.
    with open('.cache', 'w') as cache_file:
        cache_file.write(json.dumps(token, indent=2))
    return token
def read_html(filename):
    """Return the text of ``templates/<filename>.html``.

    NOTE: a second ``read_html`` defined later in this file rebinds the name,
    so this version is only in effect until that definition is executed.

    Args:
        filename: Template name without the ``.html`` extension.

    Returns:
        The file's lines joined with an additional newline between them
        (each line keeps its own line ending, so the result is double-spaced).
    """
    # Context manager closes the handle; UTF-8 matches Jinja's default template encoding.
    with open(f'templates/{filename}.html', 'r', encoding='utf-8') as html_file:
        return '\n'.join(html_file.readlines())
# Distinct IPv4 addresses found on the machine's interfaces, excluding loopback.
ALL_IPS = []
for iface in interfaces():
    # Interfaces with no IPv4 entry get an 'N/A' placeholder, which is filtered out below.
    inet_entries = ifaddresses(iface).setdefault(AF_INET, [{'addr': 'N/A'}])
    usable_addresses = [
        entry['addr']
        for entry in inet_entries
        if entry['addr'] not in ['127.0.0.1', 'N/A']
    ]
    for address in usable_addresses:
        if address in ALL_IPS:
            continue
        ALL_IPS.append(address)
# Default configuration, used until a config.json file exists.
CONFIG = {
    'client_id': '',
    'client_secret': '',
    'tags': []
}

# `hostname -I` can print several space-separated addresses; only the first
# one is used so that REDIRECT_URI stays a valid URL. An empty output keeps
# the previous behaviour of an empty host part.
with os.popen('hostname -I') as _hostname_pipe:
    _host_addresses = _hostname_pipe.read().split()
ip_addr = _host_addresses[0] if _host_addresses else ''
REDIRECT_URI = f'http://{ip_addr}:8000/callback'

# OAuth scopes requested from Spotify (each scope listed once).
SCOPE = [
    'user-library-read',
    'user-modify-playback-state',
    'user-read-playback-state',
    'user-read-currently-playing',
    'user-follow-modify',
    'user-follow-read',
    'user-read-recently-played',
    'user-read-playback-position',
    'user-top-read',
    'playlist-read-collaborative',
    'playlist-modify-public',
    'playlist-read-private',
    'playlist-modify-private',
    'app-remote-control',
    'streaming',
    'user-read-email',
    'user-read-private',
    'user-library-modify',
]

# Spotify client; stays None until a saved configuration provides credentials.
SPOTIPY_CLIENT = None
RFID_READER = SimpleMFRC522()

if os.path.exists('config.json'):
    # Context manager closes the config file after loading.
    with open('config.json', 'r') as _config_file:
        CONFIG = json.load(_config_file)
    SPOTIPY_CLIENT = spotipy.Spotify(auth_manager=SpotifyOAuth(
        client_id=CONFIG['client_id'],
        client_secret=CONFIG['client_secret'],
        redirect_uri=REDIRECT_URI,
        scope=SCOPE,
    ))

# Set by the /read_tag endpoint while it is using the reader.
READER_BUSY = False
# URI most recently sent to Spotify by the tag-reading loop.
LAST_TAPPED = None
def read_rfid_tag():
    """Read one RFID tag with the shared reader and return its id.

    The reader is queried exactly once, inside the ``try`` block, so that
    ``GPIO.cleanup()`` runs whether or not the read succeeds.

    Returns:
        The tag id converted to ``str``, or ``None`` if the read raised.
    """
    tag_id = None
    print('Trying to read tag, please wait...')
    try:
        # The first element of the value returned by read() is the tag id.
        tag_id = str(RFID_READER.read()[0])
        print('Tag has been read')
    except Exception as e:
        # Best-effort: report the failure and return None to the caller.
        print('ERROR:', e)
    finally:
        print('Cleaning up...')
        GPIO.cleanup()
    return tag_id
def read_html(filename: Optional[str] = None, data: Optional[dict] = None):
    """Return ``templates/<filename>.html``, optionally rendered with Jinja.

    Args:
        filename: Template name without the ``.html`` extension.
        data: Variables for rendering. When falsy (``None`` or an empty
            dict) the file text is returned without rendering.

    Returns:
        The file's lines joined with an additional newline between them
        (each line keeps its own line ending), rendered through
        ``jinja2.Template`` when ``data`` is non-empty.
    """
    # Read once and close the handle; UTF-8 matches Jinja's default template encoding.
    with open(f'templates/{filename}.html', 'r', encoding='utf-8') as html_file:
        markup = '\n'.join(html_file.readlines())
    if not data:
        return markup
    return Template(markup).render(data)
# Application instance; both documentation URLs are disabled.
app = FastAPI(docs_url=None, redoc_url=None)

# CORS: any origin, method and header is accepted, with credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# Files under ./static are served at /static; page templates load from ./templates.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
@app.get('/')
def index(request: fastapi.Request):
    """Serve the home page, or redirect to ``/setup`` when unconfigured.

    The home page embeds ``templates/index.html`` rendered with the number
    of registered tags into the shared ``template.html`` layout.

    Args:
        request: Incoming request, required by the template response.

    Returns:
        A template response, or a 307 redirect to ``/setup`` when
        ``config.json`` does not exist.
    """
    if not os.path.exists('config.json'):
        return RedirectResponse('/setup', 307)
    # A saved configuration may lack 'tags' (create_tag guards against the
    # same case), so fall back to an empty list instead of raising KeyError.
    num_tags = len(CONFIG.get('tags', []))
    context = {
        'request': request,
        'page_content': read_html('index', {'num_tags': num_tags}),
        'page_title': 'Home',
    }
    return templates.TemplateResponse(name='template.html', context=context)
@app.get('/tags')
def index(request: fastapi.Request):
    """Serve the tags page, or redirect to ``/setup`` when ``config.json`` is absent."""
    if not os.path.exists('config.json'):
        return RedirectResponse('/setup', 307)
    context = {
        'request': request,
        'page_content': read_html('tags'),
        'page_title': 'Tags',
    }
    return templates.TemplateResponse(name='template.html', context=context)
@app.post('/config')
def save_config(body: JSONBody = None):
    """Merge the posted keys into ``config.json`` and the in-memory config.

    Existing keys in ``config.json`` are kept unless the body overrides them.

    Args:
        body: JSON object whose top-level keys are written to the config.

    Returns:
        ``{'success': True}`` once the file has been written.

    Raises:
        HTTPException: 400 when the body is missing or is not a JSON object.
    """
    global CONFIG
    body = jsonable_encoder(body)
    # A missing body or a JSON list has no keys to merge; reject it explicitly
    # instead of failing with an AttributeError (HTTP 500).
    if not isinstance(body, dict):
        raise HTTPException(400, 'Request body must be a JSON object')
    data = {}
    if os.path.exists('config.json'):
        with open('config.json', 'r') as config_file:
            data = json.load(config_file)
    data.update(body)
    with open('config.json', 'w') as config_file:
        config_file.write(json.dumps(data, indent=2))
    CONFIG = data
    return {'success': True}
@app.get('/callback')
def callback(code: str = None):
    """Handle the Spotify OAuth redirect by exchanging ``code`` for a token.

    Args:
        code: Authorization code supplied as a query parameter.

    Returns:
        The token dict produced by ``get_temporary_token``.

    Raises:
        HTTPException: 400 when no code was supplied.
    """
    if not code:
        # Must be raised, not returned, for FastAPI to send a 400 response.
        raise HTTPException(400, 'Missing authorization code')
    return get_temporary_token(CONFIG['client_id'], CONFIG['client_secret'], code, REDIRECT_URI)
@app.get('/{item_type}/{item_id}')
def get_information(item_type: str, item_id: str):
    """Look up an album or playlist through the Spotify client.

    Args:
        item_type: Either ``'album'`` or ``'playlist'``.
        item_id: Identifier passed to the corresponding Spotify lookup.

    Returns:
        Whatever ``SPOTIPY_CLIENT.album`` or ``SPOTIPY_CLIENT.playlist`` returns.

    Raises:
        HTTPException: 503 when no Spotify client has been created yet,
            404 when ``item_type`` is not supported.
    """
    if SPOTIPY_CLIENT is None:
        raise HTTPException(503, 'Spotify client is not configured')
    if item_type == 'album':
        return SPOTIPY_CLIENT.album(item_id)
    if item_type == 'playlist':
        return SPOTIPY_CLIENT.playlist(item_id)
    # Previously fell through and answered 200 with a null body.
    raise HTTPException(404, f'Unsupported item type: {item_type}')
@app.get('/token')
def retrieve_spotify_token():
    """Recreate the Spotify client and return the cached token.

    Returns:
        A dict with ``success``, the current ``config`` and the ``token``
        loaded from the ``.cache`` file.

    Raises:
        HTTPException: 404 when the ``.cache`` file does not exist.
    """
    global SPOTIPY_CLIENT
    SPOTIPY_CLIENT = spotipy.Spotify(auth_manager=SpotifyOAuth(
        client_id=CONFIG['client_id'],
        client_secret=CONFIG['client_secret'],
        redirect_uri=REDIRECT_URI,
        scope=SCOPE,
        open_browser=True,
    ))
    try:
        with open('.cache', 'r') as cache_file:
            token = json.load(cache_file)
    except FileNotFoundError:
        raise HTTPException(404, 'No cached Spotify token found') from None
    # NOTE(review): the response contains CONFIG, which includes client_secret;
    # confirm that exposing it to API clients is intended.
    return {'success': True, 'config': CONFIG, 'token': token}
@app.get('/setup', response_class=HTMLResponse)
def setup(request: fastapi.Request, page: str = None):
    """Serve a page of the setup wizard.

    Args:
        request: Incoming request, required by the template response.
        page: Name of the template under ``templates/setup/`` (without
            ``.html``). When omitted, the intro page is served.

    Returns:
        The rendered setup template. Named pages receive the host address,
        the redirect URI and the Spotify authorization URL in their context.
    """
    if not page:
        return templates.TemplateResponse(name='setup/intro.html', context={'request': request})
    # Percent-encode every query value: the scope list contains spaces and
    # the redirect URI contains ':' and '/'.
    oauth_url = (
        'https://accounts.spotify.com/authorize?response_type=code'
        f'&client_id={quote(CONFIG["client_id"], safe="")}'
        f'&scope={quote(" ".join(SCOPE), safe="")}'
        f'&redirect_uri={quote(REDIRECT_URI, safe="")}'
        '&state=ekko'
    )
    context = {
        'request': request,
        # Pass the detected address itself, not the literal string 'ip_addr'.
        'ip_addr': ip_addr,
        'redirect_uri': REDIRECT_URI,
        'oauth_url': oauth_url,
    }
    return templates.TemplateResponse(name=f'setup/{page}.html', context=context)
@app.get('/read_tag')
def read_tag():
    """Read one RFID tag on demand and return its id.

    ``READER_BUSY`` is set while the read is in progress so that the
    background tag-reading loop waits instead of starting a new read.

    Returns:
        ``{'data': <tag id or None>}``.
    """
    global READER_BUSY
    READER_BUSY = True
    print('Reading tag @ /read_tag...')
    try:
        return {'data': read_rfid_tag()}
    finally:
        # Always release the flag; otherwise an exception here would leave
        # the background loop waiting forever.
        READER_BUSY = False
@app.post('/tags')
def create_tag(body: JSONBody = None):
    """Append a tag entry to the configuration and persist it.

    Args:
        body: JSON object describing the tag. The tag-reading loop in this
            file looks entries up by their ``tag_id`` and ``uri`` keys.

    Returns:
        ``{'success': True, 'data': <stored entry>}``.

    Raises:
        HTTPException: 400 when the body is missing or is not a JSON object.
    """
    body = jsonable_encoder(body)
    # Only objects can carry 'tag_id'/'uri'; storing None or a list would
    # break every later tag lookup.
    if not isinstance(body, dict):
        raise HTTPException(400, 'Request body must be a JSON object describing the tag')
    CONFIG.setdefault('tags', []).append(body)
    with open('config.json', 'w') as config_file:
        config_file.write(json.dumps(CONFIG, indent=2))
    return {'success': True, 'data': body}
def tag_reading_daemon():
    """Continuously read RFID tags and start playback for registered ones.

    After an initial 30 second delay the loop reads a tag, looks up the
    entry in ``CONFIG['tags']`` whose ``tag_id`` matches, and starts playback
    of its ``uri`` unless that URI was the last one started. While
    ``READER_BUSY`` is set the loop waits 10 seconds and retries; otherwise it
    pauses 5 seconds between reads. The function never returns.
    """
    global LAST_TAPPED
    time.sleep(30)
    while True:
        if READER_BUSY:
            print('Reader is currently busy, waiting...')
            time.sleep(10)
            continue
        print('Reading tag')
        tag_id = read_rfid_tag()
        if tag_id:
            # Tolerate a missing 'tags' list and malformed entries so that one
            # bad record cannot disable every other tag.
            uri = next(
                (
                    tag.get('uri')
                    for tag in CONFIG.get('tags', [])
                    if isinstance(tag, dict) and tag.get('tag_id') == tag_id
                ),
                None,
            )
            if uri is None:
                print(f'No entry registered for tag {tag_id}')
            else:
                print(f'Detected {uri}')
                if uri != LAST_TAPPED:
                    try:
                        SPOTIPY_CLIENT.start_playback(context_uri=uri)
                    except Exception as e:
                        # Keep the loop alive, but report the failure instead
                        # of swallowing it silently.
                        print('ERROR:', e)
                    else:
                        # Remember the URI only after playback actually started.
                        LAST_TAPPED = uri
        time.sleep(5)


# Daemon thread: the endless loop must not keep the process alive on shutdown.
threading.Thread(target=tag_reading_daemon, daemon=True).start()