-
Notifications
You must be signed in to change notification settings - Fork 0
/
update_today.py
229 lines (172 loc) · 6.2 KB
/
update_today.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import logging
import os
import shlex
import subprocess
from datetime import date, timedelta
from pathlib import Path
import luigi
from git import Repo
from luigi.local_target import LocalTarget
from luigi.mock import MockTarget
# Root logging setup: "timestamp || LEVEL || logger name || message".
logging.basicConfig(
    format="%(asctime)s || %(levelname)s || %(name)s || %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("Update")

# Absolute path of this script and the directory that contains it.
FILE = Path(__file__).absolute()
WORKING_DIR = FILE.parent
# The website checkout is addressed as a sibling of the working directory.
WEB_DIR = WORKING_DIR.parent.joinpath("wckdouglas.github.io")

# Inclusive date bounds used to build one GetData task per day.
FIRST_DAY = date(2020, 4, 12)
TODAY = date.today()
class GetData(luigi.Task):
    """
    For a given date (e.g. "2021-05-01"), retrieve data and save it to data/

    Runs ``python dashboard.py get --date <day>`` and stores whatever the
    command writes to stdout in ``data/<day>.tsv`` under WORKING_DIR.

    :param day (str): the date to fetch, e.g. "2021-05-01"
    :param force (bool): forwarded to the upstream CovidPull task
    """

    day = luigi.Parameter()
    force = luigi.BoolParameter(default=False)

    def requires(self):
        # This method used to be misspelled "rquires", so luigi never saw the
        # dependency and the pull was silently skipped.
        return [CovidPull(force=self.force)]

    def output(self):
        return luigi.LocalTarget(WORKING_DIR / "data/{}.tsv".format(self.day))

    def run(self):  # work if SGE
        cmd = "python dashboard.py get --date {}".format(self.day)
        logger.debug("Getting {}".format(self.output().path))
        with self.output().open("w") as out:
            # The exit status is intentionally not checked here; SyncRepo
            # skips output files that end up empty.
            subprocess.run(shlex.split(cmd), stdout=out)


class CovidPull(luigi.Task):
    """
    git pull the covid data repo (the repository at WORKING_DIR)

    A marker file is written after the pull so luigi can tell that the task
    has already run.

    :param force (bool): remove the existing marker file so the pull runs again
    """

    force = luigi.BoolParameter(default=False)
    output_file = WEB_DIR / "git_pull_covid"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Inside the class body ``force`` is the Parameter object itself, which
        # is always truthy; the actual value only exists on the instance.
        if self.force and os.path.isfile(self.output_file):
            os.remove(self.output_file)

    def output(self):
        # Must be the marker path, not the bound ``self.output`` method.
        return luigi.LocalTarget(self.output_file)

    def run(self):
        git_sync(WORKING_DIR, action="pull")
        with open(self.output().path, "w") as out:
            print("git_pulled_covid", file=out)
class SyncRepo(luigi.Task):
    """
    Commit and push the daily data files of the repository at WORKING_DIR.

    1. require one GetData task per day from FIRST_DAY to TODAY
    2. git add the non-empty files in data/
    3. git commit and git push

    :param force (bool): remove the existing marker file so this step runs
        again; also forwarded to every GetData task
    """

    force = luigi.BoolParameter(default=False)
    output_file = WEB_DIR / "git_dashboard_sync"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Inside the class body ``force`` is the Parameter object itself, which
        # is always truthy; the actual value only exists on the instance.
        if self.force and os.path.isfile(self.output_file):
            os.remove(self.output_file)

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def requires(self):
        return [
            GetData(day=str(day), force=self.force)
            for day in date_range(FIRST_DAY, TODAY)
        ]

    def run(self):
        data_paths = [task.output().path for task in self.requires()]
        with Repo(WORKING_DIR) as repo:
            index = repo.index
            for path in data_paths:
                # Empty data files are left out of the commit.
                if os.stat(path).st_size > 0:
                    index.add(path)
                    logger.info("Added {}".format(path))
            # NOTE(review): assumes a single commit after all files are staged,
            # named after the last (most recent) day's file - confirm against
            # the repository history.
            index.commit("Added %s" % data_paths[-1])
        git_sync(WORKING_DIR, action="push")
        with open(self.output().path, "w") as out:
            print("git_push", file=out)
class UpdateDashboard(luigi.Task):
    """
    refresh the dash board

    Runs ``dashboard.py update`` through poetry to render dashboard.html from
    the files in data/. The output path is relative, so it is resolved against
    the current working directory of the process.

    :param force (bool): remove the existing dashboard.html and rerun this step
    """

    force = luigi.BoolParameter(default=False)
    output_file = "dashboard.html"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Inside the class body ``force`` is the Parameter object itself, which
        # is always truthy; the actual value only exists on the instance.
        if self.force and os.path.isfile(self.output_file):
            os.remove(self.output_file)

    def requires(self):
        return [SyncRepo(force=self.force)]

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def run(self):
        logger.info("run here")
        update_cmd = f"poetry run python dashboard.py update -o {self.output().path} --datadir data"
        logger.info(f"Running: {update_cmd}")
        # check=True makes a failed render fail this task instead of letting
        # luigi mark it as done without a usable dashboard.html.
        subprocess.run(shlex.split(update_cmd), check=True)
class WebSitePull(luigi.Task):
    """
    Pull the latest state of the website repository (WEB_DIR).

    Completion is recorded in a MockTarget named "git_pull_website" whose
    writes are mirrored to stderr.
    """

    def output(self):
        marker = MockTarget("git_pull_website", mirror_on_stderr=True)
        return marker

    def run(self):
        git_sync(WEB_DIR, action="pull")
        marker = self.output()
        with marker.open("w") as handle:
            print("git pull website", file=handle)
class UpdateWebSite(luigi.Task):
    """
    copy the newly made dashboard.html to website repo

    Every line is stripped of surrounding whitespace and the
    "<!DOCTYPE html>" marker is removed before it is written to
    _includes/COVID.html in WEB_DIR.

    :param force (bool): remove the dashboard html file from website repo for rerunning
    """

    force = luigi.BoolParameter(default=False)
    output_file = WEB_DIR / "_includes/COVID.html"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Inside the class body ``force`` is the Parameter object itself, which
        # is always truthy; the actual value only exists on the instance.
        if self.force and os.path.isfile(self.output_file):
            os.remove(self.output_file)

    def requires(self):
        return [WebSitePull(), UpdateDashboard(force=self.force)]

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def run(self):
        # input() mirrors requires(): [WebSitePull target, UpdateDashboard target].
        # Read the declared dependency instead of building a second
        # UpdateDashboard task with default parameters.
        dashboard = self.input()[1]
        with self.output().open("w") as outfile:
            with dashboard.open("r") as infile:
                for line in infile:
                    print(line.strip().replace("<!DOCTYPE html>", ""), file=outfile)
class PushWebSite(luigi.Task):
    """
    commit the refreshed dashboard page in the website repo and push it

    Stages the file produced by UpdateWebSite, commits it with a message
    carrying TODAY, pushes WEB_DIR and finally writes a marker file.

    :param force (bool): remove the existing marker file so the push runs
        again; also forwarded to UpdateWebSite
    """

    force = luigi.BoolParameter(default=False)
    output_file = WEB_DIR / "git_push"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Inside the class body ``force`` is the Parameter object itself, which
        # is always truthy; the actual value only exists on the instance.
        if self.force and os.path.isfile(self.output_file):
            os.remove(self.output_file)

    def requires(self):
        return [UpdateWebSite(force=self.force)]

    def output(self):
        return luigi.LocalTarget(self.output_file)

    def run(self):
        # NOTE: this changes the working directory of the whole process.
        os.chdir(WEB_DIR)
        page_path = self.input()[0].path
        with Repo(WEB_DIR) as web_repo:
            web_repo.index.add(page_path)
            web_repo.index.commit("Updated {}".format(TODAY))
        git_sync(WEB_DIR, action="push")
        with self.output().open("w") as out:
            print("pushed website", file=out)
def date_range(date1, date2):
    """
    Yield every day from date1 to date2, both ends included.

    Nothing is yielded when date2 is earlier than date1.

    adapted from https://www.w3resource.com/python-exercises/date-time-exercise/python-date-time-exercise-50.php

    :param date1 (datetime.date): first starting date of the range
    :param date2 (datetime.date): ending date of the range
    """
    span_in_days = (date2 - date1).days
    yield from (
        date1 + timedelta(days=offset) for offset in range(span_in_days + 1)
    )
def git_sync(dir: str, action: str = "pull"):
    """
    do a git pull or git push against the "origin" remote of the given directory

    Remotes with any other name are ignored.

    :param dir (str): a git directory (callers in this file also pass pathlib.Path)
    :param action (str): {'pull','push'}
    :raises ValueError: if action is neither 'pull' nor 'push'
    """
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if action not in {"pull", "push"}:
        raise ValueError(
            "action must be 'pull' or 'push', got {!r}".format(action)
        )
    with Repo(dir) as repo:
        for remote in repo.remotes:
            if remote.name != "origin":
                continue
            if action == "pull":
                remote.pull()
            else:
                remote.push()
            # Logged only once the sync has actually been carried out.
            logger.info("git {} {}".format(action, dir))
if __name__ == "__main__":
    # Entry point: build the last task of the pipeline with force enabled,
    # using luigi's local scheduler and a single worker.
    final_task = PushWebSite(force=True)
    luigi.build(
        [final_task],
        workers=1,
        local_scheduler=True,
        scheduler_port=2020,
        log_level="INFO",
        detailed_summary=True,
    )