-
Notifications
You must be signed in to change notification settings - Fork 0
/
recipe_taste.py
executable file
·148 lines (126 loc) · 4.01 KB
/
recipe_taste.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Mar 16 12:42:22 2021
@author: edwardcui
"""
import os
import argparse
import subprocess
from recipe_utils import get_metadata, get_config, run_shell_command
# Arguments
# Command-line interface: one positional metadata path plus two options.
parser = argparse.ArgumentParser(
    description="Create (or update) a pipeline recipe and trigger a run of it.",
)
parser.add_argument(
    "metadata_yaml",
    type=str,
    help="Path to metadata.yaml file",
)
parser.add_argument(
    "-t",
    "--type",
    type=str,
    # The help text lists every accepted choice, including "auto".
    help="Recipe type: tfx, zenml or auto (default: %(default)s)",
    choices=["tfx", "zenml", "auto"],
    default="tfx",
    dest="pipeline_type",
)
parser.add_argument(
    "--update",
    action="store_true",
    dest="update_pipeline",
    help="Update an existing pipeline instead of creating a new one",
)
def set_tfx_environments(pipeline_path):
    """Install the pipeline's requirements and put its ``bin`` folder on PATH.

    Args:
        pipeline_path: Directory holding the pipeline's ``requirements.txt``
            and ``bin`` sub-directory.

    Raises:
        RuntimeError: If the ``pip install`` command returns a non-zero code.
    """
    requirements_file = os.path.join(pipeline_path, "requirements.txt")

    # pip install -r requirements.txt
    print("Installing requirements.txt")
    returncode, _ = run_shell_command(
        ["pip", "install", "--user", "-r", requirements_file],
        verbose=True,
    )
    if returncode != 0:
        raise RuntimeError("requirements.txt not successfully installed")
    print("Successfully installed requirements.txt")

    # Append the pipeline's bin directory to PATH for this process, once.
    # os.pathsep keeps the split/join correct on platforms whose separator
    # is not ":", and .get() tolerates an unset PATH.
    bin_path = os.path.realpath(os.path.join(pipeline_path, "bin"))
    current_path = os.environ.get("PATH", "")
    if bin_path not in current_path.split(os.pathsep):
        entries = [entry for entry in (current_path, bin_path) if entry]
        os.environ["PATH"] = os.pathsep.join(entries)
def parse_tfx_run_output(stdout):
    """Placeholder parser for the captured output of a pipeline run.

    The ``stdout`` argument is currently ignored; a fixed dictionary is
    returned regardless of its content.
    """
    placeholder = dict(asf=1)
    return placeholder
def taste_tfx_recipe(metadata_yaml, update=False, engine="kubeflow"):
    """Create or update a TFX pipeline, trigger a run, and print the parsed output.

    Args:
        metadata_yaml: Path to the recipe's metadata.yaml; the directory that
            contains it is used as the pipeline directory.
        update: When True, run ``tfx pipeline update`` and skip the local
            environment setup; otherwise set up the environment and run
            ``tfx pipeline create``.
        engine: Value passed to the ``--engine`` option of the pipeline
            create/update command.
    """
    # Get the path of the pipeline
    pipeline_path = os.path.dirname(metadata_yaml)

    # Set some environments (only needed before the first creation)
    if not update:
        set_tfx_environments(pipeline_path)

    # Read the values needed to build the CLI commands
    metadata = get_metadata(metadata_yaml)
    pipeline_name = metadata["pipeline_name"]
    system_config = get_config(metadata, "system_configurations")
    custom_tfx_image = (
        "gcr.io/"
        + system_config["GOOGLE_CLOUD_PROJECT"]
        + "/"
        + system_config["TFX_IMAGE_REPO_NAME"]
    )
    endpoint = system_config["ENDPOINT"]
    kubeflow_runner = os.path.join(pipeline_path, system_config["KUBEFLOW_RUNNER"])

    # Prepare the pipeline running commands
    init_command = _build_tfx_init_command(
        kubeflow_runner, endpoint, engine, custom_tfx_image, update
    )
    run_command = [
        "tfx",
        "run",
        "create",
        f"--pipeline-name={pipeline_name}",
        f"--endpoint={endpoint}",
    ]

    # Execute the pipeline.
    # NOTE: the return code of the create/update step is not checked, so the
    # run is attempted regardless of its outcome.
    run_shell_command(init_command, verbose=True)
    _, stdout = run_shell_command(run_command, verbose=True)

    # Parse outputs
    parsed_outputs = parse_tfx_run_output(stdout)
    print(parsed_outputs)


def _build_tfx_init_command(runner_path, endpoint, engine, image, update):
    """Return the argument list for ``tfx pipeline update`` or ``create``."""
    command = [
        "tfx",
        "pipeline",
        "update" if update else "create",
        f"--pipeline-path={runner_path}",
        f"--endpoint={endpoint}",
        f"--engine={engine}",
    ]
    if not update:
        # Separate list element: previously a missing comma fused this option
        # onto the end of the --engine argument.
        command.append(f"--build-target-image={image}")
    return command
def taste_zenml_recipe(metadata_yaml, update=False):
    """Placeholder for ZenML recipes; currently does nothing and returns None."""
    return None
def test_taste_tfx_recipe():
    """Manual smoke test: taste the TFX template recipe with default options."""
    # NOTE: this absolute path is machine-specific; the test only works where
    # that file exists.
    taste_tfx_recipe(
        "/Users/edwardcui/Documents/Scripts/pipeline_utils/tfx_template/metadata.yaml"
    )
if __name__ == "__main__":
    cli_args = parser.parse_args()
    print(vars(cli_args))

    # Map each implemented pipeline type to the function that tastes it.
    tasters = {
        "tfx": taste_tfx_recipe,
        "zenml": taste_zenml_recipe,
    }
    taster = tasters.get(cli_args.pipeline_type)
    if taster is None:
        # Covers "auto" (detection by structure is not implemented yet) and
        # any other value without a registered taster.
        raise NotImplementedError(
            f"Taste for pipeline type '{cli_args.pipeline_type}' is not implemented"
        )
    taster(cli_args.metadata_yaml, cli_args.update_pipeline)