-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
upload_zoom_recordings.py
296 lines (236 loc) · 11.1 KB
/
upload_zoom_recordings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#!/usr/bin/env python
#
# Description:
#
# This script will download cloud recordings from Zoom Meetings, and
# upload them to YouTube.
#
# Usage:
#
# python scripts/upload_zoom_recordings.py
#
# Environment Variables:
#
# EDGI_ZOOM_CLIENT_ID - Client ID for the Zoom OAuth app for this script
# EDGI_ZOOM_CLIENT_SECRET - Client Secret for the Zoom OAuth app for this script
# EDGI_ZOOM_ACCOUNT_ID - Account ID for the Zoom OAuth app for this script
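# EDGI_ZOOM_DELETE_AFTER_UPLOAD - If set to 'true', the cloud recording will be
# deleted from Zoom after upload to YouTube.
# EDGI_DRY_RUN - If set to 'true', recordings are still downloaded, but nothing
# is uploaded to YouTube or deleted from Zoom.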
#
# Configuration:
#
# This script expects one file to be available to enable YouTube upload:
#
# * `.youtube-upload-credentials.json`
#
# See README for how to generate this file.
from datetime import datetime
import os
import re
import requests
import subprocess
import sys
import tempfile
from typing import Dict
from urllib.parse import urlparse
from zoomus import ZoomClient
from lib.constants import VIDEO_CATEGORY_IDS, ZOOM_ROLES
from lib.youtube import get_youtube_client, upload_video, add_video_to_playlist, validate_youtube_credentials
# Path of the stored YouTube credentials file handed to `get_youtube_client()`.
YOUTUBE_CREDENTIALS_PATH = '.youtube-upload-credentials.json'
# Zoom OAuth app settings. These are read with `os.environ[...]`, so a missing
# variable raises `KeyError` as soon as this module is loaded.
ZOOM_CLIENT_ID = os.environ['EDGI_ZOOM_CLIENT_ID']
ZOOM_CLIENT_SECRET = os.environ['EDGI_ZOOM_CLIENT_SECRET']
ZOOM_ACCOUNT_ID = os.environ['EDGI_ZOOM_ACCOUNT_ID']
# Meeting topics to keep when topic filtering (`DO_FILTER`) is switched on.
MEETINGS_TO_RECORD = ['EDGI Community Standup']
# Name of the playlist that every uploaded video is added to.
DEFAULT_YOUTUBE_PLAYLIST = 'Uploads from Zoom'
# Name of the YouTube category for uploads (presumably a key of
# `VIDEO_CATEGORY_IDS` -- confirm against lib.constants).
DEFAULT_YOUTUBE_CATEGORY = 'Science & Technology'
# Value passed as the `license` argument when uploading a video.
DEFAULT_VIDEO_LICENSE = 'creativeCommon'
# When False, meetings are processed regardless of their topic.
DO_FILTER = False
# Ignore users with names that match these patterns when determining if a
# meeting has any participants and its recordings should be preserved.
ZOOM_IGNORE_USER_NAMES = (
    # The otter.ai notetaker bot is present in most meetings.
    re.compile(r'Otter\.ai', re.I),
)
def is_truthy(x):
    """Return True if *x* spells an affirmative value.

    The comparison is case-insensitive and ignores surrounding whitespace, so
    values such as ``"TRUE"`` or ``"yes\\n"`` (easy to produce in shell or CI
    configuration) are accepted. ``None`` is treated as false so the result of
    a plain ``os.environ.get(name)`` can be passed in directly.

    Args:
        x: A string, typically the value of an environment variable, or None.

    Returns:
        True for 'true', '1', 'y' or 'yes'; False for anything else.
    """
    if x is None:
        return False
    return x.strip().lower() in ('true', '1', 'y', 'yes')
# Optional switches read from the environment; both are off when unset.
# When on, each video file is moved to Zoom's trash after it has been handled.
ZOOM_DELETE_AFTER_UPLOAD = is_truthy(os.environ.get('EDGI_ZOOM_DELETE_AFTER_UPLOAD', ''))
# When on, nothing is uploaded to YouTube and nothing is deleted from Zoom.
DRY_RUN = is_truthy(os.environ.get('EDGI_DRY_RUN', ''))
class ZoomError(Exception):
    """Error describing a failed Zoom API response.

    The exception text combines a human-readable message, the remaining fields
    of the response body plus the HTTP status, and a pointer to the Zoom docs.

    Attributes:
        response: The response object the error was built from.
        http_status: The response's HTTP status code.
    """

    def __init__(self, response, message=None):
        """Build the error from a response object.

        Args:
            response: Object with a ``status_code`` attribute and a ``json()``
                method (e.g. a ``requests`` response).
            message: Optional text to use instead of the ``message`` field of
                the response body.
        """
        # Building an error must never itself fail, so any problem decoding
        # the body just means there are no extra details to show.
        try:
            data = response.json()
        except Exception:
            data = {}
        # A body can be valid JSON without being an object (e.g. a list or a
        # string); wrap it so the dict operations below keep working.
        if not isinstance(data, dict):
            data = {'body': data}
        if not message:
            message = data.pop('message', None) or 'Zoom API error!'
        data['http_status'] = response.status_code
        full_message = f'{message} ({data!r}) Check the docs for details: https://developers.zoom.us/docs/api/.'
        super().__init__(full_message)
        self.response = response
        self.http_status = response.status_code

    @classmethod
    def is_error(cls, response):
        """Return True if the response has an HTTP status of 400 or above."""
        return response.status_code >= 400

    @classmethod
    def raise_if_error(cls, response, message=None):
        """Raise this error for *response* if it is an error response."""
        if cls.is_error(response):
            raise cls(response, message)

    @classmethod
    def parse_or_raise(cls, response, message=None) -> Dict:
        """Return the parsed JSON body, or raise if the response is an error."""
        cls.raise_if_error(response, message)
        return response.json()
def fix_date(date_string: str) -> str:
    """Insert a ``.0`` fractional-seconds part before the ``Z`` of a timestamp.

    For example ``2023-03-05T10:00:00Z`` becomes ``2023-03-05T10:00:00.0Z``
    (presumably the form the YouTube upload helper expects -- confirm against
    lib.youtube).

    Strings that contain no ``Z``, or that already have a fractional part
    before it, are returned unchanged instead of being corrupted.

    Args:
        date_string: Timestamp such as ``YYYY-MM-DDTHH:MM:SSZ``.

    Returns:
        The timestamp with fractional seconds present.
    """
    index = date_string.find('Z')
    if index == -1:
        # Without this guard the -1 from `find` would be used as a slice
        # position and splice '.0' in front of the last character.
        return date_string
    if '.' in date_string[:index]:
        return date_string
    return f'{date_string[:index]}.0{date_string[index:]}'
def pretty_date(date_string: str) -> str:
    """Format a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp like ``Mar 5, 2023``.

    The day is taken from the parsed date's ``day`` attribute rather than the
    ``%-d`` directive, which is a platform extension to ``strftime`` and is
    not available everywhere (e.g. Windows).

    Args:
        date_string: Timestamp in exactly the ``%Y-%m-%dT%H:%M:%SZ`` format.

    Returns:
        Abbreviated month name, day without zero padding, and year.

    Raises:
        ValueError: If *date_string* does not match the expected format.
    """
    parsed = datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%SZ')
    return f'{parsed:%b} {parsed.day}, {parsed:%Y}'
def download_zoom_file(client: ZoomClient, url: str, download_directory: str) -> str:
    """Download a Zoom recording file into a directory.

    The file name is taken from the last path segment of the final URL after
    any redirects. If a file with that name is already present in the
    directory, nothing is written and the existing path is returned.

    Args:
        client: Zoom client whose access token authorizes the download.
        url: Download URL of the recording file.
        download_directory: Existing directory to save the file in.

    Returns:
        Path of the downloaded (or already present) file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        KeyError: If the client no longer exposes its token via ``config``.
    """
    # Note the token info in the client isn't really *public*, but it's
    # not explicitly private, either. Use `config[]` syntax instead of
    # `config.get()` so we get an exception if things have changed and
    # this data is no longer available.
    token = client.config['token']
    # Using the response as a context manager releases the connection on
    # every exit path, including HTTP errors and failed writes.
    with requests.get(url, stream=True, headers={
        'Authorization': f'Bearer {token}'
    }) as r:
        r.raise_for_status()
        resolved_url = r.url
        filename = urlparse(resolved_url).path.split('/')[-1]
        filepath = os.path.join(download_directory, filename)
        if os.path.exists(filepath):
            # Always return a path: callers pass the result on to other
            # tools, so returning None here would break them.
            return filepath
        with open(filepath, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
    return filepath
def meeting_had_no_participants(client: ZoomClient, meeting: Dict) -> bool:
    """Tell whether nobody (other than ignored users) attended a meeting.

    Looks up the participants of the past meeting identified by
    ``meeting['uuid']`` and checks each participant's name against
    ``ZOOM_IGNORE_USER_NAMES``.

    Returns:
        True if every participant matches an ignore pattern (including when
        there are no participants at all), otherwise False.

    Raises:
        ZoomError: If the participants request returns an error response.
    """
    response = client.past_meeting.get_participants(meeting_id=meeting['uuid'])
    attendees = ZoomError.parse_or_raise(response)['participants']
    for attendee in attendees:
        ignored = any(
            pattern.search(attendee['name'])
            for pattern in ZOOM_IGNORE_USER_NAMES
        )
        if not ignored:
            # Found someone who counts as a real participant.
            return False
    return True
def recording_status(meeting: Dict) -> str:
    """Summarize whether a meeting's recording files are ready to use.

    Files are examined in order and the first one that is not finished
    decides the result.

    Returns:
        'ongoing' if that file has an empty ``recording_end``, 'processing'
        if its ``status`` is not 'completed', and 'ready' when every file is
        finished (or there are no files).
    """
    def unfinished_state(recording_file):
        # None means this particular file is finished.
        if recording_file['recording_end'] == '':
            return 'ongoing'
        if recording_file['status'] != 'completed':
            return 'processing'
        return None

    # Lazily evaluated, so files after the first unfinished one are never
    # inspected.
    unfinished = (
        state
        for state in map(unfinished_state, meeting['recording_files'])
        if state is not None
    )
    return next(unfinished, 'ready')
def video_has_audio(file_path: str) -> bool:
    """Report whether a video file contains an audio track that is not silent.

    Runs ``ffmpeg`` over the file with the ``ebur128`` loudness filter and
    inspects its combined stdout/stderr output.

    Returns:
        False if ffmpeg reports no audio data or a peak level of ``-inf``,
        True otherwise.
    """
    command = [
        'ffmpeg',
        '-i', file_path,
        # `ebur128=peak=true` makes ffmpeg report the peak loudness level.
        '-af', 'ebur128=peak=true',
        # Decode everything but discard the output.
        '-f', 'null',
        '-',
    ]
    completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    report = completed.stdout

    # ffmpeg's summary shows zero audio bytes when there is no audio track.
    lacks_audio_track = b'audio:0kib' in report.lower()
    # Silent audio. Note that this won't catch things like the low hiss of an
    # empty room, which reports some low decibel level instead of `-inf`.
    # In practice, this covers Zoom recordings where a mic was never turned on.
    # Docs: https://ffmpeg.org/ffmpeg-filters.html#ebur128-1
    peak_is_silent = re.search(rb'Peak:\s+-inf', report) is not None
    return not (lacks_audio_track or peak_is_silent)
def _extra_playlist_for_topic(topic: str) -> str:
    """Return the topic-specific playlist name for a meeting topic, or ''.

    Matching is case-insensitive on substrings of the topic. When several
    rules match, the last matching rule wins.
    """
    topic = topic.lower()
    playlist_name = ''
    if any(x in topic for x in ['web mon', 'website monitoring', 'wm']):
        playlist_name = 'Website Monitoring'
    if 'data together' in topic:
        playlist_name = 'Data Together'
    if 'community call' in topic:
        playlist_name = 'Community Calls'
    if 'edgi introductions' in topic:
        playlist_name = 'EDGI Introductions'
    if 'all-edgi' in topic:
        playlist_name = 'All-EDGI Meetings'
    return playlist_name


def main():
    """Download the account owner's Zoom cloud recordings and upload them to YouTube.

    For each meeting (oldest first) whose recording is ready:

    * If nobody attended (ignoring names in ``ZOOM_IGNORE_USER_NAMES``), the
      whole recording is moved to Zoom's trash.
    * Otherwise each MP4 file is downloaded to a temporary directory and,
      unless it is silent, uploaded as an unlisted video and added to the
      default playlist plus an optional topic-specific playlist.
    * If ``ZOOM_DELETE_AFTER_UPLOAD`` is set, each handled video file is then
      moved to Zoom's trash.

    With ``DRY_RUN`` set, files are still downloaded but nothing is uploaded
    or deleted. Exits with status 1 if the YouTube credentials are invalid or
    no account owner can be found.

    Raises:
        ZoomError: If listing users or recordings (or looking up meeting
            participants) returns an error response.
    """
    if DRY_RUN:
        print('⚠️ This is a dry run! Videos will not actually be uploaded.\n')

    youtube = get_youtube_client(YOUTUBE_CREDENTIALS_PATH)
    if not validate_youtube_credentials(youtube):
        print(f'The credentials in {YOUTUBE_CREDENTIALS_PATH} were not valid!')
        print('Please use `python scripts/auth.py` to re-authorize.')
        return sys.exit(1)

    zoom = ZoomClient(ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET, ZOOM_ACCOUNT_ID)

    # Official meeting recordings we will upload belong to the account owner.
    # Check the response like every other Zoom call so that an API failure
    # surfaces as a ZoomError instead of an opaque KeyError/IndexError.
    owners = ZoomError.parse_or_raise(zoom.user.list(role_id=ZOOM_ROLES['owner']))['users']
    if not owners:
        print('Could not find the owner of the Zoom account!')
        return sys.exit(1)
    zoom_user_id = owners[0]['id']

    with tempfile.TemporaryDirectory() as tmpdirname:
        print(f'Creating tmp dir: {tmpdirname}\n')

        meetings = ZoomError.parse_or_raise(zoom.recording.list(user_id=zoom_user_id))['meetings']
        meetings = sorted(meetings, key=lambda m: m['start_time'])
        # Skip recordings with a duration of 1 or less (presumably minutes --
        # confirm against the Zoom API docs).
        meetings = [m for m in meetings if m['duration'] > 1]

        for meeting in meetings:
            topic = meeting['topic']
            uuid = meeting['uuid']
            print(f'Processing meeting: {topic} from {meeting["start_time"]} (ID: "{uuid}")')

            # 3. filter by criteria (no-op for now)
            if DO_FILTER and topic not in MEETINGS_TO_RECORD:
                print(' Skipping: meeting not in topic list.')
                continue

            status = recording_status(meeting)
            if status != 'ready':
                print(f' Skipping: recording is still {status}.')
                continue

            if meeting_had_no_participants(zoom, meeting):
                print(' Deleting recording: nobody attended this meeting.')
                if not DRY_RUN:
                    response = zoom.recording.delete(meeting_id=uuid, action='trash')
                    if response.status_code < 300:
                        print(' 🗑️ Deleted recording.')
                    else:
                        print(f' ❌ {ZoomError(response)}')
                continue

            videos = [recording_file for recording_file in meeting['recording_files']
                      if recording_file['file_type'].lower() == 'mp4']
            if not videos:
                print(' 🔹 Skipping: no videos for meeting')
                continue
            elif any(video['file_size'] == 0 for video in videos):
                print(' 🔹 Skipping: meeting still processing')
                continue

            print(f' {len(videos)} videos to upload...')
            for video in videos:
                url = video['download_url']
                print(f' Download from {url}...')
                filepath = download_zoom_file(zoom, url, tmpdirname)

                if video_has_audio(filepath):
                    # NOTE: `title` and `recording_date` must keep these names;
                    # the `{name=}` f-string below prints them.
                    recording_date = fix_date(meeting['start_time'])
                    title = f'{topic} - {pretty_date(meeting["start_time"])}'
                    print(f' Uploading {filepath}\n {title=}\n {recording_date=}')
                    if not DRY_RUN:
                        video_id = upload_video(youtube,
                                                filepath,
                                                title=title,
                                                category=VIDEO_CATEGORY_IDS[DEFAULT_YOUTUBE_CATEGORY],
                                                license=DEFAULT_VIDEO_LICENSE,
                                                recording_date=recording_date,
                                                privacy_status='unlisted')

                    # Add all videos to default playlist
                    print(f' Adding to main playlist: {DEFAULT_YOUTUBE_PLAYLIST}')
                    if not DRY_RUN:
                        add_video_to_playlist(youtube, video_id, title=DEFAULT_YOUTUBE_PLAYLIST, privacy='unlisted')

                    # Add to additional playlists
                    playlist_name = _extra_playlist_for_topic(topic)
                    if playlist_name:
                        print(f' Adding to call playlist: {playlist_name}')
                        if not DRY_RUN:
                            add_video_to_playlist(youtube, video_id, title=playlist_name, privacy='unlisted')

                    # TODO: save the chat log transcript in a comment on the video.
                else:
                    print(' Skipping upload: video was silent (no mics were on).')

                if ZOOM_DELETE_AFTER_UPLOAD and not DRY_RUN:
                    # Just delete the video for now, since that takes the most storage space.
                    response = zoom.recording.delete_single_recording(
                        meeting_id=video['meeting_id'],
                        recording_id=video['id'],
                        action='trash'
                    )
                    if response.status_code == 204:
                        print(f' 🗑️ Deleted {video["file_type"]} file from Zoom for recording: {meeting["topic"]}')
                    else:
                        print(f' ❌ {ZoomError(response)}')
# Only start processing when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()