diff --git a/couler/core/proto_repr.py b/couler/core/proto_repr.py index 4aa785b3..40b80c8b 100644 --- a/couler/core/proto_repr.py +++ b/couler/core/proto_repr.py @@ -62,6 +62,7 @@ def step_repr( secret=None, action=None, volume_mounts=None, + cache=None, ): assert step_name is not None assert tmpl_name is not None @@ -80,6 +81,11 @@ def step_repr( pb_secret.key = k pb_secret.name = secret.name + if cache is not None: + pb_step.cache.name = cache.name + pb_step.cache.key = cache.key + pb_step.cache.max_age = cache.max_age + # image can be None if manifest specified. if image is not None: pb_step.container_spec.image = image diff --git a/couler/core/run_templates.py b/couler/core/run_templates.py index d477056c..bafb6631 100644 --- a/couler/core/run_templates.py +++ b/couler/core/run_templates.py @@ -188,6 +188,7 @@ def run_script( env=env, resources=resources, volume_mounts=volume_mounts, + cache=cache, ) proto_repr.add_deps_to_step(step_name) return rets @@ -340,6 +341,7 @@ def run_container( resources=resources, secret=states.get_secret(secret), volume_mounts=volume_mounts, + cache=cache, ) proto_repr.add_deps_to_step(step_name) return rets @@ -441,12 +443,15 @@ def run_job( action=action, success_cond=success_condition, failure_cond=failure_condition, + cache=cache, ) proto_repr.add_deps_to_step(step_name) return rets -def run_canned_step(name, args, inputs=None, outputs=None, step_name=None): +def run_canned_step( + name, args, inputs=None, outputs=None, step_name=None, cache=None +): func_name, caller_line = utils.invocation_location() func_name = ( utils.argo_safe_name(step_name) if step_name is not None else func_name @@ -457,6 +462,7 @@ def run_canned_step(name, args, inputs=None, outputs=None, step_name=None): tmpl_args = [] if states._outputs_tmp is not None: tmpl_args.extend(states._outputs_tmp) + pb_step = None if proto_repr: pb_step = proto_repr.step_repr( # noqa: F841 input=inputs, @@ -466,6 +472,7 @@ def run_canned_step(name, args, inputs=None, outputs=None, step_name=None): step_name=step_name, tmpl_name=step_name + "-tmpl", args=tmpl_args, + cache=cache, ) proto_repr.add_deps_to_step(step_name) return pb_step diff --git a/couler/tests/proto_repr_test.py b/couler/tests/proto_repr_test.py index dfb5149b..7d4982d5 100644 --- a/couler/tests/proto_repr_test.py +++ b/couler/tests/proto_repr_test.py @@ -35,12 +35,18 @@ def echo(): image="docker/whalesay:latest", source=echo, resources={"cpu": "2", "memory": "1Gi"}, + cache=couler.Cache( + name="cache-name", key="cache-key", max_age="60s" + ), ) proto_wf = get_default_proto_workflow() s = proto_wf.steps[0].steps[0] self.assertFalse(s.HasField("resource_spec")) self.assertEqual(s.script, '\nprint("echo")\n') self.assertEqual(s.container_spec.resources["cpu"], "2") + self.assertEqual(s.cache.name, "cache-name") + self.assertEqual(s.cache.key, "cache-key") + self.assertEqual(s.cache.max_age, "60s") def test_canned_step(self): couler.run_canned_step(name="test", args={"k1": "v1", "k2": "v2"})