Skip to content

Commit

Permalink
Merge pull request #35 from FAST-HEP/kreczko-issue-12
Browse files Browse the repository at this point in the history
Add PythonOperator
  • Loading branch information
kreczko authored Oct 15, 2024
2 parents 065f4a9 + 1415cb4 commit 4dd18d4
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/examples/hello_world.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ tasks:
kwargs:
bash_command: touch
arguments: ["/tmp/date.txt"]
- name: printPython
type: "fasthep_flow.operators.PythonOperator"
kwargs:
python_callable: print
arguments: ["Hello World!"]
4 changes: 3 additions & 1 deletion src/fasthep_flow/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Module for defining basic operators."""

from __future__ import annotations

from .base import Operator
from .bash import BashOperator, LocalBashOperator
from .py_call import PythonOperator

# only Operator is exposed to the user, everything else is imported directly by the workflow
__all__ = ["BashOperator", "LocalBashOperator", "Operator"]
__all__ = ["BashOperator", "LocalBashOperator", "Operator", "PythonOperator"]
21 changes: 21 additions & 0 deletions src/fasthep_flow/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Definition of the Operator protocol."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol


Expand All @@ -18,3 +20,22 @@ def __repr__(self) -> str:

def configure(self, **kwargs: Any) -> None:
"""General function to configure the operator."""


@dataclass
class ResultType:
"""The result type of an operator. Can add validation here if needed."""

result: Any
stdout: str
stderr: str
exit_code: int

def to_dict(self) -> dict[str, Any]:
"""Convert the ResultType to a dictionary."""
return {
"result": self.result,
"stdout": self.stdout,
"stderr": self.stderr,
"exit_code": self.exit_code,
}
6 changes: 4 additions & 2 deletions src/fasthep_flow/operators/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Any

from .base import Operator
from .base import Operator, ResultType

try:
# try to import plumbum
Expand Down Expand Up @@ -31,7 +31,9 @@ def configure(self, **kwargs: Any) -> None:
def __call__(self, **kwargs: Any) -> dict[str, Any]:
command = plumbum.local[self.bash_command]
exit_code, stdout, stderr = command.run(*self.arguments)
return {"stdout": stdout, "stderr": stderr, "exit_code": exit_code}
return ResultType(
result=None, stdout=stdout, stderr=stderr, exit_code=exit_code
).to_dict()

def __repr__(self) -> str:
return f'LocalBashOperator(bash_command="{self.bash_command}", arguments={self.arguments})'
Expand Down
40 changes: 40 additions & 0 deletions src/fasthep_flow/operators/py_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Python related operators."""

from __future__ import annotations

import io
from collections.abc import Callable
from contextlib import redirect_stderr, redirect_stdout
from typing import Any

from .base import Operator, ResultType


class PythonOperator(Operator):
"""A Python operator. This operator wraps a Python callable."""

python_callable: Callable[..., Any]
arguments: list[Any]

def __init__(self, **kwargs: Any):
self.configure(**kwargs)

def configure(self, **kwargs: Any) -> None:
"""Configure the operator."""
self.python_callable = kwargs.pop("python_callable")
self.arguments = kwargs.pop("arguments")

def __call__(self, **kwargs: Any) -> dict[str, Any]:
stdout, stderr = io.StringIO(), io.StringIO()
with redirect_stdout(stdout), redirect_stderr(stderr):
result = self.python_callable(*self.arguments)
result = self.python_callable(*self.arguments)
return ResultType(
result=result,
stdout=stdout.getvalue(),
stderr=stderr.getvalue(),
exit_code=0,
).to_dict()

def __repr__(self) -> str:
return f"PythonOperator(python_callable={self.python_callable}, arguments={self.arguments})"

0 comments on commit 4dd18d4

Please sign in to comment.