Skip to content

Commit

Permalink
Updated branching capabilities (#2)
Browse files Browse the repository at this point in the history
Supports nested branches.
Fixes some edge cases.

for example: 
```
assign > step_1 > step_2
step_2 > step_3
step_2 > step_4
step_1 > step_5
```
creates 
```
main:
  steps:
  - init:
      assign:
      - test: test
  - step_1:
      call: http.post
      args:
        url: ${"https://us-central1-" + args.project_id + ".cloudfunctions.net/step1"}
  - branch-2:
      parallel:
        branches:
        - step_2_branch:
            steps:
            - step_2:
                call: http.post
                args:
                  url: ${"https://us-central1-" + args.project_id + ".cloudfunctions.net/step2"}
                  body:
                    var: ${var1}
        - step_5_branch:
            steps:
            - step_5:
                call: http.post
                args:
                  url: ${"https://us-central1-" + args.project_id + ".cloudfunctions.net/step5"}
  - branch-1:
      parallel:
        branches:
        - step_3_branch:
            steps:
            - step_3:
                call: http.post
                args:
                  url: ${"https://us-central1-" + args.project_id + ".cloudfunctions.net/step3"}
        - step_4_branch:
            steps:
            - step_4:
                call: http.post
                args:
                  url: ${"https://us-central1-" + args.project_id + ".cloudfunctions.net/step4"}
```
  • Loading branch information
anovis authored Feb 3, 2023
1 parent cb5da48 commit d5e5ece
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 119 deletions.
4 changes: 2 additions & 2 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[flake8]
max-line-length = 88
ignore = E501, W503
ignore = E501, W503
#E501 line too long
#W503 line break before binary operator
; exclude = .git,__pycache__,docs
max-complexity = 15
extend-ignore = 203
extend-ignore = E203
20 changes: 20 additions & 0 deletions .github/stale.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Number of days of inactivity before an issue becomes stale
daysUntilStale: 60
# Number of days of inactivity before a stale issue is closed
daysUntilClose: 7
# Issues with these labels will never be considered stale
exemptLabels:
- pinned
- proposal
- enhancement
- bug
- documentation
# Label to use when marking an issue as stale
staleLabel: stale
# Comment to post when marking an issue as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Thank you
for your contributions.
# Comment to post when closing a stale issue. Set to `false` to disable
closeComment: false
56 changes: 56 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: "CI"

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
Lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install Linter
run: |
python -m pip install --upgrade pip
pip install flake8==6.0.0
- name: Lint Check
run: |
make lint
- uses: psf/black@stable
with:
src: "./goblet_workflows"
version: "23.1.0"

Test:
runs-on: ubuntu-latest
needs: Lint
strategy:
matrix:
python-version: ['3.10']
name: Test Python ${{ matrix.python-version }}
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install pytest & goblet_workflows
run: |
python -m pip install --upgrade pip
pip install pytest
pip install coverage
pip install requests-mock
pip install -r requirements.txt
- name: Run pytest
run: |
export PYTHONPATH=$(pwd)
coverage run -m pytest goblet_workflows/tests;
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v1
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

![PyPI](https://img.shields.io/pypi/v/goblet_workflows?color=blue&style=plastic)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/goblet_workflows?style=plastic)
![Tests](https://github.com/goblet/goblet_workflows/actions/workflows/main.yml/badge.svg)
[![codecov](https://codecov.io/gh/goblet/goblet_workflows/branch/main/graph/badge.svg?token=g8TL6Sc0P5)](https://codecov.io/gh/goblet/goblet_workflows)

Goblet Workflows is a wrapper around [GCP Workflows](https://cloud.google.com/workflows/docs/overview), which is a fully-managed orchestration platform that executes services in an order that you define: a workflow. These workflows can combine services including custom services hosted on Cloud Run or Cloud Functions, Google Cloud services such as Cloud Vision AI and BigQuery, and any HTTP-based API.

Expand Down
8 changes: 8 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
coverage:
status:
project:
default:
target: 80%
threshold: 1%
comment:
require_changes: true
8 changes: 8 additions & 0 deletions goblet_workflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
from goblet_workflows.workflow import Workflow # noqa: F401
from goblet_workflows.steps import ( # noqa: F401
Step, # noqa: F401
MultiStep, # noqa: F401
AssignStep, # noqa: F401
HttpStep, # noqa: F401
BQStep, # noqa: F401
) # noqa: F401
from goblet_workflows.controls import Branch, For # noqa: F401
2 changes: 1 addition & 1 deletion goblet_workflows/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION = (0, 1, 1)
VERSION = (0, 1, 2)


__version__ = ".".join(map(str, VERSION))
10 changes: 1 addition & 9 deletions goblet_workflows/cli.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import click
import os
import logging

# import subprocess
import json

# import requests
import sys

from goblet_workflows.utils import get_workflow
from goblet_workflows.client import get_default_project
from goblet_workflows.__version__ import __version__

logging.basicConfig()
Expand All @@ -35,8 +28,7 @@ def version():

@main.command()
@click.option("--schedule", "schedule", is_flag=True)
@click.option("-f","--file", "file", envvar="FILE")

@click.option("-f", "--file", "file", envvar="FILE")
def deploy(schedule, file):
"""
Deploy a workflow
Expand Down
84 changes: 84 additions & 0 deletions goblet_workflows/controls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from goblet_workflows.steps import Step
from goblet_workflows.exceptions import GobletWorkflowException


class Branch(Step):
def __init__(self, name: str, branches=[], shared=None, **kwargs) -> None:
self.name: str = name
self.kwargs = kwargs
self.shared = shared
self.branches = branches
if self.branches:
self.workflow = self.branches[0].workflow
self.workflow.register_step(self)

def __call__(self, branches):
self.branches = branches
if self.branches:
self.workflow = self.branches[0].workflow
self.workflow.register_step(self)
return self

def append(self, parent):
self.branches.append(parent)

def create_definition(self):
if not self.branches:
raise GobletWorkflowException("At least one branch step is required")
optional = {}
if self.shared:
optional["shared"] = self.shared

branches = []
for v in self.branches:
if isinstance(v, Step):
branches.append(
{f"{v.name}_branch": {"steps": [v.create_definition()]}}
)
if isinstance(v, list):
branches.append(
{
f"{v[0].name}_branch": {
"steps": [s.create_definition() for s in v]
}
}
)

definition = {self.name: {"parallel": {**optional, "branches": branches}}}
return definition


class For(Step):
def __init__(self, value: str, value_list: list, steps=[], **kwargs) -> None:
self.value: str = value
self.value_list: list = value_list
self.kwargs = kwargs
self.steps = steps
# TODO: Unique name for for element
self.name = "For"
if self.steps:
self.workflow = self.steps[0].workflow
self.workflow.register_step(self)

def __call__(self, steps):
if not steps:
raise GobletWorkflowException("At least one step is required")
self.steps = steps
if isinstance(self.steps, Step):
self.steps = [steps]
self.workflow = self.steps[0].workflow
self.workflow.register_step(self)
return self

def create_definition(self):
if not self.steps:
raise GobletWorkflowException("At least one step is required")

definition = {
"for": {
"value": self.value,
"in": self.value_list,
"steps": [s.create_definition() for s in self.steps],
}
}
return definition
97 changes: 16 additions & 81 deletions goblet_workflows/steps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from goblet_workflows.workflow import Workflow
from goblet_workflows.exceptions import GobletWorkflowException
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from goblet_workflows.workflow import Workflow


class Step:
Expand Down Expand Up @@ -63,6 +66,17 @@ def create_definition(self):


class HttpStep(Step):
def __init__(
self, workflow: Workflow, name: str, args: dict, call: str = "post", **kwargs
) -> None:
super().__init__(
workflow,
name,
call=call,
args=args,
**kwargs,
)

def create_definition(self):
optional = {}
if self.kwargs.get("result"):
Expand Down Expand Up @@ -129,85 +143,6 @@ def create_definition(self):
return definition


class Branch(Step):
def __init__(self, name: str, branches=[], shared=None, **kwargs) -> None:
self.name: str = name
self.kwargs = kwargs
self.shared = shared
self.branches = branches
if self.branches:
self.workflow = self.branches[0].workflow
self.workflow.register_step(self)

def __call__(self, branches):
self.branches = branches
if self.branches:
self.workflow = self.branches[0].workflow
self.workflow.register_step(self)
return self

def create_definition(self):
if not self.branches:
raise GobletWorkflowException("At least one branch step is required")
optional = {}
if self.shared:
optional["shared"] = self.shared

branches = []
for v in self.branches:
if isinstance(v, Step):
branches.append(
{f"{v.name}_branch": {"steps": [v.create_definition()]}}
)
if isinstance(v, list):
branches.append(
{
f"{v[0].name}_branch": {
"steps": [s.create_definition() for s in v]
}
}
)

definition = {self.name: {"parallel": {**optional, "branches": branches}}}
return definition


class For(Step):
def __init__(self, value: str, value_list: list, steps=[], **kwargs) -> None:
self.value: str = value
self.value_list: list = value_list
self.kwargs = kwargs
self.steps = steps
# TODO: Unique name for for element
self.name = "For"
if self.steps:
self.workflow = self.steps[0].workflow
self.workflow.register_step(self)

def __call__(self, steps):
if not steps:
raise GobletWorkflowException("At least one step is required")
self.steps = steps
if isinstance(self.steps, Step):
self.steps = [steps]
self.workflow = self.steps[0].workflow
self.workflow.register_step(self)
return self

def create_definition(self):
if not self.steps:
raise GobletWorkflowException("At least one step is required")

definition = {
"for": {
"value": self.value,
"in": self.value_list,
"steps": [s.create_definition() for s in self.steps],
}
}
return definition


class DataformSteps(MultiStep):
def __init__(self, workflow: Workflow, git_branch, **kwargs) -> None:
super().__init__(workflow, **kwargs)
Expand Down
Loading

0 comments on commit d5e5ece

Please sign in to comment.