-
Notifications
You must be signed in to change notification settings - Fork 0
/
heet_export_cli.py
187 lines (152 loc) · 5.77 KB
/
heet_export_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import argparse
import ee
import logging
import polling2
from tqdm import tqdm
from yaspin import yaspin
from yaspin.spinners import Spinners
parser = argparse.ArgumentParser(
    usage="python heet_export_cli.py --results-path <path-to-results-folder-in-GEE> --drive-folder <folder-in-GDrive> --project <GEE-project-name>"
)
parser.add_argument(
    "--results-path",
    required=True,
    help="Full path to the heet results folder on Earth Engine (must start from projects/<project-name>/...)",
)
# Earth Engine's Export-to-Drive functionality is not well documented.
# Important:
# - If the Drive folder does not exist, it will be created in the TOP LEVEL
#   of your Google Drive.
# - If the Drive folder already exists, either as a top-level folder OR as a
#   subdirectory, items will be added to the existing folder.
# - If multiple Drive folders of the same name exist, the behaviour is
#   unclear, but files will likely be added to the most recently modified one.
# - The value is a folder NAME, not a path: separators such as "/" are kept
#   as literal characters. The default is therefore the *name* of the current
#   working directory, not its absolute path (which would create a top-level
#   Drive folder literally called e.g. "/home/user/project").
parser.add_argument(
    "--drive-folder",
    default=os.path.basename(os.path.normpath(os.getcwd())),
    help="Name of Google Drive Folder to save to (default: name of the current working directory)",
)
parser.add_argument(
    "--project",
    type=str,
    required=True,
    help="Name of Earth Engine cloud project to use",
)
args = parser.parse_args()
results_path = args.results_path
drive_folder = args.drive_folder
project = args.project
# Authenticate against the requested Earth Engine cloud project before any
# Earth Engine-dependent module is imported.
ee.Initialize(project=project)
# Importing from delineator needs to be done after ee.Initialize
# (NOTE(review): presumably because these modules touch Earth Engine at
# import time - confirm against the delineator package).
from delineator import heet_config as cfg
from delineator import heet_log as lg
from delineator import heet_task
from delineator import heet_monitor as mtr
# Name of the log file this exporter writes to; the logger set-up reads
# lg.log_file_name when creating its file handler.
lg.log_file_name = "heet_export.log"
# Warning - overwriting imported config data: the Drive folder chosen on the
# command line replaces any output_drive_folder value set by heet_config.
cfg.output_drive_folder = drive_folder
# ==============================================================================
# Set up logger
# ==============================================================================
# Gets or creates a logger for this script.
logger = logging.getLogger(__name__)
# Record everything from DEBUG upwards.
logger.setLevel(logging.DEBUG)
# mode="w" truncates the log file when the handler is created, so each run
# starts with a fresh log; no separate open(..., "w") pass is needed.
file_handler = logging.FileHandler(lg.log_file_name, mode="w", encoding="utf-8")
formatter = logging.Formatter("%(asctime)s : %(levelname)s : %(name)s : %(message)s")
file_handler.setFormatter(formatter)
# Attach the file handler to the logger.
logger.addHandler(file_handler)
def update_sp_success(sp):
    """Finish the spinner with a cyan ``*`` success marker.

    Args:
        sp: The spinner created by ``yaspin`` for the current step.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    success_colour, success_mark = "cyan", "*"
    # The colour is set first so the final marker is rendered in it.
    sp.color = success_colour
    sp.ok(success_mark)
    return sp
def update_sp_fail(sp):
    """Finish the spinner with a red ``!`` failure marker.

    Args:
        sp: The spinner created by ``yaspin`` for the current step.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    failure_colour, failure_mark = "red", "!"
    # The colour is set first so the final marker is rendered in it.
    sp.color = failure_colour
    sp.fail(failure_mark)
    return sp
def update_sp_err_fatal(sp):
    """Write the fatal-error banner through the spinner.

    This only writes the message; it does not terminate the process.

    Args:
        sp: The spinner to write through.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    fatal_banner = (
        "",
        " [ERROR] HEET EXPORTER encountered a fatal error and will exit",
        "",
        "Thank you for using HEET EXPORTER!",
    )
    for banner_line in fatal_banner:
        sp.write(banner_line)
    return sp
def update_sp_warn_skip(sp):
    """Write a warning that the export step failed and is being skipped.

    Points the user at ``heet_export.log`` for details. Only writes text; the
    spinner's state is not otherwise changed.

    Args:
        sp: The spinner to write through.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    sp.write("")
    # Grammar fix: "an problem" -> "a problem".
    sp.write(
        " [WARNING] HEET EXPORTER encountered a problem and will skip Export step"
    )
    sp.write(" [WARNING] Check heet_export.log for further details.")
    sp.write("")
    sp.write("")
    return sp
def update_sp_err_time(sp):
    """Report through the spinner that the wait-time limit was exceeded.

    The message lists the ids currently held in ``mtr.active_analyses``,
    comma-separated.

    NOTE(review): this exporter waits on export tasks (``mtr.active_exports``),
    yet the message reports ``mtr.active_analyses`` - confirm which list is
    intended here.

    Args:
        sp: The spinner to write through.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    analysis_ids = ",".join(map(str, mtr.active_analyses))
    sp.write(
        " [WARNING] Analysis wait time limit exceeded. Cancelling all unfinished HEET EXPORTER"
    )
    sp.write(f" tasks (active analysis ids: {analysis_ids})")
    sp.write("")
    return sp
def update_sp_inf_service(sp):
    """Write an info message that the step is skipped for a service account.

    Args:
        sp: The spinner to write through.

    Returns:
        The same spinner object, so callers can reassign it.
    """
    info_lines = ("", " [INFO] Service Account Authenticated; Skipping step", "")
    for info_line in info_lines:
        sp.write(info_line)
    return sp
if __name__ == "__main__":
    # sys is needed by the exit paths below. It used to be imported only inside
    # the (disabled) debug branch, so every sys.exit() raised NameError.
    import sys

    # Developer switch: set to True to print the export plan and stop before
    # anything is submitted. (Renamed from `breakpoint`, which shadowed the
    # builtin of the same name.)
    stop_before_export = False
    step_desc = f"Exporting results folder {results_path} to Google Drive folder {cfg.output_drive_folder}"
    # ==========================================================================
    if stop_before_export:
        print(step_desc)
        sys.exit("STOP")
    # Exporting Results
    # ==========================================================================
    with yaspin(Spinners.line, text=step_desc, color="yellow") as sp:
        try:
            if "CI_ROBOT_USER" in os.environ:
                # The Drive export is not started when CI_ROBOT_USER is set.
                sp = update_sp_inf_service(sp)
            else:
                heet_task.export_to_drive(export_from_path=results_path)
        except Exception as error:
            # Handles any issue, including connectivity. The traceback goes to
            # the log file; the user-facing text is written through the spinner
            # so it does not collide with the spinner's line.
            logger.exception("Export to Google Drive failed")
            sp.write("Raised exception")
            sp.write(str(error))
            sp = update_sp_fail(sp)
            sp = update_sp_warn_skip(sp)
        else:
            sp = update_sp_success(sp)

    # Wait for every export task recorded in the monitor's log, advancing the
    # progress bar by the number of tasks that finished since the last poll.
    # heet_task.wait_until_exports() is expected to shrink mtr.active_exports
    # (the loop condition relies on it).
    mtr.active_exports = list(mtr.active_export_tasks_log.keys())
    remaining_export_size = len(mtr.active_exports)
    # Context manager closes the bar on every exit path, including sys.exit.
    with tqdm(total=remaining_export_size, ncols=80) as pbar:
        while mtr.active_exports:
            try:
                heet_task.wait_until_exports()
            except polling2.TimeoutException:
                sp = update_sp_fail(sp)
                sp = update_sp_err_time(sp)
                logger.info(
                    "(Waiting) Analysis wait time limit exceeded. Cancelling all unfinished HEET tasks."
                )
                try:
                    heet_task.kill_all_heet_tasks()
                except Exception:
                    logger.exception("Failed to cancel unfinished HEET tasks")
                    sp = update_sp_err_fatal(sp)
                    # Non-zero status: this is reported as a fatal error.
                    sys.exit(1)
                break
            except Exception:
                logger.exception("Unexpected error while waiting for exports")
                sp = update_sp_fail(sp)
                sys.exit(1)
            active_count = len(mtr.active_exports)
            pbar.update(remaining_export_size - active_count)
            remaining_export_size = active_count