diff --git a/docs/examples/workflows-examples.md b/docs/examples/workflows-examples.md index a0c56e5e7..f5bbff610 100644 --- a/docs/examples/workflows-examples.md +++ b/docs/examples/workflows-examples.md @@ -107,7 +107,8 @@ Explore the examples through the side bar! | [template-defaults](https://github.com/argoproj/argo-workflows/blob/main/examples/template-defaults.yaml) | | [testvolume](https://github.com/argoproj/argo-workflows/blob/main/examples/testvolume.yaml) | | [timeouts-step](https://github.com/argoproj/argo-workflows/blob/main/examples/timeouts-step.yaml) | -| [title-and-descriptin-with-markdown](https://github.com/argoproj/argo-workflows/blob/main/examples/title-and-descriptin-with-markdown.yaml) | +| [title-and-description-with-markdown](https://github.com/argoproj/argo-workflows/blob/main/examples/title-and-description-with-markdown.yaml) | +| [withsequence-nested-result](https://github.com/argoproj/argo-workflows/blob/main/examples/withsequence-nested-result.yaml) | | [work-avoidance](https://github.com/argoproj/argo-workflows/blob/main/examples/work-avoidance.yaml) | | [workflow-count-resourcequota](https://github.com/argoproj/argo-workflows/blob/main/examples/workflow-count-resourcequota.yaml) | | [workflow-event-binding/event-consumer-workfloweventbinding](https://github.com/argoproj/argo-workflows/blob/main/examples/workflow-event-binding/event-consumer-workfloweventbinding.yaml) | diff --git a/docs/examples/workflows/experimental/script_pydantic_io.md b/docs/examples/workflows/experimental/script_runner_io.md similarity index 53% rename from docs/examples/workflows/experimental/script_pydantic_io.md rename to docs/examples/workflows/experimental/script_runner_io.md index 1bc6d4320..d631cf038 100644 --- a/docs/examples/workflows/experimental/script_pydantic_io.md +++ b/docs/examples/workflows/experimental/script_runner_io.md @@ -1,4 +1,4 @@ -# Script Pydantic Io +# Script Runner Io @@ -14,7 +14,8 @@ from pydantic import BaseModel from 
hera.shared import global_config - from hera.workflows import Artifact, ArtifactLoader, Parameter, Workflow, script + from hera.workflows import Artifact, ArtifactLoader, Parameter, Steps, Workflow, script + from hera.workflows.archive import NoneArchiveStrategy from hera.workflows.io import RunnerInput, RunnerOutput try: @@ -27,7 +28,7 @@ class MyObject(BaseModel): - a_dict: dict = {} + a_dict: dict # not giving a default makes the field a required input for the template a_str: str = "a default string" @@ -44,7 +45,12 @@ artifact_int: Annotated[int, Artifact(name="artifact-output")] - @script(constructor="runner") + @script(constructor="runner", image="python-image-built-with-my-package") + def writer() -> Annotated[int, Artifact(name="int-artifact", archive=NoneArchiveStrategy())]: + return 100 + + + @script(constructor="runner", image="python-image-built-with-my-package") def pydantic_io( my_input: MyInput, ) -> MyOutput: @@ -52,7 +58,17 @@ with Workflow(generate_name="pydantic-io-") as w: - pydantic_io() + with Steps(name="use-pydantic-io"): + write_step = writer() + pydantic_io( + arguments=[ + write_step.get_artifact("int-artifact").with_name("artifact-input"), + { + "param_int": 101, + "an_object": MyObject(a_dict={"my-new-key": "my-new-value"}), + }, + ] + ) ``` === "YAML" @@ -64,6 +80,46 @@ generateName: pydantic-io- spec: templates: + - name: use-pydantic-io + steps: + - - name: writer + template: writer + - - arguments: + artifacts: + - from: '{{steps.writer.outputs.artifacts.int-artifact}}' + name: artifact-input + parameters: + - name: param_int + value: '101' + - name: an_object + value: '{"a_dict": {"my-new-key": "my-new-value"}, "a_str": "a default + string"}' + name: pydantic-io + template: pydantic-io + - name: writer + outputs: + artifacts: + - archive: + none: {} + name: int-artifact + path: /tmp/hera-outputs/artifacts/int-artifact + script: + args: + - -m + - hera.workflows.runner + - -e + - 
examples.workflows.experimental.script_runner_io:writer + command: + - python + env: + - name: hera__script_annotations + value: '' + - name: hera__outputs_directory + value: /tmp/hera-outputs + - name: hera__script_pydantic_io + value: '' + image: python-image-built-with-my-package + source: '{{inputs.parameters}}' - inputs: artifacts: - name: artifact-input @@ -87,7 +143,7 @@ - -m - hera.workflows.runner - -e - - examples.workflows.experimental.script_pydantic_io:pydantic_io + - examples.workflows.experimental.script_runner_io:pydantic_io command: - python env: @@ -97,7 +153,7 @@ value: /tmp/hera-outputs - name: hera__script_pydantic_io value: '' - image: python:3.8 + image: python-image-built-with-my-package source: '{{inputs.parameters}}' ``` diff --git a/docs/examples/workflows/template_level_volume.md b/docs/examples/workflows/template_level_volume.md index 883e6758f..a4c1fe017 100644 --- a/docs/examples/workflows/template_level_volume.md +++ b/docs/examples/workflows/template_level_volume.md @@ -73,35 +73,17 @@ See https://argo-workflows.readthedocs.io/en/latest/walk-through/volumes/ spec: entrypoint: generate-and-use-volume templates: - - name: generate-and-use-volume - steps: - - - name: generate-volume - template: generate-volume - arguments: - parameters: - - name: pvc-size - # In a real-world example, this could be generated by a previous workflow step. 
- value: '1Gi' - - - name: generate - template: whalesay - arguments: - parameters: - - name: pvc-name - value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' - - - name: print - template: print-message - arguments: - parameters: - - name: pvc-name - value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' - - - name: generate-volume - inputs: + - inputs: + parameters: + - name: pvc-size + name: generate-volume + outputs: parameters: - - name: pvc-size + - name: pvc-name + valueFrom: + jsonPath: '{.metadata.name}' resource: action: create - setOwnerReference: true manifest: | apiVersion: v1 kind: PersistentVolumeClaim @@ -112,42 +94,62 @@ See https://argo-workflows.readthedocs.io/en/latest/walk-through/volumes/ resources: requests: storage: '{{inputs.parameters.pvc-size}}' - outputs: - parameters: - - name: pvc-name - valueFrom: - jsonPath: '{.metadata.name}' - - - name: whalesay - inputs: - parameters: - - name: pvc-name - volumes: - - name: workdir - persistentVolumeClaim: - claimName: '{{inputs.parameters.pvc-name}}' - container: + setOwnerReference: true + - container: + args: + - echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt + command: + - sh + - -c image: docker/whalesay:latest - command: [sh, -c] - args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"] volumeMounts: - - name: workdir - mountPath: /mnt/vol - - - name: print-message + - mountPath: /mnt/vol + name: workdir inputs: - parameters: - - name: pvc-name + parameters: + - name: pvc-name + name: whalesay volumes: - - name: workdir - persistentVolumeClaim: - claimName: '{{inputs.parameters.pvc-name}}' - container: + - name: workdir + persistentVolumeClaim: + claimName: '{{inputs.parameters.pvc-name}}' + - container: + args: + - echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt + command: + - sh + - -c image: alpine:latest - command: [sh, -c] - args: ["echo getting message from volume; 
find /mnt/vol; cat /mnt/vol/hello_world.txt"] volumeMounts: - - name: workdir - mountPath: /mnt/vol + - mountPath: /mnt/vol + name: workdir + inputs: + parameters: + - name: pvc-name + name: print-message + volumes: + - name: workdir + persistentVolumeClaim: + claimName: '{{inputs.parameters.pvc-name}}' + - name: generate-and-use-volume + steps: + - - arguments: + parameters: + - name: pvc-size + value: 1Gi + name: generate-volume + template: generate-volume + - - arguments: + parameters: + - name: pvc-name + value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' + name: generate + template: whalesay + - - arguments: + parameters: + - name: pvc-name + value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' + name: print + template: print-message ``` diff --git a/docs/user-guides/script-annotations.md b/docs/user-guides/script-annotations.md new file mode 100644 index 000000000..e2a844c36 --- /dev/null +++ b/docs/user-guides/script-annotations.md @@ -0,0 +1,236 @@ +# Script Annotations + +Annotation syntax is an experimental feature using `typing.Annotated` for `Parameter`s and `Artifact`s to declare inputs +and outputs for functions decorated as `scripts`. They use `Annotated` as the type in the function parameters and allow +us to simplify writing scripts with parameters and artifacts that require additional fields such as a `description` or +alternative `name`. + +This feature must be enabled by setting the `experimental_feature` flag `script_annotations` on the global config. 
+ +```py +global_config.experimental_features["script_annotations"] = True +``` + +## Parameters + +In Hera, we can currently specify inputs inside the `@script` decorator as follows: + +```python +@script( + inputs=[ + Parameter(name="an_int", description="an_int parameter", default=1, enum=[1, 2, 3]), + Parameter(name="a_bool", description="a_bool parameter", default=True, enum=[True, False]), + Parameter(name="a_string", description="a_string parameter", default="a", enum=["a", "b", "c"]) + ] +) +def echo_all(an_int=1, a_bool=True, a_string="a"): + print(an_int) + print(a_bool) + print(a_string) +``` + +Notice how the `name` and `default` values are duplicated for each `Parameter`. Using annotations, we can rewrite this +as: + +```python +@script() +def echo_all( + an_int: Annotated[int, Parameter(description="an_int parameter", enum=[1, 2, 3])] = 1, + a_bool: Annotated[bool, Parameter(description="a_bool parameter", enum=[True, False])] = True, + a_string: Annotated[str, Parameter(description="a_string parameter", enum=["a", "b", "c"])] = "a", +): + print(an_int) + print(a_bool) + print(a_string) +``` + +The fields allowed in the `Parameter` annotations are: `name`, `enum`, and `description`. + +## Artifacts + +> Note: `Artifact` annotations are only supported when used with the `RunnerScriptConstructor`. + + +The feature is even more powerful for `Artifact`s. In Hera we are currently able to specify `Artifact`s in `inputs`, but +the given path is not programmatically linked to the code within the function unless defined outside the scope of the +function: + +```python +@script(inputs=Artifact(name="my-artifact", path="/tmp/file")) +def read_artifact(): + with open("/tmp/file") as a_file: # Repeating "/tmp/file" is prone to human error! + print(a_file.read()) + +# or + +MY_PATH = "/tmp/file" # Now accessible outside of the function scope! 
+@script(inputs=Artifact(name="my-artifact", path=MY_PATH)) +def read_artifact(): + with open(MY_PATH) as a_file: + print(a_file.read()) +``` + +By using annotations we can avoid repeating the `path` of the file, and the function can use the variable directly as a +`Path` object, with its value already set to the given path: + +```python +@script(constructor="runner") +def read_artifact(an_artifact: Annotated[Path, Artifact(name="my-artifact", path="/tmp/file")]): + print(an_artifact.read_text()) +``` + +The fields allowed in the `Artifact` annotations are: `name`, `path`, and `loader`. + +## Artifact Loaders + +In case you want to load an object directly from the `path` of the `Artifact`, we allow two types of loaders besides the +default `Path` behaviour used when no loader is specified. The `ArtifactLoader` enum provides `file` and `json` loaders. + +### `None` loader +With `None` set as the loader (which is by default) in the Artifact annotation, the `path` attribute of `Artifact` is +extracted and used to provide a `pathlib.Path` object for the given argument, which can be used directly in the function +body. 
The following example is the same as above except for explicitly setting the loader to `None`: + +```python +@script(constructor="runner") +def read_artifact( + an_artifact: Annotated[Path, Artifact(name="my-artifact", path="/tmp/file", loader=None)] +): + print(an_artifact.read_text()) +``` + +### `file` loader + +When the loader is set to `file`, the function parameter type should be `str`, and will contain the contents string +representation of the file stored at `path` (essentially performing `path.read_text()` automatically): + +```python +@script(constructor="runner") +def read_artifact( + an_artifact: Annotated[str, Artifact(name="my-artifact", path="/tmp/file", loader=ArtifactLoader.file)] +) -> str: + return an_artifact +``` + +This loads the contents of the file at `"/tmp/file"` to the argument `an_artifact` and subsequently can be used as a +string inside the function. + +### `json` loader + +When the loader is set to `json`, the contents of the file at `path` are read and parsed to a dictionary via `json.load` +(essentially performing `json.load(path.open())` automatically). By specifying a Pydantic type, this dictionary can even +be automatically parsed to that type: + +```python +class MyArtifact(BaseModel): + a = "a" + b = "b" + + +@script(constructor="runner") +def read_artifact( + an_artifact: Annotated[MyArtifact, Artifact(name="my-artifact", path="/tmp/file", loader=ArtifactLoader.json)] +) -> str: + return an_artifact.a + an_artifact.b +``` + +Here, we have a json representation of `MyArtifact` such as `{"a": "hello ", "b": "world"}` stored at `"/tmp/file"`. We +can load it with `ArtifactLoader.json` and then use `an_artifact` as an instance of `MyArtifact` inside the function, so +the function will return `"hello world"`. 
+ +### Function parameter name aliasing + +Script annotations can work on top of the `RunnerScriptConstructor` for name aliasing of function +parameters, in particular to allow a public `kebab-case` parameter, while using a `snake_case` +Python function parameter. When using a `RunnerScriptConstructor`, an environment variable +`hera__script_annotations` will be added to the Script template (visible in the exported YAML file). + +## Outputs + +> Note: Output annotations are only supported when used with the `RunnerScriptConstructor`. + +There are two ways to specify output Artifacts and Parameters. + +### Function return annotations + +Function return annotations can be used to specify the output type information for output Artifacts and Parameters, and +the function should return a value or tuple. An example can be seen +[here](../examples/workflows/experimental/script_annotations_outputs.md). + +For a simple hello world output artifact example we currently have: +```python +@script(outputs=Artifact(name="hello-artifact", path="/tmp/hello_world.txt")) +def hello_world(): + with open("/tmp/hello_world.txt", "w") as f: + f.write("Hello, world!") +``` + +The new approach allows us to avoid duplication of the path, which is now optional, and results in more readable code: +```python +@script() +def hello_world() -> Annotated[str, Artifact(name="hello-artifact")]: + return "Hello, world!" +``` + +For `Parameter`s we have a similar syntax: + +```python +@script() +def hello_world() -> Annotated[str, Parameter(name="hello-param")]: + return "Hello, world!" +``` + +The returned values will be automatically saved in files within the Argo container according to this schema: +* `/hera/outputs/parameters/` +* `/hera/outputs/artifacts/` + +These outputs are also exposed in the `outputs` section of the template in YAML. 
+ +The object returned from the function can be of any serialisable Pydantic type (or basic Python type) and must be +`Annotated` as an `Artifact` or `Parameter`. The `Parameter`/`Artifact`'s `name` will be used for the path of the output unless provided: +* if the annotation is an `Artifact` with a `path`, we use that `path` +* if the annotation is a `Parameter`, with a `value_from` that contains a `path`, we use that `path` + +See the following two functions for specifying custom paths: + +```python +@script() +def hello_world() -> Annotated[str, Artifact(name="hello-artifact", path="/tmp/hello_world_art.txt")]: + return "Hello, world!" + +@script() +def hello_world() -> Annotated[str, Parameter(name="hello-param", value_from={"path": "/tmp/hello_world_param.txt"})]: + return "Hello, world!" +``` + +For multiple outputs, the return type should be a `Tuple` of arbitrary Pydantic types with individual +`Parameter`/`Artifact` annotations, and the function must return a tuple from the function matching these types: +```python +@script() +def func(...) -> Tuple[ + Annotated[arbitrary_pydantic_type_a, Artifact], + Annotated[arbitrary_pydantic_type_b, Parameter], + Annotated[arbitrary_pydantic_type_c, Parameter], + ...]: + return output_a, output_b, output_c +``` + +### Input-Output function parameters + +Hera also allows output `Parameter`/`Artifact`s as part of the function signature when specified as a `Path` type, +allowing users to write to the path as an output, without needing an explicit return. They require an additional field +`output=True` to distinguish them from the input parameters and must have an underlying `Path` type (or another type +that will write to disk). 
+ +```python +@script() +def func(..., output_param: Annotated[Path, Parameter(output=True, global_name="...", name="")]) -> Annotated[arbitrary_pydantic_type, OutputItem]: + output_param.write_text("...") + return output +``` + +The parent outputs directory, `/hera/outputs` by default, can be set by the user. This is done by adding: + +```python +global_config.set_class_defaults(RunnerScriptConstructor, outputs_directory="user/chosen/outputs") +``` diff --git a/docs/user-guides/script-basics.md b/docs/user-guides/script-basics.md new file mode 100644 index 000000000..2148e81cf --- /dev/null +++ b/docs/user-guides/script-basics.md @@ -0,0 +1,198 @@ +# Script Basics + +The `Script` class is an essential part of Hera's extension on top of Argo. As Hera is a Python library, +[Script templates](https://argoproj.github.io/argo-workflows/fields/#scripttemplate) running Python become the standard +template, which is reflected by the greater feature set provided for writing them. + +## Script Decorator + +The `script` decorator function is a key offering of Hera to achieve near-native Python function orchestration. It +allows you to call the function under a Hera context manager such as a `Workflow` or `Steps` context, and it will be +treated as the intended sub-object, which would be a `template` when under a `Workflow`, or a `Step` when under a +`Steps`. The function will still behave as normal outside of any Hera contexts, meaning you can write unit tests on the +given function. + +> **For advanced users**: the exact mechanism of the `script` decorator is to prepare a `Script` object within the +> decorator, so that when your function is invoked under a Hera context, the call is redirected to the `Script.__call__` +> function. This takes the kwargs of a `Step` or `Task` depending on whether the context manager is a `Steps` or a +> `DAG`. Under a Workflow itself, your function is not expected to take arguments, so the call will add the function as +> a template. 
+ +When decorating a function, you should pass `Script` parameters to the `script` decorator. This includes values such as +the `image` to use, and `resources` to request. + +```py +from hera.workflows import Resources, script + +@script(image="python:3.11", resources=Resources(memory_request="5Gi")) +def echo(message: str): + print(message) +``` + +When calling the function under a `Steps` or `DAG` context, you should pass `Step` or `Task` kwargs, such as the `name` +of the `Step`/`Task`, a `when` clause, a `with_param` list to loop over a given template, or `arguments` for the +function. + +```py +with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w: + with DAG(name="diamond"): + A = echo(name="A", arguments={"message": "A"}) + B = echo(name="B", arguments={"message": "B"}, when=f"{A.result == 'A'}") + C = echo(name="C", arguments={"message": "C"}, when=f"{A.result != 'A'}") + D = echo(name="D", arguments={"message": "D"}) + A >> [B, C] >> D +``` + +Alternatively, you can specify your DAG using `Task` directly: + +```py +with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w: + with DAG(name="diamond"): + A = Task(name="A", source=echo, arguments={"message": "A"}) + B = Task(name="B", source=echo, arguments={"message": "B"}, when=f"{A.result == 'A'}") + C = Task(name="C", source=echo, arguments={"message": "C"}, when=f"{A.result != 'A'}") + D = Task(name="D", source=echo, arguments={"message": "D"}) + A >> [B, C] >> D +``` + +> **Note** in the `DAG` above, `D` will still run, even though `C` will be skipped. This is because of the `depends` logic +> resolving to `C.Succeeded || C.Skipped || C.Daemoned` due to Argo's default +> [depends logic](https://argoproj.github.io/argo-workflows/enhanced-depends-logic/#depends). + +## Script Constructors + +### InlineScriptConstructor + +Script templates submitted to Argo typically run the given Python function in a Python image. 
By default, the Python +function itself is dumped to the YAML, and the Argo cluster will run that code. For the code below, we will see it +directly in the output YAML. + +```py +from hera.workflows import Workflow, script + +@script(add_cwd_to_sys_path=False) +def hello(s: str): + print("Hello, {s}!".format(s=s)) + + +with Workflow( + generate_name="hello-world-", + entrypoint="hello", + arguments={"s": "world"}, +) as w: + hello() +``` + +We added `add_cwd_to_sys_path=False` to remove some boilerplate from the `source` below. You will see Hera adds a +`json.loads` to bridge the YAML input to a Python variable: + +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: hello-world- +spec: + arguments: + parameters: + - name: s + value: world + entrypoint: hello + templates: + - inputs: + parameters: + - name: s + name: hello + script: + command: + - python + image: python:3.8 + source: 'import json + + try: s = json.loads(r''''''{{inputs.parameters.s}}'''''') + + except: s = r''''''{{inputs.parameters.s}}'''''' + + + print(''Hello, {s}!''.format(s=s))' +``` + +This method of running the function is handled by the `InlineScriptConstructor`, called such because it constructs the +`Script` template to run the function "inline" in the YAML. + +#### Importing modules + +A caveat of the `InlineScriptConstructor` is that it is quite limited - as the `InlineScriptConstructor` dumps your code +to the `source` field as-is, you must also `import` (within the function itself) any modules you use in the function: + +```py +@script(image="python:3.10") +def my_matcher(string: str): + import re + + print(bool(re.match("test", string))) +``` + +> **Note** This also applies to other functions in your code - you will not be able to call functions defined outside of +> the scope of the script-decorated function! 
+ +If your function uses standard library imports from Python, you will be able to run your function with any standard +Python image, specified by the `image` argument of the decorator. Therefore, if you use non-standard imports, such as +`numpy`, you will need to use an image that includes `numpy`, or build your own (e.g. as a Docker image on DockerHub). + +### RunnerScriptConstructor + +The `RunnerScriptConstructor` is an alternative `ScriptConstructor` that uses the "Hera Runner" (think of this as being +like the PyTest Runner) to run your function on Argo. This avoids dumping the function to the `source` of a template, +keeping the YAML manageable and small, and allows you to arrange your code in natural Python fashion: imports can be +anywhere in the package, the script-decorated function can call other functions in the package, and the function itself +can take Pydantic objects as arguments. The use of the `RunnerScriptConstructor` necessitates building your own image, +as the Hera Runner runs the function by referencing it as an entrypoint of your module. The image used by the script +should be built from the source code package itself and its dependencies, so that the source code's functions, +dependencies, and Hera itself are available to run. + +A function can set its `constructor` to `"runner"` to use the `RunnerScriptConstructor`, or use the +`global_config.set_class_defaults` function to set it once for all script-decorated functions. 
We can write a script +template function using Pydantic objects such as: + +```py +global_config.set_class_defaults(Script, constructor="runner") + +class Input(BaseModel): + a: int + b: str = "foo" + +class Output(BaseModel): + output: List[Input] + +@script() +def my_function(input: Input) -> Output: + return Output(output=[input]) +``` + +This creates a template in YAML that looks like: + +```yaml +- name: my-function + inputs: + parameters: + - name: input + script: + command: + - python + args: + - -m + - hera.workflows.runner + - -e + - examples.workflows.callable_script:my_function + image: my-image-with-python-source-code-and-dependencies + source: '{{inputs.parameters}}' +``` + +You will notice some peculiarities of this template. Firstly, it is running the `hera.workflows.runner` module, rather +than a user-module such as `examples.workflows.callable_script`. Instead, the `-e` arg specifies the `--entrypoint` to +be called by the runner, in this case the `my_function` of the `examples.workflows.callable_script` module. We do not +give a real `image` here, but we assume it exists in this example. Finally, the `source` parameter is passed the +`inputs.parameters` of the template. This is because the Hera Runner relies on a mechanism in Argo where the value +passed to `source` is dumped to a file, and then the filename is passed as the final `arg` to the `command`. Therefore, +the `source` will actually contain a list of parameters as dictionaries, which are dumped to a file which is passed to +`hera.workflows.runner`. Of course, this is all handled for you! 
diff --git a/docs/user-guides/script-runner-io.md b/docs/user-guides/script-runner-io.md new file mode 100644 index 000000000..e5a4b5778 --- /dev/null +++ b/docs/user-guides/script-runner-io.md @@ -0,0 +1,105 @@ +# Script Runner IO + +Hera provides the `RunnerInput` and `RunnerOutput` Pydantic classes which can be used to more succinctly write your +script function inputs and outputs, and requires use of the Hera Runner. Use of these classes also requires the +`"script_pydantic_io"` experimental feature flag to be enabled: + +```py +global_config.experimental_features["script_pydantic_io"] = True +``` + +## Pydantic V1 or V2? + +You can import `RunnerInput` and `RunnerOutput` from the `hera.workflows.io` submodule to import the version of Pydantic +that matches your V1 or V2 installation. + +If you need to use V1 models when you have V2 installed, you should import +`RunnerInput` and `RunnerOutput` from the `hera.workflows.io.v1` or `hera.workflows.io.v2` module explicitly. The V2 +models will not be available if you have installed `pydantic<2`, but the V1 models are usable for either version, +allowing you to migrate at your own pace. + +## Script inputs using `RunnerInput` + +For your script inputs, you can create a derived class of `RunnerInput`, and declare all your input parameters (and +artifacts) as fields of the class. If you want to use `Annotated` to declare `Artifacts` or add metadata to your +`Parameters`, you will also need to enable the `"script_annotations"` experimental feature flag. 
+ +```py +from typing import Annotated +from pydantic import BaseModel + +from hera.workflows import Artifact, ArtifactLoader, Parameter, script +from hera.workflows.io import RunnerInput + + +class MyObject(BaseModel): + a_dict: dict + a_str: str = "a default string" + + +class MyInput(RunnerInput): + param_int: Annotated[int, Parameter(name="param-input")] = 42 + an_object: Annotated[MyObject, Parameter(name="obj-input")] = MyObject( + a_dict={"my-key": "a-value"}, a_str="hello world!" + ) + artifact_int: Annotated[int, Artifact(name="artifact-input", loader=ArtifactLoader.json)] + + +@script(constructor="runner") +def pydantic_io( + my_input: MyInput, +) -> ...: + ... +``` + +This will create a script template named `pydantic_io`, with input parameters `"param-input"` and `"obj-input"`, but +_not_ `"my_input"` (hence inline script templates will not work, as references to `my_input` will not resolve); the +template will also have the `"artifact-input"` artifact. The yaml generated from the Python will look something like the following: + +```yaml + templates: + - name: pydantic-io + inputs: + parameters: + - name: param-input + default: '42' + - name: obj-input + default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}' + artifacts: + - name: artifact-input + path: /tmp/hera-inputs/artifacts/artifact-input + script: + ... +``` + +## Script outputs using `RunnerOutput` + +The `RunnerOutput` class comes with two special variables, `exit_code` and `result`. The `exit_code` is used to exit the +container when running on Argo with the specific exit code - it is set to `0` by default. The `result` is used to print +any serializable object to stdout, which means you can now use `.result` on tasks or steps that use a "runner +constructor" script - you should be mindful of printing/logging anything else to stdout, which will stop the `result` +functionality working as intended. 
If you want output parameters/artifacts with the name `exit_code` or `result`, you +can declare another field with an annotation of that name, e.g. +`my_exit_code: Annotated[int, Parameter(name="exit_code")]`. + +Aside from the `exit_code` and `result`, the `RunnerOutput` behaves exactly like the `RunnerInput`: + +```py +from typing import Annotated + +from hera.workflows import Artifact, Parameter, script +from hera.workflows.io import RunnerOutput + + +class MyOutput(RunnerOutput): + param_int: Annotated[int, Parameter(name="param-output")] + artifact_int: Annotated[int, Artifact(name="artifact-output")] + + +@script(constructor="runner") +def pydantic_io() -> MyOutput: + return MyOutput(exit_code=1, result="Test!", param_int=42, artifact_int=3) + +``` + +See the full Pydantic IO example [here](../examples/workflows/experimental/script_runner_io.md)! diff --git a/docs/user-guides/scripts.md deleted file mode 100644 index 458474351..000000000 --- a/docs/user-guides/scripts.md +++ /dev/null @@ -1,529 +0,0 @@ -# Scripts - -The `Script` class is an essential part of Hera's extension on top of Argo. As Hera is a Python library, -[Script templates](https://argoproj.github.io/argo-workflows/fields/#scripttemplate) running Python become the standard -template, which is reflected by the greater feature set provided for writing them. - -## Script Decorator - -The `script` decorator function is a key offering of Hera to achieve near-native Python function orchestration. It -allows you to call the function under a Hera context manager such as a `Workflow` or `Steps` context, and it will be -treated as the intended sub-object, which would be a `template` when under a `Workflow`, or a `Step` when under a -`Steps`. The function will still behave as normal outside of any Hera contexts, meaning you can write unit tests on the -given function. 
- -> **For advanced users**: the exact mechanism of the `script` decorator is to prepare a `Script` object within the -> decorator, so that when your function is invoked under a Hera context, the call is redirected to the `Script.__call__` -> function. This takes the kwargs of a `Step` or `Task` depending on whether the context manager is a `Steps` or a -> `DAG`. Under a Workflow itself, your function is not expected to take arguments, so the call will add the function as -> a template. - -When decorating a function, you should pass `Script` parameters to the `script` decorator. This includes values such as -the `image` to use, and `resources` to request. - -```py -from hera.workflows import Resources, script - -@script(image="python:3.11", resources=Resources(memory_request="5Gi")) -def echo(message: str): - print(message) -``` - -When calling the function under a `Steps` or `DAG` context, you should pass `Step` or `Task` kwargs, such as the `name` -of the `Step`/`Task`, a `when` clause, a `with_param` list to loop over a given template, or `arguments` for the -function. - -```py -with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w: - with DAG(name="diamond"): - A = echo(name="A", arguments={"message": "A"}) - B = echo(name="B", arguments={"message": "B"}, when=f"{A.result == 'A'}") - C = echo(name="C", arguments={"message": "C"}, when=f"{A.result != 'A'}") - D = echo(name="D", arguments={"message": "D"}) - A >> [B, C] >> D -``` - -> **Note** in the `DAG` above, `D` will still run, even though `C` will be skipped. This is because of the `depends` logic -> resolving to `C.Succeeded || C.Skipped || C.Daemoned` due to Argo's default -> [depends logic](https://argoproj.github.io/argo-workflows/enhanced-depends-logic/#depends). - -## Script Constructors - -### InlineScriptConstructor - -Script templates submitted to Argo typically run the given Python function in a Python image. 
By default, the Python -function itself is dumped to the YAML, and the Argo cluster will run that code. For the code below, we will see it -directly in the output YAML. - -```py -from hera.workflows import Workflow, script - -@script(add_cwd_to_sys_path=False) -def hello(s: str): - print("Hello, {s}!".format(s=s)) - - -with Workflow( - generate_name="hello-world-", - entrypoint="hello", - arguments={"s": "world"}, -) as w: - hello() -``` - -We added `add_cwd_to_sys_path=False` to remove some boilerplate from the `source` below. You will see Hera adds a -`json.loads` to bridge the YAML input to a Python variable: - -```yaml -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - generateName: hello-world- -spec: - arguments: - parameters: - - name: s - value: world - entrypoint: hello - templates: - - inputs: - parameters: - - name: s - name: hello - script: - command: - - python - image: python:3.8 - source: 'import json - - try: s = json.loads(r''''''{{inputs.parameters.s}}'''''') - - except: s = r''''''{{inputs.parameters.s}}'''''' - - - print(''Hello, {s}!''.format(s=s))' -``` - -This method of running the function is handled by the `InlineScriptConstructor`, called such because it constructs the -`Script` template to run the function "inline" in the YAML. - -#### Importing modules - -A caveat of the `InlineScriptConstructor` is that it is quite limited - as the `InlineScriptConstructor` dumps your code -to the `source` field as-is, you must also `import` (within the function itself) any modules you use in the function: - -```py -@script(image="python:3.10") -def my_matcher(string: str): - import re - - print(bool(re.match("test", string))) -``` - -> **Note** This also applies to other functions in your code - you will not be able to call functions defined outside of -> the scope of the script-decorated function! 
- -If your function uses standard library imports from Python, you will be able to run your function with any standard -Python image, specified by the `image` argument of the decorator. Therefore, if you use non-standard imports, such as -`numpy`, you will need to use an image that includes `numpy`, or build your own (e.g. as a Docker image on DockerHub). - -### RunnerScriptConstructor - -The `RunnerScriptConstructor` is an alternative `ScriptConstructor` that uses the "Hera Runner" (think of this as being -like the PyTest Runner) to run your function on Argo. This avoids dumping the function to the `source` of a template, -keeping the YAML manageable and small, and allows you to arrange your code in natural Python fashion: imports can be -anywhere in the package, the script-decorated function can call other functions in the package, and the function itself -can take Pydantic objects as arguments. The use of the `RunnerScriptConstructor` necessitates building your own image, -as the Hera Runner runs the function by referencing it as an entrypoint of your module. The image used by the script -should be built from the source code package itself and its dependencies, so that the source code's functions, -dependencies, and Hera itself are available to run. - -A function can set its `constructor` to `"runner"` to use the `RunnerScriptConstructor`, or use the -`global_config.set_class_defaults` function to set it once for all script-decorated functions. 
We can write a script -template function using Pydantic objects such as: - -```py -global_config.set_class_defaults(Script, constructor="runner") - -class Input(BaseModel): - a: int - b: str = "foo" - -class Output(BaseModel): - output: List[Input] - -@script() -def my_function(input: Input) -> Output: - return Output(output=[input]) -``` - -This creates a template in YAML that looks like: - -```yaml -- name: my-function - inputs: - parameters: - - name: input - script: - command: - - python - args: - - -m - - hera.workflows.runner - - -e - - examples.workflows.callable_script:my_function - image: my-image-with-python-source-code-and-dependencies - source: '{{inputs.parameters}}' -``` - -You will notice some pecularities of this template. Firstly, it is running the `hera.workflows.runner` module, rather -than a user-module such as `examples.workflows.callable_script`. Instead, the `-e` arg specifies the `--entrypoint` to -be called by the runner, in this case the `my_function` of the `examples.workflows.callable_script` module. We do not -give a real `image` here, but we assume it exists in this example. Finally, the `source` parameter is passed the -`inputs.parameters` of the template. This is because the Hera Runner relies on a mechanism in Argo where the value -passed to `source` is dumped to a file, and then the filename is passed as the final `arg` to the `command`. Therefore, -the `source` will actually contain a list of parameters as dictionaries, which are dumped to a file which is passed to -`hera.workflows.runner`. Of course, this is all handled for you! - -## Script Annotations - -Annotation syntax is an experimental feature using `typing.Annotated` for `Parameter`s and `Artifact`s to declare inputs -and outputs for functions decorated as `scripts`. 
They use `Annotated` as the type in the function parameters and allow -us to simplify writing scripts with parameters and artifacts that require additional fields such as a `description` or -alternative `name`. - -This feature must be enabled by setting the `experimental_feature` flag `script_annotations` on the global config. - -```py -global_config.experimental_features["script_annotations"] = True -``` - -### Parameters - -In Hera, we can currently specify inputs inside the `@script` decorator as follows: - -```python -@script( - inputs=[ - Parameter(name="an_int", description="an_int parameter", default=1, enum=[1, 2, 3]), - Parameter(name="a_bool", description="a_bool parameter", default=True, enum=[True, False]), - Parameter(name="a_string", description="a_string parameter", default="a", enum=["a", "b", "c"]) - ] -) -def echo_all(an_int=1, a_bool=True, a_string="a"): - print(an_int) - print(a_bool) - print(a_string) -``` - -Notice how the `name` and `default` values are duplicated for each `Parameter`. Using annotations, we can rewrite this -as: - -```python -@script() -def echo_all( - an_int: Annotated[int, Parameter(description="an_int parameter", default=1, enum=[1, 2, 3])], - a_bool: Annotated[bool, Parameter(description="a_bool parameter", default=True, enum=[True, False])], - a_string: Annotated[str, Parameter(description="a_string parameter", default="a", enum=["a", "b", "c"])] -): - print(an_int) - print(a_bool) - print(a_string) -``` - -The fields allowed in the `Parameter` annotations are: `name`, `default`, `enum`, and `description`. - -### Artifacts - -> Note: `Artifact` annotations are only supported when used with the `RunnerScriptConstructor`. - - -The feature is even more powerful for `Artifact`s. 
In Hera we are currently able to specify `Artifact`s in `inputs`, but -the given path is not programmatically linked to the code within the function unless defined outside the scope of the -function: - -```python -@script(inputs=Artifact(name="my-artifact", path="/tmp/file")) -def read_artifact(): - with open("/tmp/file") as a_file: # Repeating "/tmp/file" is prone to human error! - print(a_file.read()) - -# or - -MY_PATH = "/tmp/file" # Now accessible outside of the function scope! -@script(inputs=Artifact(name="my-artifact", path=MY_PATH)) -def read_artifact(): - with open(MY_PATH) as a_file: - print(a_file.read()) -``` - -By using annotations we can avoid repeating the `path` of the file, and the function can use the variable directly as a -`Path` object, with its value already set to the given path: - -```python -@script(constructor="runner") -def read_artifact(an_artifact: Annotated[Path, Artifact(name="my-artifact", path="/tmp/file")]): - print(an_artifact.read_text()) -``` - -The fields allowed in the `Artifact` annotations are: `name`, `path`, and `loader`. - -### Artifact Loaders - -In case you want to load an object directly from the `path` of the `Artifact`, we allow two types of loaders besides the -default `Path` behaviour used when no loader is specified. The `ArtifactLoader` enum provides `file` and `json` loaders. - -#### `None` loader -With `None` set as the loader (which is by default) in the Artifact annotation, the `path` attribute of `Artifact` is -extracted and used to provide a `pathlib.Path` object for the given argument, which can be used directly in the function -body. 
The following example is the same as above except for explicitly setting the loader to `None`: - -```python -@script(constructor="runner") -def read_artifact( - an_artifact: Annotated[Path, Artifact(name="my-artifact", path="/tmp/file", loader=None)] -): - print(an_artifact.read_text()) -``` - -#### `file` loader - -When the loader is set to `file`, the function parameter type should be `str`, and will contain the contents string -representation of the file stored at `path` (essentially performing `path.read_text()` automatically): - -```python -@script(constructor="runner") -def read_artifact( - an_artifact: Annotated[str, Artifact(name="my-artifact", path="/tmp/file", loader=ArtifactLoader.file)] -) -> str: - return an_artifact -``` - -This loads the contents of the file at `"/tmp/file"` to the argument `an_artifact` and subsequently can be used as a -string inside the function. - -#### `json` loader - -When the loader is set to `json`, the contents of the file at `path` are read and parsed to a dictionary via `json.load` -(essentially performing `json.load(path.open())` automatically). By specifying a Pydantic type, this dictionary can even -be automatically parsed to that type: - -```python -class MyArtifact(BaseModel): - a = "a" - b = "b" - - -@script(constructor="runner") -def read_artifact( - an_artifact: Annotated[MyArtifact, Artifact(name="my-artifact", path="/tmp/file", loader=ArtifactLoader.json)] -) -> str: - return an_artifact.a + an_artifact.b -``` - -Here, we have a json representation of `MyArtifact` such as `{"a": "hello ", "b": "world"}` stored at `"/tmp/file"`. We -can load it with `ArtifactLoader.json` and then use `an_artifact` as an instance of `MyArtifact` inside the function, so -the function will return `"hello world"`. 
- -#### Function parameter name aliasing - -Script annotations can work on top of the `RunnerScriptConstructor` for name aliasing of function -parameters, in particular to allow a public `kebab-case` parameter, while using a `snake_case` -Python function parameter. When using a `RunnerScriptConstructor`, an environment variable -`hera__script_annotations` will be added to the Script template (visible in the exported YAML file). - -### Outputs - -> Note: Output annotations are only supported when used with the `RunnerScriptConstructor`. - -There are two ways to specify output Artifacts and Parameters. - -#### Function return annotations - -Function return annotations can be used to specify the output type information for output Artifacts and Parameters, and -the function should return a value or tuple. An example can be seen -[here](../examples/workflows/experimental/script_annotations_outputs.md). - -For a simple hello world output artifact example we currently have: -```python -@script(outputs=Artifact(name="hello-artifact", path="/tmp/hello_world.txt")) -def hello_world(): - with open("/tmp/hello_world.txt", "w") as f: - f.write("Hello, world!") -``` - -The new approach allows us to avoid duplication of the path, which is now optional, and results in more readable code: -```python -@script() -def hello_world() -> Annotated[str, Artifact(name="hello-artifact")]: - return "Hello, world!" -``` - -For `Parameter`s we have a similar syntax: - -```python -@script() -def hello_world() -> Annotated[str, Parameter(name="hello-param")]: - return "Hello, world!" -``` - -The returned values will be automatically saved in files within the Argo container according to this schema: -* `/hera/outputs/parameters/` -* `/hera/outputs/artifacts/` - -These outputs are also exposed in the `outputs` section of the template in YAML. 
- -The object returned from the function can be of any serialisable Pydantic type (or basic Python type) and must be -`Annotated` as an `Artifact` or `Parameter`. The `Parameter`/`Artifact`'s `name` will be used for the path of the output unless provided: -* if the annotation is an `Artifact` with a `path`, we use that `path` -* if the annotation is a `Parameter`, with a `value_from` that contains a `path`, we use that `path` - -See the following two functions for specifying custom paths: - -```python -@script() -def hello_world() -> Annotated[str, Artifact(name="hello-artifact", path="/tmp/hello_world_art.txt")]: - return "Hello, world!" - -@script() -def hello_world() -> Annotated[str, Parameter(name="hello-param", value_from={"path": "/tmp/hello_world_param.txt"})]: - return "Hello, world!" -``` - -For multiple outputs, the return type should be a `Tuple` of arbitrary Pydantic types with individual -`Parameter`/`Artifact` annotations, and the function must return a tuple from the function matching these types: -```python -@script() -def func(...) -> Tuple[ - Annotated[arbitrary_pydantic_type_a, Artifact], - Annotated[arbitrary_pydantic_type_b, Parameter], - Annotated[arbitrary_pydantic_type_c, Parameter], - ...]: - return output_a, output_b, output_c -``` - -#### Input-Output function parameters - -Hera also allows output `Parameter`/`Artifact`s as part of the function signature when specified as a `Path` type, -allowing users to write to the path as an output, without needing an explicit return. They require an additional field -`output=True` to distinguish them from the input parameters and must have an underlying `Path` type (or another type -that will write to disk). 
- -```python -@script() -def func(..., output_param: Annotated[Path, Parameter(output=True, global_name="...", name="")]) -> Annotated[arbitrary_pydantic_type, OutputItem]: - output_param.write_text("...") - return output -``` - -The parent outputs directory, `/hera/outputs` by default, can be set by the user. This is done by adding: - -```python -global_config.set_class_defaults(RunnerScriptConstructor, outputs_directory="user/chosen/outputs") -``` - -## Script Pydantic IO - -Hera provides the `RunnerInput` and `RunnerOutput` classes which can be used to more succinctly write your script -function inputs and outputs, and requires use of the Hera Runner. Use of these classes also requires the -`"script_pydantic_io"` experimental feature flag to be enabled: - -```py -global_config.experimental_features["script_pydantic_io"] = True -``` - -### Pydantic V1 or V2? - -You can import `RunnerInput` and `RunnerOutput` from the `hera.workflows.io` submodule to import the version of Pydantic -that matches your V1 or V2 installation. - -If you need to use V1 models when you have V2 installed, you should import -`RunnerInput` and `RunnerOutput` from the `hera.workflows.io.v1` or `hera.workflows.io.v2` module explicitly. The V2 -models will not be available if you have installed `pydantic<2`, but the V1 models are usable for either version, -allowing you to migrate at your own pace. - -### Script inputs using `RunnerInput` - -For your script inputs, you can create a derived class of `RunnerInput`, and declare all your input parameters (and -artifacts) as fields of the class. If you want to use `Annotated` to declare `Artifacts` add metadata to your -`Parameters`, you will also need to enable the `"script_annotations"` experimental feature flag. 
- -```py -from typing import Annotated -from pydantic import BaseModel - -from hera.workflows import Artifact, ArtifactLoader, Parameter, script -from hera.workflows.io import RunnerInput - - -class MyObject(BaseModel): - a_dict: dict = {} - a_str: str = "a default string" - - -class MyInput(RunnerInput): - param_int: Annotated[int, Parameter(name="param-input")] = 42 - an_object: Annotated[MyObject, Parameter(name="obj-input")] = MyObject( - a_dict={"my-key": "a-value"}, a_str="hello world!" - ) - artifact_int: Annotated[int, Artifact(name="artifact-input", loader=ArtifactLoader.json)] - - -@script(constructor="runner") -def pydantic_io( - my_input: MyInput, -) -> ...: - ... -``` - -This will create a script template named `pydantic_io`, with input parameters `"param-input"` and `"obj-input"`, but -_not_ `"my_input"` (hence inline script templates will not work, as references to `my_input` will not resolve); the -template will also have the `"artifact-input"` artifact. The yaml generated from the Python will look something like the following: - -```yaml - templates: - - name: pydantic-io - inputs: - parameters: - - name: param-input - default: '42' - - name: obj-input - default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}' - artifacts: - - name: artifact-input - path: /tmp/hera-inputs/artifacts/artifact-input - script: - ... -``` - -### Script outputs using `RunnerOutput` - -The `RunnerOutput` class comes with two special variables, `exit_code` and `result`. The `exit_code` is used to exit the -container when running on Argo with the specific exit code - it is set to `0` by default. The `result` is used to print -any serializable object to stdout, which means you can now use `.result` on tasks or steps that use a "runner -constructor" script - you should be mindful of printing/logging anything else to stdout, which will stop the `result` -functionality working as intended. 
If you want an output parameters/artifacts with the name `exit_code` or `result`, you -can declare another field with an annotation of that name, e.g. -`my_exit_code: Annotated[int, Parameter(name="exit_code")]`. - -Aside from the `exit_code` and `result`, the `RunnerOutput` behaves exactly like the `RunnerInput`: - -```py -from typing import Annotated - -from hera.workflows import Artifact, Parameter, script -from hera.workflows.io import RunnerOutput - - -class MyOutput(RunnerOutput): - param_int: Annotated[int, Parameter(name="param-output")] - artifact_int: Annotated[int, Artifact(name="artifact-output")] - - -@script(constructor="runner") -def pydantic_io() -> MyOutput: - return MyOutput(exit_code=1, result="Test!", param_int=42, artifact_int=my_input.param_int) - -``` - -See the full Pydantic IO example [here](../examples/workflows/experimental/script_pydantic_io.md)! diff --git a/docs/walk-through/advanced-hera-features.md b/docs/walk-through/advanced-hera-features.md index 774fa3a5c..dbdf0791b 100644 --- a/docs/walk-through/advanced-hera-features.md +++ b/docs/walk-through/advanced-hera-features.md @@ -117,7 +117,7 @@ This feature can be enabled by setting the `experimental_feature` flag `script_a global_config.experimental_features["script_annotations"] = True ``` -Read the full guide on script annotations in [the script user guide](../user-guides/scripts.md#script-annotations). +Read the full guide on script annotations in [the script user guide](../user-guides/script-annotations.md). ### Script IO Models @@ -133,7 +133,7 @@ To enable Hera input/output models, you must set the `experimental_feature` flag global_config.experimental_features["script_pydantic_io"] = True ``` -Read the full guide on script pydantic IO in [the script user guide](../user-guides/scripts.md#script-pydantic-io). +Read the full guide on script pydantic IO in [the script user guide](../user-guides/script-runner-io.md). ## Graduated features @@ -150,4 +150,4 @@ Argo. 
The image used by the script should be built from the source code package source code's functions, dependencies, and Hera itself are available to run. The `RunnerScriptConstructor` is also compatible with Pydantic so supports deserializing inputs to Python objects and serializing outputs to json strings. -Read [the Script Guide](../user-guides/scripts.md#runnerscriptconstructor) to learn more! +Read [the Script Guide](../user-guides/script-basics.md#runnerscriptconstructor) to learn more! diff --git a/docs/walk-through/hello-world.md b/docs/walk-through/hello-world.md index 3131cfe77..c581117d7 100644 --- a/docs/walk-through/hello-world.md +++ b/docs/walk-through/hello-world.md @@ -67,8 +67,8 @@ def echo_twice(message: str): ``` For an in-depth explanation of the mechanics of the script decorator, see the -[script decorator section](../user-guides/scripts.md#script-decorator) in the scripts user guide, and read about -building your own image in the [script constructors section](../user-guides/scripts.md#script-constructors). +[script decorator section](../user-guides/script-basics.md#script-decorator) in the scripts user guide, and read about +building your own image in the [script constructors section](../user-guides/script-basics.md#script-constructors). ## The Workflow Context Manager diff --git a/docs/walk-through/pydantic-support.md b/docs/walk-through/pydantic-support.md index c7b8a4cbf..217dad6a8 100644 --- a/docs/walk-through/pydantic-support.md +++ b/docs/walk-through/pydantic-support.md @@ -14,3 +14,6 @@ validate the function call. Using Pydantic classes in your function parameters u de-serializing features of Pydantic when running on Argo. Your functions can return objects that are serialized, passed to another `Step` as a string argument, and then de-serialized in another function. This flow can be seen in [the callable scripts example](../examples/workflows/scripts/callable_script.md). 
+
+The new experimental Runner IO feature provides a way to specify composite inputs using the class fields, which become the
+template's inputs. Read more in the [Script Runner IO guide](../user-guides/script-runner-io.md).
diff --git a/examples/workflows/experimental/script-pydantic-io.yaml b/examples/workflows/experimental/script-pydantic-io.yaml
deleted file mode 100644
index 55256a985..000000000
--- a/examples/workflows/experimental/script-pydantic-io.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-apiVersion: argoproj.io/v1alpha1
-kind: Workflow
-metadata:
-  generateName: pydantic-io-
-spec:
-  templates:
-  - inputs:
-      artifacts:
-      - name: artifact-input
-        path: /tmp/hera-inputs/artifacts/artifact-input
-      parameters:
-      - default: '42'
-        name: param-input
-      - default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}'
-        name: obj-input
-    name: pydantic-io
-    outputs:
-      artifacts:
-      - name: artifact-output
-        path: /tmp/hera-outputs/artifacts/artifact-output
-      parameters:
-      - name: param-output
-        valueFrom:
-          path: /tmp/hera-outputs/parameters/param-output
-    script:
-      args:
-      - -m
-      - hera.workflows.runner
-      - -e
-      - examples.workflows.experimental.script_pydantic_io:pydantic_io
-      command:
-      - python
-      env:
-      - name: hera__script_annotations
-        value: ''
-      - name: hera__outputs_directory
-        value: /tmp/hera-outputs
-      - name: hera__script_pydantic_io
-        value: ''
-      image: python:3.8
-      source: '{{inputs.parameters}}'
diff --git a/examples/workflows/experimental/script-runner-io.yaml b/examples/workflows/experimental/script-runner-io.yaml
new file mode 100644
index 000000000..3ee5a5c08
--- /dev/null
+++ b/examples/workflows/experimental/script-runner-io.yaml
@@ -0,0 +1,81 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: pydantic-io-
+spec:
+  templates:
+  - name: use-pydantic-io
+    steps:
+    - - name: writer
+        template: writer
+    - - arguments:
+          artifacts:
+          - from: '{{steps.writer.outputs.artifacts.int-artifact}}'
+            name: artifact-input
parameters: + - name: param_int + value: '101' + - name: an_object + value: '{"a_dict": {"my-new-key": "my-new-value"}, "a_str": "a default + string"}' + name: pydantic-io + template: pydantic-io + - name: writer + outputs: + artifacts: + - archive: + none: {} + name: int-artifact + path: /tmp/hera-outputs/artifacts/int-artifact + script: + args: + - -m + - hera.workflows.runner + - -e + - examples.workflows.experimental.script_runner_io:writer + command: + - python + env: + - name: hera__script_annotations + value: '' + - name: hera__outputs_directory + value: /tmp/hera-outputs + - name: hera__script_pydantic_io + value: '' + image: python-image-built-with-my-package + source: '{{inputs.parameters}}' + - inputs: + artifacts: + - name: artifact-input + path: /tmp/hera-inputs/artifacts/artifact-input + parameters: + - default: '42' + name: param-input + - default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}' + name: obj-input + name: pydantic-io + outputs: + artifacts: + - name: artifact-output + path: /tmp/hera-outputs/artifacts/artifact-output + parameters: + - name: param-output + valueFrom: + path: /tmp/hera-outputs/parameters/param-output + script: + args: + - -m + - hera.workflows.runner + - -e + - examples.workflows.experimental.script_runner_io:pydantic_io + command: + - python + env: + - name: hera__script_annotations + value: '' + - name: hera__outputs_directory + value: /tmp/hera-outputs + - name: hera__script_pydantic_io + value: '' + image: python-image-built-with-my-package + source: '{{inputs.parameters}}' diff --git a/examples/workflows/experimental/script_pydantic_io.py b/examples/workflows/experimental/script_runner_io.py similarity index 61% rename from examples/workflows/experimental/script_pydantic_io.py rename to examples/workflows/experimental/script_runner_io.py index 418795554..1afc0486f 100644 --- a/examples/workflows/experimental/script_pydantic_io.py +++ b/examples/workflows/experimental/script_runner_io.py @@ -4,7 +4,8 @@ 
from pydantic import BaseModel from hera.shared import global_config -from hera.workflows import Artifact, ArtifactLoader, Parameter, Workflow, script +from hera.workflows import Artifact, ArtifactLoader, Parameter, Steps, Workflow, script +from hera.workflows.archive import NoneArchiveStrategy from hera.workflows.io import RunnerInput, RunnerOutput try: @@ -17,7 +18,7 @@ class MyObject(BaseModel): - a_dict: dict = {} + a_dict: dict # not giving a default makes the field a required input for the template a_str: str = "a default string" @@ -34,7 +35,12 @@ class MyOutput(RunnerOutput): artifact_int: Annotated[int, Artifact(name="artifact-output")] -@script(constructor="runner") +@script(constructor="runner", image="python-image-built-with-my-package") +def writer() -> Annotated[int, Artifact(name="int-artifact", archive=NoneArchiveStrategy())]: + return 100 + + +@script(constructor="runner", image="python-image-built-with-my-package") def pydantic_io( my_input: MyInput, ) -> MyOutput: @@ -42,4 +48,14 @@ def pydantic_io( with Workflow(generate_name="pydantic-io-") as w: - pydantic_io() + with Steps(name="use-pydantic-io"): + write_step = writer() + pydantic_io( + arguments=[ + write_step.get_artifact("int-artifact").with_name("artifact-input"), + { + "param_int": 101, + "an_object": MyObject(a_dict={"my-new-key": "my-new-value"}), + }, + ] + ) diff --git a/examples/workflows/template-level-volume.yaml b/examples/workflows/template-level-volume.yaml index 5032067bb..c0d87e5cc 100644 --- a/examples/workflows/template-level-volume.yaml +++ b/examples/workflows/template-level-volume.yaml @@ -5,35 +5,17 @@ metadata: spec: entrypoint: generate-and-use-volume templates: - - name: generate-and-use-volume - steps: - - - name: generate-volume - template: generate-volume - arguments: - parameters: - - name: pvc-size - # In a real-world example, this could be generated by a previous workflow step. 
- value: '1Gi' - - - name: generate - template: whalesay - arguments: - parameters: - - name: pvc-name - value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' - - - name: print - template: print-message - arguments: - parameters: - - name: pvc-name - value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' - - - name: generate-volume - inputs: + - inputs: parameters: - - name: pvc-size + - name: pvc-size + name: generate-volume + outputs: + parameters: + - name: pvc-name + valueFrom: + jsonPath: '{.metadata.name}' resource: action: create - setOwnerReference: true manifest: | apiVersion: v1 kind: PersistentVolumeClaim @@ -44,40 +26,60 @@ spec: resources: requests: storage: '{{inputs.parameters.pvc-size}}' - outputs: - parameters: - - name: pvc-name - valueFrom: - jsonPath: '{.metadata.name}' - - - name: whalesay - inputs: - parameters: - - name: pvc-name - volumes: - - name: workdir - persistentVolumeClaim: - claimName: '{{inputs.parameters.pvc-name}}' - container: + setOwnerReference: true + - container: + args: + - echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt + command: + - sh + - -c image: docker/whalesay:latest - command: [sh, -c] - args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"] volumeMounts: - - name: workdir - mountPath: /mnt/vol - - - name: print-message + - mountPath: /mnt/vol + name: workdir inputs: - parameters: - - name: pvc-name + parameters: + - name: pvc-name + name: whalesay volumes: - - name: workdir - persistentVolumeClaim: - claimName: '{{inputs.parameters.pvc-name}}' - container: + - name: workdir + persistentVolumeClaim: + claimName: '{{inputs.parameters.pvc-name}}' + - container: + args: + - echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt + command: + - sh + - -c image: alpine:latest - command: [sh, -c] - args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"] volumeMounts: - - 
name: workdir - mountPath: /mnt/vol + - mountPath: /mnt/vol + name: workdir + inputs: + parameters: + - name: pvc-name + name: print-message + volumes: + - name: workdir + persistentVolumeClaim: + claimName: '{{inputs.parameters.pvc-name}}' + - name: generate-and-use-volume + steps: + - - arguments: + parameters: + - name: pvc-size + value: 1Gi + name: generate-volume + template: generate-volume + - - arguments: + parameters: + - name: pvc-name + value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' + name: generate + template: whalesay + - - arguments: + parameters: + - name: pvc-name + value: '{{steps.generate-volume.outputs.parameters.pvc-name}}' + name: print + template: print-message diff --git a/examples/workflows/upstream/title-and-descriptin-with-markdown.upstream.yaml b/examples/workflows/upstream/title-and-description-with-markdown.upstream.yaml similarity index 58% rename from examples/workflows/upstream/title-and-descriptin-with-markdown.upstream.yaml rename to examples/workflows/upstream/title-and-description-with-markdown.upstream.yaml index ff25ccdf3..7750e5791 100644 --- a/examples/workflows/upstream/title-and-descriptin-with-markdown.upstream.yaml +++ b/examples/workflows/upstream/title-and-description-with-markdown.upstream.yaml @@ -5,10 +5,12 @@ metadata: labels: workflows.argoproj.io/archive-strategy: "false" annotations: - workflows.argoproj.io/title: "**Test Title**" + # both annotations are available since v3.4.4 + # embedded markdown is available since v3.6 + workflows.argoproj.io/title: "**Test Title**" # defaults to `metadata.name` if not specified workflows.argoproj.io/description: | `This is a simple hello world example.` - You can also run it in Python: https://couler-proj.github.io/couler/examples/#hello-world + This is an embedded link to the docs: https://argo-workflows.readthedocs.io/en/latest/title-and-description/ spec: entrypoint: whalesay templates: diff --git 
a/examples/workflows/upstream/withsequence-nested-result.upstream.yaml b/examples/workflows/upstream/withsequence-nested-result.upstream.yaml new file mode 100644 index 000000000..1a6a2f3b4 --- /dev/null +++ b/examples/workflows/upstream/withsequence-nested-result.upstream.yaml @@ -0,0 +1,43 @@ +# This example shows how to nest withSequence loops in a Workflow. +# A is the first step. A's output determines how many times B is executed. +# B's output then determines how many times C is executed. +# A +# / \ +# B1 B2 +# / | \ / | +# C1 C2 C3 C4 C5 +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: withsequence-nested-result- +spec: + entrypoint: hello-entrypoint + templates: + - name: hello-entrypoint + steps: + - - name: hello-a + template: hello + - - name: hello-b + template: hello-hello + withSequence: + start: "1" + end: "{{steps.hello-a.outputs.result}}" + + - name: hello-hello + steps: + - - name: hello-b + template: hello + - - name: hello-c + template: hello + withSequence: + start: "1" + end: "{{steps.hello-b.outputs.result}}" + + - name: hello + script: + image: python:alpine3.6 + command: [python] + source: | + import random + result = random.randint(0,5) + print(result) diff --git a/mkdocs.yml b/mkdocs.yml index c2d400563..164012dc9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -27,7 +27,10 @@ nav: - History of Hera: contributing/history.md - User Guides: - Core Concepts: user-guides/core-concepts.md - - Scripts: user-guides/scripts.md + - Scripts: + - Script Basics: user-guides/script-basics.md + - Script Annotations: user-guides/script-annotations.md + - Script Runner IO: user-guides/script-runner-io.md - Expr Transpiler: user-guides/expr.md - Examples: - About: examples/workflows-examples.md diff --git a/src/hera/workflows/_runner/__init__.py b/src/hera/workflows/_runner/__init__.py new file mode 100644 index 000000000..ea8404a49 --- /dev/null +++ b/src/hera/workflows/_runner/__init__.py @@ -0,0 +1 @@ +"""Functionality related 
to the Hera Runner.""" diff --git a/src/hera/workflows/_runner/script_annotations_util.py b/src/hera/workflows/_runner/script_annotations_util.py new file mode 100644 index 000000000..fae6addbf --- /dev/null +++ b/src/hera/workflows/_runner/script_annotations_util.py @@ -0,0 +1,320 @@ +"""The script_annotations_util module contains functionality for the script annotations feature when used with the runner.""" +import inspect +import json +import os +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union, cast + +from hera.shared._pydantic import BaseModel, get_fields +from hera.shared.serialization import serialize +from hera.workflows import Artifact, Parameter +from hera.workflows.artifact import ArtifactLoader +from hera.workflows.io.v1 import ( + RunnerInput as RunnerInputV1, + RunnerOutput as RunnerOutputV1, +) + +try: + from hera.workflows.io.v2 import ( # type: ignore + RunnerInput as RunnerInputV2, + RunnerOutput as RunnerOutputV2, + ) +except ImportError: + from hera.workflows.io.v1 import ( # type: ignore + RunnerInput as RunnerInputV2, + RunnerOutput as RunnerOutputV2, + ) + +try: + from typing import Annotated, get_args, get_origin # type: ignore +except ImportError: + from typing_extensions import Annotated, get_args, get_origin # type: ignore + + +def _get_outputs_path(destination: Union[Parameter, Artifact]) -> Path: + """Get the path from the destination annotation using the defined outputs directory.""" + path = Path(os.environ.get("hera__outputs_directory", "/tmp/hera-outputs")) + if isinstance(destination, Parameter) and destination.name: + path = path / f"parameters/{destination.name}" + elif isinstance(destination, Artifact): + if destination.path: + path = Path(destination.path) + elif destination.name: + path = path / f"artifacts/{destination.name}" + return path + + +def _get_annotated_input_param_value( + func_param_name: str, + param_annotation: Parameter, + kwargs: Dict[str, str], +) 
-> str: + if param_annotation.name in kwargs: + return kwargs[param_annotation.name] + + if func_param_name in kwargs: + return kwargs[func_param_name] + + raise RuntimeError( + f"Parameter {param_annotation.name if param_annotation.name else func_param_name} was not given a value" + ) + + +def get_annotated_param_value( + func_param_name: str, + param_annotation: Parameter, + kwargs: Dict[str, str], +) -> Union[Path, str]: + """Get the value from a given function param and its annotation. + + If the parameter is an output, return the path it will write to. + If the parameter is an input, return the string value from the kwargs dict, + which could be from the param_annotation.name if given, or func_param_name. + """ + if param_annotation.output: + if param_annotation.value_from and param_annotation.value_from.path: + path = Path(param_annotation.value_from.path) + else: + path = _get_outputs_path(param_annotation) + # Automatically create the parent directory (if required) + path.parent.mkdir(parents=True, exist_ok=True) + return path + return _get_annotated_input_param_value(func_param_name, param_annotation, kwargs) + + +def get_annotated_artifact_value(artifact_annotation: Artifact) -> Union[Path, Any]: + """Get the value of the given Artifact annotation. + + If the artifact is an output, return the path it will write to. + If the artifact is an input, return the loaded value (json, path or string) using its ArtifactLoader. + + As Artifacts are always Annotated in function parameters, we don't need to consider + the `kwargs` or the function parameter name. + """ + if artifact_annotation.output: + if artifact_annotation.path: + path = Path(artifact_annotation.path) + else: + path = _get_outputs_path(artifact_annotation) + # Automatically create the parent directory (if required) + path.parent.mkdir(parents=True, exist_ok=True) + return path + + if not artifact_annotation.path: + # Path is added to the spec automatically when built. 
As it isn't present in the annotation itself, + # we need to add it back in for the runner. + artifact_annotation.path = artifact_annotation._get_default_inputs_path() + + if artifact_annotation.loader == ArtifactLoader.json.value: + path = Path(artifact_annotation.path) + return json.load(path.open()) + + if artifact_annotation.loader == ArtifactLoader.file.value: + path = Path(artifact_annotation.path) + return path.read_text() + + if artifact_annotation.loader is None: + return artifact_annotation.path + + raise RuntimeError(f"Artifact {artifact_annotation.name} was not given a value") + + +T = TypeVar("T", bound=Type[BaseModel]) + + +def map_runner_input( + runner_input_class: T, + kwargs: Dict[str, str], +) -> T: + """Map argo input kwargs to the fields of the given RunnerInput, return an instance of the class. + + If the field is annotated, we look for the kwarg with the name from the annotation (Parameter or Artifact). + Otherwise, we look for the kwarg with the name of the field. 
+ """ + from hera.workflows._runner.util import _get_type + + input_model_obj = {} + + def load_parameter_value(value: str, value_type: type) -> Any: + if issubclass(_get_type(value_type), str): + return value + + try: + return json.loads(value) + except json.JSONDecodeError: + return value + + def map_field( + field: str, + kwargs: Dict[str, str], + ) -> Any: + annotation = runner_input_class.__annotations__[field] + if get_origin(annotation) is Annotated: + meta_annotation = get_args(annotation)[1] + if isinstance(meta_annotation, Parameter): + assert not meta_annotation.output + return load_parameter_value( + _get_annotated_input_param_value(field, meta_annotation, kwargs), + get_args(annotation)[0], + ) + + if isinstance(meta_annotation, Artifact): + return get_annotated_artifact_value(meta_annotation) + + return load_parameter_value(kwargs[field], annotation) + + for field in get_fields(runner_input_class): + input_model_obj[field] = map_field(field, kwargs) + + return cast(T, runner_input_class.parse_raw(json.dumps(input_model_obj))) + + +def _map_argo_inputs_to_function(function: Callable, kwargs: Dict[str, str]) -> Dict: + """Map kwargs from Argo to the function parameters using the function's parameter annotations. 
+ + For Parameter inputs: + * if the Parameter has a "name", replace it with the function parameter name + * otherwise use the function parameter name as-is + For Parameter outputs: + * update value to a Path object from the value_from.path value, or the default if not provided + + For Artifact inputs: + * load the Artifact according to the given ArtifactLoader + For Artifact outputs: + * update value to a Path object + """ + mapped_kwargs: Dict[str, Any] = {} + + for func_param_name, func_param in inspect.signature(function).parameters.items(): + if get_origin(func_param.annotation) is Annotated: + func_param_annotation = get_args(func_param.annotation)[1] + + if isinstance(func_param_annotation, Parameter): + mapped_kwargs[func_param_name] = get_annotated_param_value( + func_param_name, func_param_annotation, kwargs + ) + elif isinstance(func_param_annotation, Artifact): + mapped_kwargs[func_param_name] = get_annotated_artifact_value(func_param_annotation) + else: + mapped_kwargs[func_param_name] = kwargs[func_param_name] + elif get_origin(func_param.annotation) is None and issubclass( + func_param.annotation, (RunnerInputV1, RunnerInputV2) + ): + mapped_kwargs[func_param_name] = map_runner_input(func_param.annotation, kwargs) + else: + mapped_kwargs[func_param_name] = kwargs[func_param_name] + return mapped_kwargs + + +def _save_annotated_return_outputs( + function_outputs: Union[Tuple[Any], Any], + output_annotations: List[ + Union[Tuple[type, Union[Parameter, Artifact]], Union[Type[RunnerOutputV1], Type[RunnerOutputV2]]] + ], +) -> Optional[Union[RunnerOutputV1, RunnerOutputV2]]: + """Save the outputs of the function to the specified output destinations. + + The output values are matched with the output annotations and saved using the schema: + /artifacts/ + /parameters/ + If the artifact path or parameter value_from.path is specified, that is used instead. 
+ can be provided by the user or is set to /tmp/hera-outputs by default + """ + if not isinstance(function_outputs, tuple): + function_outputs = [function_outputs] + if len(function_outputs) != len(output_annotations): + raise ValueError("The number of outputs does not match the annotation") + + if os.environ.get("hera__script_pydantic_io", None) is not None: + return_obj = None + + for output_value, dest in zip(function_outputs, output_annotations): + if isinstance(output_value, (RunnerOutputV1, RunnerOutputV2)): + if os.environ.get("hera__script_pydantic_io", None) is None: + raise ValueError("hera__script_pydantic_io environment variable is not set") + + return_obj = output_value + + for field, value in output_value.dict().items(): + if field in {"exit_code", "result"}: + continue + + matching_output = output_value._get_output(field) + path = _get_outputs_path(matching_output) + _write_to_path(path, value) + else: + assert isinstance(dest, tuple) + if get_origin(dest[0]) is None: + # Built-in types return None from get_origin, so we can check isinstance directly + if not isinstance(output_value, dest[0]): + raise ValueError( + f"The type of output `{dest[1].name}`, `{type(output_value)}` does not match the annotated type `{dest[0]}`" + ) + else: + # Here, we know get_origin is not None, but its return type is found to be `Optional[Any]` + origin_type = cast(type, get_origin(dest[0])) + if not isinstance(output_value, origin_type): + raise ValueError( + f"The type of output `{dest[1].name}`, `{type(output_value)}` does not match the annotated type `{dest[0]}`" + ) + + if not dest[1].name: + raise ValueError("The name was not provided for one of the outputs.") + + path = _get_outputs_path(dest[1]) + _write_to_path(path, output_value) + + if os.environ.get("hera__script_pydantic_io", None) is not None: + return return_obj + + return None + + +def _save_dummy_outputs( + output_annotations: List[ + Union[Tuple[type, Union[Parameter, Artifact]], 
Union[Type[RunnerOutputV1], Type[RunnerOutputV2]]] + ], +) -> None: + """Save dummy values into the outputs specified. + + This function is used at runtime by the Hera Runner to create files in the container so that Argo + does not log confusing error messages that obfuscate the real error, which look like: + ``` + msg="cannot save parameter /tmp/hera-outputs/parameters/my-parameter" + argo=true + error="open /tmp/hera-outputs/parameters/my-parameter: no such file or directory"` + ``` + + The output annotations are used to write files using the schema: + /artifacts/ + /parameters/ + If the artifact path or parameter value_from.path is specified, that is used instead. + can be provided by the user or is set to /tmp/hera-outputs by default + """ + for dest in output_annotations: + if isinstance(dest, (RunnerOutputV1, RunnerOutputV2)): + if os.environ.get("hera__script_pydantic_io", None) is None: + raise ValueError("hera__script_pydantic_io environment variable is not set") + + for field in get_fields(dest): + if field in {"exit_code", "result"}: + continue + + annotation = dest._get_output(field) + path = _get_outputs_path(annotation) + _write_to_path(path, "") + else: + assert isinstance(dest, tuple) + if not dest[1].name: + raise ValueError("The name was not provided for one of the outputs.") + + path = _get_outputs_path(dest[1]) + _write_to_path(path, "") + + +def _write_to_path(path: Path, output_value: Any) -> None: + """Write the output_value as serialized text to the provided path. 
Create the necessary parent directories.""" + output_string = serialize(output_value) + if output_string is not None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(output_string) diff --git a/src/hera/workflows/_runner/util.py b/src/hera/workflows/_runner/util.py new file mode 100644 index 000000000..c27b014eb --- /dev/null +++ b/src/hera/workflows/_runner/util.py @@ -0,0 +1,261 @@ +"""The util module contains the functionality required for the script runner.""" +import argparse +import functools +import importlib +import inspect +import json +import os +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, cast + +from hera.shared._pydantic import _PYDANTIC_VERSION +from hera.shared.serialization import serialize +from hera.workflows import Artifact, Parameter +from hera.workflows._runner.script_annotations_util import ( + _map_argo_inputs_to_function, + _save_annotated_return_outputs, + _save_dummy_outputs, +) +from hera.workflows.artifact import ArtifactLoader +from hera.workflows.io.v1 import ( + RunnerOutput as RunnerOutputV1, +) +from hera.workflows.script import _extract_return_annotation_output + +try: + from typing import Annotated, get_args, get_origin # type: ignore +except ImportError: + from typing_extensions import Annotated, get_args, get_origin # type: ignore + +try: # pydantic-v1/v2 related imports + from pydantic.type_adapter import TypeAdapter # type: ignore + from pydantic.v1 import parse_obj_as # type: ignore + + from hera.workflows.io.v2 import ( # type: ignore + RunnerOutput as RunnerOutputV2, + ) +except ImportError: + from pydantic import parse_obj_as + + from hera.workflows.io.v1 import ( # type: ignore + RunnerOutput as RunnerOutputV2, + ) + + +def _ignore_unmatched_kwargs(f: Callable) -> Callable: + """Make function ignore unmatched kwargs. + + If the function already has the catch all **kwargs, do nothing. 
+ """ + if _contains_var_kwarg(f): + return f + + @functools.wraps(f) + def inner(**kwargs): + # filter out kwargs that are not part of the function signature + # and transform them to the correct type + filtered_kwargs = {key: _parse(value, key, f) for key, value in kwargs.items() if _is_kwarg_of(key, f)} + return f(**filtered_kwargs) + + return inner + + +def _contains_var_kwarg(f: Callable) -> bool: + """Tells whether the given callable contains a keyword argument.""" + return any(param.kind == inspect.Parameter.VAR_KEYWORD for param in inspect.signature(f).parameters.values()) + + +def _is_kwarg_of(key: str, f: Callable) -> bool: + """Tells whether the given `key` identifies a keyword argument of the given callable.""" + param = inspect.signature(f).parameters.get(key) + return param is not None and ( + param.kind is inspect.Parameter.KEYWORD_ONLY or param.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD + ) + + +def _parse(value: str, key: str, f: Callable) -> Any: + """Parse a value to the correct type. + + Args: + value: The value to parse. + key: The name of the kwarg. + f: The function to parse the value for. + + Returns: + The parsed value. 
+ + """ + if _is_str_kwarg_of(key, f) or _is_artifact_loaded(key, f) or _is_output_kwarg(key, f): + return value + try: + if os.environ.get("hera__script_annotations", None) is None: + return json.loads(value) + + type_ = _get_unannotated_type(key, f) + loaded_json_value = json.loads(value) + + if not type_: + return loaded_json_value + + _pydantic_mode = int(os.environ.get("hera__pydantic_mode", _PYDANTIC_VERSION)) + if _pydantic_mode == 1: + return parse_obj_as(type_, loaded_json_value) + else: + return TypeAdapter(type_).validate_python(loaded_json_value) + except (json.JSONDecodeError, TypeError): + return value + + +def _get_type(type_: type) -> type: + if get_origin(type_) is None: + return type_ + origin_type = cast(type, get_origin(type_)) + if origin_type is Annotated: + return get_args(type_)[0] + return origin_type + + +def _get_unannotated_type(key: str, f: Callable) -> Optional[type]: + """Get the type of function param without the 'Annotated' outer type.""" + type_ = inspect.signature(f).parameters[key].annotation + if type_ is inspect.Parameter.empty: + return None + if get_origin(type_) is None: + return type_ + + origin_type = cast(type, get_origin(type_)) + if origin_type is Annotated: + return get_args(type_)[0] + + # Type could be a dict/list with subscript type + return type_ + + +def _is_str_kwarg_of(key: str, f: Callable) -> bool: + """Check if param `key` of function `f` has a type annotation of a subclass of str.""" + func_param_annotation = inspect.signature(f).parameters[key].annotation + if func_param_annotation is inspect.Parameter.empty: + return False + + type_ = _get_type(func_param_annotation) + return issubclass(type_, str) + + +def _is_artifact_loaded(key: str, f: Callable) -> bool: + """Check if param `key` of function `f` is actually an Artifact that has already been loaded.""" + param = inspect.signature(f).parameters[key] + return ( + get_origin(param.annotation) is Annotated + and isinstance(get_args(param.annotation)[1], 
Artifact) + and get_args(param.annotation)[1].loader == ArtifactLoader.json.value + ) + + +def _is_output_kwarg(key: str, f: Callable) -> bool: + """Check if param `key` of function `f` is an output Artifact/Parameter.""" + param = inspect.signature(f).parameters[key] + return ( + get_origin(param.annotation) is Annotated + and isinstance(get_args(param.annotation)[1], (Artifact, Parameter)) + and get_args(param.annotation)[1].output + ) + + +def _runner(entrypoint: str, kwargs_list: List) -> Any: + """Run the function defined by the entrypoint with the given list of kwargs. + + Args: + entrypoint: The module path to the script within the container to execute. "package.submodule:function" + kwargs_list: A list of dicts with "name" and "value" keys, representing the kwargs of the function. + + Returns: + The result of the function or `None` if the outputs are to be saved. + """ + # import the module and get the function + module, function_name = entrypoint.split(":") + function: Callable = getattr(importlib.import_module(module), function_name) + # if the function is wrapped, unwrap it + # this may happen if the function is decorated with @script + if hasattr(function, "wrapped_function"): + function = function.wrapped_function + # convert the kwargs list to a dict + kwargs: Dict[str, str] = {} + for kwarg in kwargs_list: + if "name" not in kwarg or "value" not in kwarg: + continue + # sanitize the key for python + key = cast(str, serialize(kwarg["name"])) + value = kwarg["value"] + kwargs[key] = value + + if os.environ.get("hera__script_annotations", None) is None: + # Do a simple replacement for hyphens to get valid Python parameter names. + kwargs = {key.replace("-", "_"): value for key, value in kwargs.items()} + else: + kwargs = _map_argo_inputs_to_function(function, kwargs) + + # The imported validate_arguments uses smart union by default just in case clients do not rely on it. 
This means that if a function uses a union + # type for any of its inputs, then this will at least try to map those types correctly if the input object is + # not a pydantic model with smart_union enabled + _pydantic_mode = int(os.environ.get("hera__pydantic_mode", _PYDANTIC_VERSION)) + if _pydantic_mode == 2: + from pydantic import validate_call # type: ignore + + function = validate_call(function) + else: + if _PYDANTIC_VERSION == 1: + from pydantic import validate_arguments + else: + from pydantic.v1 import validate_arguments # type: ignore + function = validate_arguments(function, config=dict(smart_union=True, arbitrary_types_allowed=True)) # type: ignore + + function = _ignore_unmatched_kwargs(function) + + if os.environ.get("hera__script_annotations", None) is not None: + output_annotations = _extract_return_annotation_output(function) + + if output_annotations: + # This will save outputs returned from the function only. Any function parameters/artifacts marked as + # outputs should be written to within the function itself. + try: + output = _save_annotated_return_outputs(function(**kwargs), output_annotations) + except Exception as e: + _save_dummy_outputs(output_annotations) + raise e + return output or None + + return function(**kwargs) + + +def _parse_args() -> argparse.Namespace: + """Creates an argparse for the runner function. + + The returned argparse takes a module and function name as flags and a path to a json file as an argument. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--entrypoint", "-e", type=str, required=True) + parser.add_argument("args_path", type=Path) + return parser.parse_args() + + +def _run() -> None: + """Runs a function from a specific path using parsed arguments from Argo. + + Note that this prints the result of the function to stdout, which is the normal mode of operation for Argo. Any + output of a Python function submitted via a `Script.source` field results in outputs sent to stdout. 
+ """ + args = _parse_args() + # 1. Protect against trying to json.loads on empty files with inner `or r"[]` + # 2. Protect against files containing `null` as text with outer `or []` (as a result of using + # `{{inputs.parameters}}` where the parameters key doesn't exist in `inputs`) + kwargs_list = json.loads(args.args_path.read_text() or r"[]") or [] + assert isinstance(kwargs_list, List) + result = _runner(args.entrypoint, kwargs_list) + if not result: + return + + if isinstance(result, (RunnerOutputV1, RunnerOutputV2)): + print(serialize(result.result)) + exit(result.exit_code) + + print(serialize(result)) diff --git a/src/hera/workflows/runner.py b/src/hera/workflows/runner.py index b61fbd310..af2bcb02e 100644 --- a/src/hera/workflows/runner.py +++ b/src/hera/workflows/runner.py @@ -1,493 +1,6 @@ -"""The runner module contains the functionality required for the script runner.""" -import argparse -import functools -import importlib -import inspect -import json -import os -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union, cast - -from hera.shared._pydantic import _PYDANTIC_VERSION -from hera.shared.serialization import serialize -from hera.workflows import Artifact, Parameter -from hera.workflows.artifact import ArtifactLoader -from hera.workflows.io.v1 import ( - RunnerInput as RunnerInputV1, - RunnerOutput as RunnerOutputV1, -) - -try: - from hera.workflows.io.v2 import ( # type: ignore - RunnerInput as RunnerInputV2, - RunnerOutput as RunnerOutputV2, - ) -except ImportError: - from hera.workflows.io.v1 import ( # type: ignore - RunnerInput as RunnerInputV2, - RunnerOutput as RunnerOutputV2, - ) -from hera.workflows.script import _extract_return_annotation_output - -try: - from typing import Annotated, get_args, get_origin # type: ignore -except ImportError: - from typing_extensions import Annotated, get_args, get_origin # type: ignore - -try: - from pydantic.type_adapter import TypeAdapter # type: 
ignore - from pydantic.v1 import parse_obj_as # type: ignore -except ImportError: - from pydantic import parse_obj_as - - -def _ignore_unmatched_kwargs(f): - """Make function ignore unmatched kwargs. - - If the function already has the catch all **kwargs, do nothing. - """ - if _contains_var_kwarg(f): - return f - - @functools.wraps(f) - def inner(**kwargs): - # filter out kwargs that are not part of the function signature - # and transform them to the correct type - filtered_kwargs = {key: _parse(value, key, f) for key, value in kwargs.items() if _is_kwarg_of(key, f)} - return f(**filtered_kwargs) - - return inner - - -def _contains_var_kwarg(f: Callable) -> bool: - """Tells whether the given callable contains a keyword argument.""" - return any(param.kind == inspect.Parameter.VAR_KEYWORD for param in inspect.signature(f).parameters.values()) - - -def _is_kwarg_of(key: str, f: Callable) -> bool: - """Tells whether the given `key` identifies a keyword argument of the given callable.""" - param = inspect.signature(f).parameters.get(key) - return param is not None and ( - param.kind is inspect.Parameter.KEYWORD_ONLY or param.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD - ) - - -def _parse(value, key, f): - """Parse a value to the correct type. - - Args: - value: The value to parse. - key: The name of the kwarg. - f: The function to parse the value for. - - Returns: - The parsed value. 
- - """ - if _is_str_kwarg_of(key, f) or _is_artifact_loaded(key, f) or _is_output_kwarg(key, f): - return value - try: - if os.environ.get("hera__script_annotations", None) is None: - return json.loads(value) - - type_ = _get_unannotated_type(key, f) - loaded_json_value = json.loads(value) - - if not type_: - return loaded_json_value - - _pydantic_mode = int(os.environ.get("hera__pydantic_mode", _PYDANTIC_VERSION)) - if _pydantic_mode == 1: - return parse_obj_as(type_, loaded_json_value) - else: - return TypeAdapter(type_).validate_python(loaded_json_value) - except (json.JSONDecodeError, TypeError): - return value - - -def _get_type(key: str, f: Callable) -> Optional[type]: - type_ = inspect.signature(f).parameters[key].annotation - if type_ is inspect.Parameter.empty: - return None - if get_origin(type_) is None: - return type_ - origin_type = cast(type, get_origin(type_)) - if origin_type is Annotated: - return get_args(type_)[0] - return origin_type - - -def _get_unannotated_type(key: str, f: Callable) -> Optional[type]: - """Get the type of function param without the 'Annotated' outer type.""" - type_ = inspect.signature(f).parameters[key].annotation - if type_ is inspect.Parameter.empty: - return None - if get_origin(type_) is None: - return type_ - - origin_type = cast(type, get_origin(type_)) - if origin_type is Annotated: - return get_args(type_)[0] - - # Type could be a dict/list with subscript type - return type_ - - -def _is_str_kwarg_of(key: str, f: Callable) -> bool: - """Check if param `key` of function `f` has a type annotation of a subclass of str.""" - type_ = _get_type(key, f) - if type_ is None: - return False - return issubclass(type_, str) - - -def _is_artifact_loaded(key, f): - """Check if param `key` of function `f` is actually an Artifact that has already been loaded.""" - param = inspect.signature(f).parameters[key] - return ( - get_origin(param.annotation) is Annotated - and isinstance(get_args(param.annotation)[1], Artifact) - and 
get_args(param.annotation)[1].loader == ArtifactLoader.json.value - ) - - -def _is_output_kwarg(key, f): - """Check if param `key` of function `f` is an output Artifact/Parameter.""" - param = inspect.signature(f).parameters[key] - return ( - get_origin(param.annotation) is Annotated - and isinstance(get_args(param.annotation)[1], (Artifact, Parameter)) - and get_args(param.annotation)[1].output - ) - - -def _map_argo_inputs_to_function(function: Callable, kwargs: Dict) -> Dict: - """Map kwargs from Argo to the function parameters using the function's parameter annotations. - - For Parameter inputs: - * if the Parameter has a "name", replace it with the function parameter name - * otherwise use the function parameter name as-is - For Parameter outputs: - * update value to a Path object from the value_from.path value, or the default if not provided - - For Artifact inputs: - * load the Artifact according to the given ArtifactLoader - For Artifact outputs: - * update value to a Path object - """ - mapped_kwargs: Dict[str, Any] = {} - - def map_annotated_param(param_name: str, param_annotation: Parameter) -> None: - if param_annotation.output: - if param_annotation.value_from and param_annotation.value_from.path: - mapped_kwargs[param_name] = Path(param_annotation.value_from.path) - else: - mapped_kwargs[param_name] = _get_outputs_path(param_annotation) - # Automatically create the parent directory (if required) - mapped_kwargs[param_name].parent.mkdir(parents=True, exist_ok=True) - elif param_annotation.name: - mapped_kwargs[param_name] = kwargs[param_annotation.name] - else: - mapped_kwargs[param_name] = kwargs[param_name] - - def map_annotated_artifact(param_name: str, artifact_annotation: Artifact) -> None: - if artifact_annotation.output: - if artifact_annotation.path: - mapped_kwargs[param_name] = Path(artifact_annotation.path) - else: - mapped_kwargs[param_name] = _get_outputs_path(artifact_annotation) - # Automatically create the parent directory (if required) 
- mapped_kwargs[param_name].parent.mkdir(parents=True, exist_ok=True) - else: - if not artifact_annotation.path: - # Path was added to yaml automatically, we need to add it back in for the runner - artifact_annotation.path = artifact_annotation._get_default_inputs_path() - - if artifact_annotation.loader == ArtifactLoader.json.value: - path = Path(artifact_annotation.path) - mapped_kwargs[param_name] = json.load(path.open()) - elif artifact_annotation.loader == ArtifactLoader.file.value: - path = Path(artifact_annotation.path) - mapped_kwargs[param_name] = path.read_text() - elif artifact_annotation.loader is None: - mapped_kwargs[param_name] = artifact_annotation.path - - T = TypeVar("T", bound=Union[RunnerInputV1, RunnerInputV2]) - - def map_runner_input(param_name: str, runner_input_class: T): - """Map argo input kwargs to the fields of the given RunnerInput. - - If the field is annotated, we look for the kwarg with the name from the annotation (Parameter or Artifact). - Otherwise, we look for the kwarg with the name of the field. 
- """ - input_model_obj = {} - - def map_field(field: str) -> Optional[str]: - annotation = runner_input_class.__annotations__[field] - if get_origin(annotation) is Annotated: - annotation = get_args(annotation)[1] - if isinstance(annotation, Parameter): - map_annotated_param(field, annotation) - mapped_kwargs[field] = json.loads(mapped_kwargs[field]) - elif isinstance(annotation, Artifact): - map_annotated_artifact(field, annotation) - else: - mapped_kwargs[field] = json.loads(kwargs[field]) - - return field - - for field in runner_input_class.__fields__: - matched_field = map_field(field) - if matched_field: - input_model_obj[field] = mapped_kwargs[matched_field] - - mapped_kwargs[param_name] = runner_input_class.parse_raw(json.dumps(input_model_obj)) - - for param_name, func_param in inspect.signature(function).parameters.items(): - if get_origin(func_param.annotation) is Annotated: - func_param_annotation = get_args(func_param.annotation)[1] - - if isinstance(func_param_annotation, Parameter): - map_annotated_param(param_name, func_param_annotation) - elif isinstance(func_param_annotation, Artifact): - map_annotated_artifact(param_name, func_param_annotation) - else: - mapped_kwargs[param_name] = kwargs[param_name] - elif get_origin(func_param.annotation) is None and issubclass( - func_param.annotation, (RunnerInputV1, RunnerInputV2) - ): - map_runner_input(param_name, func_param.annotation) - else: - mapped_kwargs[param_name] = kwargs[param_name] - return mapped_kwargs - - -def _save_annotated_return_outputs( - function_outputs: Union[Tuple[Any], Any], - output_annotations: List[ - Union[Tuple[type, Union[Parameter, Artifact]], Union[Type[RunnerOutputV1], Type[RunnerOutputV2]]] - ], -) -> Optional[Union[RunnerOutputV1, RunnerOutputV2]]: - """Save the outputs of the function to the specified output destinations. 
- - The output values are matched with the output annotations and saved using the schema: - /artifacts/ - /parameters/ - If the artifact path or parameter value_from.path is specified, that is used instead. - can be provided by the user or is set to /tmp/hera-outputs by default - """ - if not isinstance(function_outputs, tuple): - function_outputs = [function_outputs] - if len(function_outputs) != len(output_annotations): - raise ValueError("The number of outputs does not match the annotation") - - if os.environ.get("hera__script_pydantic_io", None) is not None: - return_obj = None - - for output_value, dest in zip(function_outputs, output_annotations): - if isinstance(output_value, (RunnerOutputV1, RunnerOutputV2)): - if os.environ.get("hera__script_pydantic_io", None) is None: - raise ValueError("hera__script_pydantic_io environment variable is not set") - - return_obj = output_value - - for field, value in output_value.dict().items(): - if field in {"exit_code", "result"}: - continue - - matching_output = output_value._get_output(field) - path = _get_outputs_path(matching_output) - _write_to_path(path, value) - else: - assert isinstance(dest, tuple) - if get_origin(dest[0]) is None: - # Built-in types return None from get_origin, so we can check isinstance directly - if not isinstance(output_value, dest[0]): - raise ValueError( - f"The type of output `{dest[1].name}`, `{type(output_value)}` does not match the annotated type `{dest[0]}`" - ) - else: - # Here, we know get_origin is not None, but its return type is found to be `Optional[Any]` - origin_type = cast(type, get_origin(dest[0])) - if not isinstance(output_value, origin_type): - raise ValueError( - f"The type of output `{dest[1].name}`, `{type(output_value)}` does not match the annotated type `{dest[0]}`" - ) - - if not dest[1].name: - raise ValueError("The name was not provided for one of the outputs.") - - path = _get_outputs_path(dest[1]) - _write_to_path(path, output_value) - - if 
os.environ.get("hera__script_pydantic_io", None) is not None: - return return_obj - - return None - - -def _save_dummy_outputs( - output_annotations: List[ - Union[Tuple[type, Union[Parameter, Artifact]], Union[Type[RunnerOutputV1], Type[RunnerOutputV2]]] - ], -) -> None: - """Save dummy values into the outputs specified. - - This function is used at runtime by the Hera Runner to create files in the container so that Argo - does not log confusing error messages that obfuscate the real error, which look like: - ``` - msg="cannot save parameter /tmp/hera-outputs/parameters/my-parameter" - argo=true - error="open /tmp/hera-outputs/parameters/my-parameter: no such file or directory"` - ``` - - The output annotations are used to write files using the schema: - /artifacts/ - /parameters/ - If the artifact path or parameter value_from.path is specified, that is used instead. - can be provided by the user or is set to /tmp/hera-outputs by default - """ - for dest in output_annotations: - if isinstance(dest, (RunnerOutputV1, RunnerOutputV2)): - if os.environ.get("hera__script_pydantic_io", None) is None: - raise ValueError("hera__script_pydantic_io environment variable is not set") - - for field, _ in dest.__fields__: - if field in {"exit_code", "result"}: - continue - - annotation = dest._get_output(field) - path = _get_outputs_path(annotation) - _write_to_path(path, "") - else: - assert isinstance(dest, tuple) - if not dest[1].name: - raise ValueError("The name was not provided for one of the outputs.") - - path = _get_outputs_path(dest[1]) - _write_to_path(path, "") - - -def _get_outputs_path(destination: Union[Parameter, Artifact]) -> Path: - """Get the path from the destination annotation using the defined outputs directory.""" - path = Path(os.environ.get("hera__outputs_directory", "/tmp/hera-outputs")) - if isinstance(destination, Parameter) and destination.name: - path = path / f"parameters/{destination.name}" - elif isinstance(destination, Artifact): - if 
destination.path: - path = Path(destination.path) - elif destination.name: - path = path / f"artifacts/{destination.name}" - return path - - -def _write_to_path(path: Path, output_value: Any) -> None: - """Write the output_value as serialized text to the provided path. Create the necessary parent directories.""" - output_string = serialize(output_value) - if output_string is not None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(output_string) - - -def _runner(entrypoint: str, kwargs_list: List) -> Any: - """Run the function defined by the entrypoint with the given list of kwargs. - - Args: - entrypoint: The module path to the script within the container to execute. "package.submodule:function" - kwargs_list: A list of dicts with "name" and "value" keys, representing the kwargs of the function. - - Returns: - The result of the function or `None` if the outputs are to be saved. - """ - # import the module and get the function - module, function_name = entrypoint.split(":") - function = getattr(importlib.import_module(module), function_name) - # if the function is wrapped, unwrap it - # this may happen if the function is decorated with @script - if hasattr(function, "wrapped_function"): - function = function.wrapped_function - # convert the kwargs list to a dict - kwargs = {} - for kwarg in kwargs_list: - if "name" not in kwarg or "value" not in kwarg: - continue - # sanitize the key for python - key = cast(str, serialize(kwarg["name"])) - value = kwarg["value"] - kwargs[key] = value - - if os.environ.get("hera__script_annotations", None) is None: - # Do a simple replacement for hyphens to get valid Python parameter names. - kwargs = {key.replace("-", "_"): value for key, value in kwargs.items()} - else: - kwargs = _map_argo_inputs_to_function(function, kwargs) - - # The imported validate_arguments uses smart union by default just in case clients do not rely on it. 
This means that if a function uses a union - # type for any of its inputs, then this will at least try to map those types correctly if the input object is - # not a pydantic model with smart_union enabled - _pydantic_mode = int(os.environ.get("hera__pydantic_mode", _PYDANTIC_VERSION)) - if _pydantic_mode == 2: - from pydantic import validate_call # type: ignore - - function = validate_call(function) - else: - if _PYDANTIC_VERSION == 1: - from pydantic import validate_arguments - else: - from pydantic.v1 import validate_arguments # type: ignore - function = validate_arguments(function, config=dict(smart_union=True, arbitrary_types_allowed=True)) # type: ignore - - function = _ignore_unmatched_kwargs(function) - - if os.environ.get("hera__script_annotations", None) is not None: - output_annotations = _extract_return_annotation_output(function) - - if output_annotations: - # This will save outputs returned from the function only. Any function parameters/artifacts marked as - # outputs should be written to within the function itself. - try: - output = _save_annotated_return_outputs(function(**kwargs), output_annotations) - except Exception as e: - _save_dummy_outputs(output_annotations) - raise e - return output or None - - return function(**kwargs) - - -def _parse_args(): - """Creates an argparse for the runner function. - - The returned argparse takes a module and function name as flags and a path to a json file as an argument. - """ - parser = argparse.ArgumentParser() - parser.add_argument("--entrypoint", "-e", type=str, required=True) - parser.add_argument("args_path", type=Path) - return parser.parse_args() - - -def _run(): - """Runs a function from a specific path using parsed arguments from Argo. - - Note that this prints the result of the function to stdout, which is the normal mode of operation for Argo. Any - output of a Python function submitted via a `Script.source` field results in outputs sent to stdout. - """ - args = _parse_args() - # 1. 
Protect against trying to json.loads on empty files with inner `or r"[]` - # 2. Protect against files containing `null` as text with outer `or []` (as a result of using - # `{{inputs.parameters}}` where the parameters key doesn't exist in `inputs`) - kwargs_list = json.loads(args.args_path.read_text() or r"[]") or [] - assert isinstance(kwargs_list, List) - result = _runner(args.entrypoint, kwargs_list) - if not result: - return - - if isinstance(result, (RunnerOutputV1, RunnerOutputV2)): - print(serialize(result.result)) - exit(result.exit_code) - - print(serialize(result)) +"""The entrypoint for the Hera Runner when running on Argo.""" +from hera.workflows._runner.util import _run if __name__ == "__main__": _run() diff --git a/tests/test_runner.py b/tests/test_runner.py index a3de1fe31..ef79eb916 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -15,11 +15,11 @@ from pydantic import ValidationError import tests.helper as test_module -from hera.shared import GlobalConfig +from hera.shared._global_config import _GlobalConfig from hera.shared._pydantic import _PYDANTIC_VERSION from hera.shared.serialization import serialize +from hera.workflows._runner.util import _run, _runner from hera.workflows.io.v1 import RunnerOutput -from hera.workflows.runner import _run, _runner from hera.workflows.script import RunnerScriptConstructor @@ -86,7 +86,7 @@ def test_parameter_loading( entrypoint, kwargs_list: List[Dict[str, str]], expected_output, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, ): # GIVEN @@ -138,7 +138,7 @@ def test_runner_parameter_inputs( entrypoint, kwargs_list: List[Dict[str, str]], expected_output, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, ): # GIVEN @@ -194,7 +194,7 @@ def test_runner_annotated_parameter_inputs( entrypoint, kwargs_list: List[Dict[str, str]], expected_output, - global_config_fixture: 
GlobalConfig, + global_config_fixture: _GlobalConfig, pydantic_mode, environ_annotations_fixture: None, monkeypatch, @@ -302,7 +302,7 @@ def test_script_annotations_outputs( function_name, kwargs_list: List[Dict[str, str]], expected_files: List[Dict[str, str]], - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, tmp_path: Path, monkeypatch, @@ -353,7 +353,7 @@ def test_script_raising_error_still_outputs( function_name, expected_error: type, expected_files: List[Dict[str, str]], - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, tmp_path: Path, monkeypatch, @@ -415,7 +415,7 @@ def test_script_annotations_outputs_exceptions( function_name, kwargs_list: List[Dict[str, str]], exception, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, ): """Test that the output annotations throw the expected exceptions.""" @@ -488,7 +488,7 @@ def test_script_annotations_artifact_inputs( expected_output, tmp_path: Path, monkeypatch, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, ): """Test that the input artifact annotations are parsed correctly and the loaders behave as intended.""" # GIVEN @@ -515,7 +515,7 @@ def test_script_annotations_artifact_inputs( def test_script_annotations_artifact_input_loader_error( - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, ): """Test that the input artifact loaded with wrong type throws the expected exception.""" @@ -553,7 +553,7 @@ def test_script_annotations_artifacts_no_path( expected_output, tmp_path: Path, monkeypatch, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, ): """Test that the input artifact annotations are parsed correctly and the loaders behave as intended.""" # GIVEN @@ -575,7 +575,7 @@ def 
test_script_annotations_artifacts_no_path( def test_script_annotations_artifacts_wrong_loader( - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, ): """Test that the input artifact annotation with no loader throws an exception.""" # GIVEN @@ -592,7 +592,7 @@ def test_script_annotations_artifacts_wrong_loader( assert "value is not a valid enumeration member" in str(e.value) -def test_script_annotations_unknown_type(global_config_fixture: GlobalConfig): +def test_script_annotations_unknown_type(global_config_fixture: _GlobalConfig): # GIVEN expected_output = "a string" entrypoint = "tests.script_runner.unknown_annotation_types:unknown_annotations_ignored" @@ -614,8 +614,8 @@ def test_script_annotations_unknown_type(global_config_fixture: GlobalConfig): [{"name": "a_number", "value": 123}], ], ) -@patch("hera.workflows.runner._runner") -@patch("hera.workflows.runner._parse_args") +@patch("hera.workflows._runner.util._runner") +@patch("hera.workflows._runner.util._parse_args") def test_run(mock_parse_args, mock_runner, kwargs_list, tmp_path: Path): # GIVEN file_path = Path(tmp_path / "test_params") @@ -633,8 +633,8 @@ def test_run(mock_parse_args, mock_runner, kwargs_list, tmp_path: Path): mock_runner.assert_called_once_with("my_entrypoint", kwargs_list) -@patch("hera.workflows.runner._runner") -@patch("hera.workflows.runner._parse_args") +@patch("hera.workflows._runner.util._runner") +@patch("hera.workflows._runner.util._parse_args") def test_run_empty_file(mock_parse_args, mock_runner, tmp_path: Path): # GIVEN file_path = Path(tmp_path / "test_params") @@ -652,8 +652,8 @@ def test_run_empty_file(mock_parse_args, mock_runner, tmp_path: Path): mock_runner.assert_called_once_with("my_entrypoint", []) -@patch("hera.workflows.runner._runner") -@patch("hera.workflows.runner._parse_args") +@patch("hera.workflows._runner.util._runner") +@patch("hera.workflows._runner.util._parse_args") def test_run_null_string(mock_parse_args, mock_runner, 
tmp_path: Path): # GIVEN file_path = Path(tmp_path / "test_params") @@ -700,7 +700,7 @@ def test_runner_pydantic_inputs_params( entrypoint, kwargs_list: List[Dict[str, str]], expected_output, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, pydantic_mode, environ_annotations_fixture: None, monkeypatch, @@ -740,7 +740,7 @@ def test_runner_pydantic_output_params( entrypoint, expected_files, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, monkeypatch, tmp_path: Path, @@ -789,7 +789,7 @@ def test_runner_pydantic_input_artifacts( input_files: Dict, expected_output, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, monkeypatch, tmp_path: Path, @@ -843,7 +843,7 @@ def test_runner_pydantic_output_artifacts( input_files: Dict, expected_files, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, monkeypatch, tmp_path: Path, @@ -894,7 +894,7 @@ def test_runner_pydantic_output_with_exit_code( entrypoint, expected_files, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, monkeypatch, tmp_path: Path, @@ -935,13 +935,13 @@ def test_runner_pydantic_output_with_exit_code( ), ], ) -@patch("hera.workflows.runner._parse_args") +@patch("hera.workflows._runner.util._parse_args") def test_run_pydantic_output_with_exit_code( mock_parse_args, entrypoint, expected_files, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, monkeypatch, tmp_path: Path, @@ -995,7 +995,7 @@ def test_runner_pydantic_output_with_result( expected_files, expected_result, pydantic_mode, - global_config_fixture: GlobalConfig, + global_config_fixture: _GlobalConfig, environ_annotations_fixture: None, 
monkeypatch, tmp_path: Path, diff --git a/tests/test_unit/test_script_annotations_util.py b/tests/test_unit/test_script_annotations_util.py new file mode 100644 index 000000000..991f5145d --- /dev/null +++ b/tests/test_unit/test_script_annotations_util.py @@ -0,0 +1,176 @@ +import json +import os +from pathlib import Path +from typing import Any, Union + +import pytest + +from hera.workflows._runner.script_annotations_util import ( + _get_outputs_path, + get_annotated_artifact_value, + get_annotated_param_value, + map_runner_input, +) +from hera.workflows.artifact import Artifact, ArtifactLoader +from hera.workflows.io import RunnerInput +from hera.workflows.models import ValueFrom +from hera.workflows.parameter import Parameter + + +@pytest.mark.parametrize( + "destination,expected_path", + [ + (Parameter(name="a-param"), Path("/tmp/hera-outputs/parameters/a-param")), + (Artifact(name="an-artifact"), Path("/tmp/hera-outputs/artifacts/an-artifact")), + (Artifact(name="artifact-custom-path", path="/tmp/my-path"), Path("/tmp/my-path")), + ], +) +def test_get_outputs_path(destination: Union[Parameter, Artifact], expected_path: Path): + os.environ["hera__outputs_directory"] = "/tmp/hera-outputs" + assert _get_outputs_path(destination) == expected_path + + +@pytest.mark.parametrize( + "func_param_name,param_annotation,kwargs,expected_value", + [ + pytest.param( + "func_param_name_only", + Parameter(description="use func param name"), + {"func_param_name_only": "value"}, + "value", + id="parameter-annotation-has-no-name", + ), + pytest.param( + "dummy_name", + Parameter(name="input-param-test"), + {"input-param-test": "value"}, + "value", + id="use-parameter-name", + ), + pytest.param( + "func_param_name", + Parameter(name="input-param-test"), + {"func_param_name": "value"}, + "value", + id="user-passes-func-param-name-instead-of-annotation", + ), + pytest.param( + "dummy_name", + Parameter(name="output-default-path-test", output=True), + {}, + 
Path("/tmp/hera-outputs/parameters/output-default-path-test"), + id="output-parameter-with-default", + ), + pytest.param( + "dummy_name", + Parameter(name="output-path-test", value_from=ValueFrom(path="/tmp/test"), output=True), + {}, + Path("/tmp/test"), + id="output-parameter-with-custom-path", + ), + ], +) +def test_get_annotated_param_value( + func_param_name, + param_annotation, + kwargs, + expected_value, +): + assert get_annotated_param_value(func_param_name, param_annotation, kwargs) == expected_value + + +def test_get_annotated_param_value_error(): + with pytest.raises(RuntimeError, match="my_func_param was not given a value"): + get_annotated_param_value("my_func_param", Parameter(), {}) + + +def test_get_annotated_param_value_error_param_name(): + with pytest.raises(RuntimeError, match="my-func-param was not given a value"): + get_annotated_param_value("my_func_param", Parameter(name="my-func-param"), {}) + + +@pytest.mark.parametrize( + "file_contents,artifact,expected_return", + [ + pytest.param('{"json": "object"}', Artifact(loader=ArtifactLoader.json), {"json": "object"}, id="json-load"), + pytest.param('{"json": "object"}', Artifact(loader=ArtifactLoader.file), '{"json": "object"}', id="file-load"), + ], +) +def test_get_annotated_artifact_value_inputs_with_loaders( + file_contents: str, + artifact: Artifact, + expected_return: Any, + tmp_path: Path, +): + file_path = tmp_path / "contents.txt" + file_path.write_text(file_contents) + artifact.path = file_path + assert get_annotated_artifact_value(artifact) == expected_return + + +@pytest.mark.parametrize( + "file_contents,artifact", + [ + pytest.param('{"json": "object"}', Artifact(loader=None), id="file-load"), + ], +) +def test_get_annotated_artifact_value_path_inputs( + file_contents: str, + artifact: Artifact, + tmp_path: Path, +): + file_path = tmp_path / "contents.txt" + file_path.write_text(file_contents) + artifact.path = file_path + assert get_annotated_artifact_value(artifact) == file_path 
+ + +@pytest.mark.parametrize( + "artifact,expected_path", + [ + pytest.param(Artifact(path="/tmp/test.txt", output=True), "/tmp/test.txt", id="given-path"), + pytest.param( + Artifact(name="artifact-name", output=True), "/tmp/hera-outputs/artifacts/artifact-name", id="default-path" + ), + ], +) +def test_get_annotated_artifact_value_path_outputs( + artifact: Artifact, + expected_path: str, +): + assert get_annotated_artifact_value(artifact) == Path(expected_path) + + +def test_map_runner_input(): + class MyInput(RunnerInput): + a_str: str + an_int: int + a_dict: dict + a_list: list + + kwargs = { + "a_str": "hello", + "an_int": "123", + "a_dict": '{"a-key": "a-value"}', + "a_list": json.dumps([1, 2, 3]), + } + assert map_runner_input(MyInput, kwargs) == MyInput( + a_str="hello", + an_int=123, + a_dict={"a-key": "a-value"}, + a_list=[1, 2, 3], + ) + + +def test_map_runner_input_strings(): + """Test the parsing logic when str type fields are passed json-serialized strings.""" + + class MyInput(RunnerInput): + a_dict_str: str + a_list_str: str + + kwargs = {"a_dict_str": json.dumps({"key": "value"}), "a_list_str": json.dumps([1, 2, 3])} + assert map_runner_input(MyInput, kwargs) == MyInput( + a_dict_str=json.dumps({"key": "value"}), + a_list_str=json.dumps([1, 2, 3]), + )