Skip to content

Commit

Permalink
Merge pull request #6 from Sage-Bionetworks-Workflows/bgrande/ORCA-13…
Browse files Browse the repository at this point in the history
…4/airflow-testing

[ORCA-134] Update "orca" based on Airflow testing
  • Loading branch information
Bruno Grande authored Feb 10, 2023
2 parents b283b43 + 0a0dcf1 commit d668790
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 44 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ repos:
- id: interrogate
exclude: ^(docs/conf.py|setup.py|tests)
args: [--config=pyproject.toml]

- repo: https://github.com/jendrikseipp/vulture
rev: 'v2.7'
hooks:
- id: vulture
24 changes: 18 additions & 6 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ verbose = true
fail-under = 95
ignore-module = true
ignore-nested-functions = true

[tool.vulture]
paths = ["src/orca/"]
min_confidence = 80
5 changes: 3 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ dev =
sphinx-autodoc-typehints~=1.21
interrogate~=1.5
jupyterlab~=3.6
vulture~=2.7

[options.entry_points]
# Add here console scripts like:
Expand All @@ -114,7 +115,7 @@ apache_airflow_provider =
# Comment those flags to avoid this pytest issue.
addopts =
--cov "orca" --cov-report "term-missing" --cov-report "xml"
-m "not slow and not integration"
-m "not slow and not integration and not acceptance"
--verbose
norecursedirs =
dist
Expand All @@ -130,7 +131,7 @@ testpaths =
markers =
slow: mark tests as slow (deselect with '-m "not slow"')
integration: mark tests that interact with external services
# acceptance: mark end-to-end acceptance tests
acceptance: mark end-to-end acceptance tests

[devpi:upload]
# Options for the devpi: PyPI server and packaging tool
Expand Down
43 changes: 35 additions & 8 deletions src/orca/services/sevenbridges/client_factory.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from __future__ import annotations

import os
from dataclasses import field
from functools import cached_property
from typing import Any, ClassVar, Optional, Type
from typing import TYPE_CHECKING, Any, ClassVar, Optional, Type

from airflow.models.connection import Connection
from pydantic.dataclasses import dataclass
from sevenbridges import Api
from sevenbridges.http.error_handlers import maintenance_sleeper, rate_limit_sleeper

from orca.errors import ClientArgsError, ClientRequestError

if TYPE_CHECKING:
from airflow.models.connection import Connection


@dataclass(kw_only=False)
class SevenBridgesClientFactory:
Expand Down Expand Up @@ -62,14 +66,14 @@ def __post_init_post_parse__(self) -> None:
self.update_client_kwargs()

@staticmethod
def map_connection(connection: Connection) -> dict[str, Any]:
"""Map Airflow connection fields to client arguments.
def parse_connection(connection: Connection) -> dict[str, Optional[str]]:
"""Map Airflow connection fields to keyword arguments.
Args:
connection: An Airflow connection object.
Returns:
A dictionary of client arguments.
Keyword arguments relevant to SevenBridges.
"""
api_endpoint = None
if connection.host:
Expand All @@ -80,9 +84,34 @@ def map_connection(connection: Connection) -> dict[str, Any]:
kwargs = {
"api_endpoint": api_endpoint,
"auth_token": connection.password,
"project": connection.extra_dejson.get("project"),
}
return kwargs

@classmethod
def connection_from_env(cls) -> Connection:
    """Build an Airflow connection from the configured environment variable.

    The variable name is taken from ``cls.CONNECTION_ENV``. When that
    variable is not set, ``None`` is passed as the connection URI.

    Returns:
        An Airflow connection created from the environment variable value.
    """
    # Deferred (function-scope) import, mirroring Airflow's own practice:
    # importing the connection model at module load adds noticeable overhead.
    from airflow.models.connection import Connection

    return Connection(uri=os.environ.get(cls.CONNECTION_ENV))

@classmethod
def kwargs_from_env(cls) -> dict[str, Optional[str]]:
    """Derive SevenBridges keyword arguments from the environment.

    Combines ``connection_from_env`` and ``parse_connection``: the
    connection built from the environment variable is mapped to
    keyword arguments.

    Returns:
        Keyword arguments relevant to SevenBridges.
    """
    return cls.parse_connection(cls.connection_from_env())

def resolve_credentials(self) -> None:
"""Resolve SevenBridges credentials based on priority.
Expand All @@ -93,9 +122,7 @@ def resolve_credentials(self) -> None:
return

# Get value from environment, which is confirmed to be available
env_connection_uri = os.environ[self.CONNECTION_ENV]
env_connection = Connection(uri=env_connection_uri)
env_kwargs = self.map_connection(env_connection)
env_kwargs = self.kwargs_from_env()

# Resolve single values for each client argument based on priority
self.api_endpoint = self.api_endpoint or env_kwargs["api_endpoint"]
Expand Down
51 changes: 43 additions & 8 deletions src/orca/services/sevenbridges/hook.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from sevenbridges import Api

from orca.errors import ClientArgsError
from orca.services.sevenbridges.client_factory import SevenBridgesClientFactory
from orca.services.sevenbridges.ops import SevenBridgesOps

if TYPE_CHECKING:
from airflow.models.connection import Connection


class SevenBridgesHook(BaseHook):
"""Wrapper around SevenBridges client and ops classes.
Expand All @@ -31,18 +39,45 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs):
extras = self.connection.extra_dejson
self.project = extras.get("project")

def get_conn(self) -> Api:
"""Retrieve authenticated SevenBridges client."""
return self.client
@classmethod
def get_connection(cls, conn_id: str) -> Connection:
    """Look up an Airflow connection, falling back to the environment.

    Args:
        conn_id: Airflow connection ID.

    Returns:
        The connection returned by the parent hook's lookup, or, when
        that lookup raises ``AirflowNotFoundException``, the connection
        built by ``SevenBridgesClientFactory.connection_from_env()``.
    """
    try:
        return super().get_connection(conn_id)
    except AirflowNotFoundException:
        # Airflow does not know this connection ID; use the
        # environment-derived connection instead.
        return SevenBridgesClientFactory.connection_from_env()

def get_conn(self) -> SevenBridgesOps:
    """Return the hook's SevenBridgesOps object.

    This simply exposes the ``ops`` attribute, which wraps an
    authenticated SevenBridges client.

    Returns:
        An authenticated SevenBridgesOps instance.
    """
    ops_instance = self.ops
    return ops_instance

@cached_property
def client(self) -> Api:
"""Retrieve authenticated SevenBridges client."""
client_args = SevenBridgesClientFactory.map_connection(self.connection)
factory = SevenBridgesClientFactory(**client_args)
return factory.get_client()
return self.ops.client

@cached_property
def ops(self) -> SevenBridgesOps:
    """An authenticated SevenBridgesOps instance.

    The keyword arguments are derived from ``self.connection`` using
    ``SevenBridgesClientFactory.parse_connection``. The endpoint and
    token are passed positionally to ``SevenBridgesOps.from_args`` and
    any remaining entries are forwarded as keyword arguments.

    Returns:
        The object returned by ``SevenBridgesOps.from_args``.

    Raises:
        ClientArgsError: If 'api_endpoint' or 'auth_token' is None.
    """
    kwargs = SevenBridgesClientFactory.parse_connection(self.connection)
    endpoint = kwargs.pop("api_endpoint")
    token = kwargs.pop("auth_token")
    if endpoint is None or token is None:
        # Never interpolate the token itself: exception messages end up in
        # logs and tracebacks, which would leak the secret whenever the
        # token is set but the endpoint is missing.
        token_state = "unset" if token is None else "<redacted>"
        message = (
            f"Unset 'api_endpoint' ({endpoint}) "
            f"or 'auth_token' ({token_state})"
        )
        raise ClientArgsError(message)
    return SevenBridgesOps.from_args(endpoint, token, **kwargs)
9 changes: 3 additions & 6 deletions src/orca/services/sevenbridges/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ class SevenBridgesOps:
project: Optional[str] = None

@classmethod
def from_creds(
def from_args(
cls,
api_endpoint: str,
auth_token: str,
project: Optional[str] = None,
**client_kwargs: Any,
) -> SevenBridgesOps:
"""Construct SevenBridgesOps from credentials.
"""Construct SevenBridgesOps from individual arguments.
Args:
api_endpoint: API base endpoint.
Expand All @@ -63,13 +62,11 @@ def from_creds(
project: An owner-prefixed SevenBridges project.
For example: <username>/<project-name>.
Defaults to None.
**client_kwargs: Additional keyword arguments that are passed
to the SevenBridges client during its construction.
Returns:
An authenticated SevenBridgesOps instance.
"""
factory = SevenBridgesClientFactory(api_endpoint, auth_token, client_kwargs)
factory = SevenBridgesClientFactory(api_endpoint, auth_token)
client = factory.get_client()
return SevenBridgesOps(client, project)

Expand Down
32 changes: 32 additions & 0 deletions tests/acceptance/test_cavatica.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest
from sevenbridges.models.enums import TaskStatus

from orca.services.sevenbridges import SevenBridgesHook


@pytest.mark.acceptance
def test_cavatica_launch_poc_v2(run_id):
    """Launch a task through the hook and check its reported status.

    The task is created with one hook instance and its status is read
    with another; the test passes when the status name is an attribute
    of ``TaskStatus``.
    """

    def create_task():
        """Create the workflow task and return its identifier."""
        hook = SevenBridgesHook()
        files = hook.client.files
        reads1 = files.get("63e569217a0654635c558c84")
        reads2 = files.get("63e5694ebfc712185ac37a27")
        task_inputs = {
            "input_type": "FASTQ",
            "reads1": reads1,
            "reads2": reads2,
            "runThreadN": 36,
            "wf_strand_param": "default",
            "sample_name": "HCC1187_1M",
            "rmats_read_length": 101,
            "outSAMattrRGline": "ID:HCC1187_1M\tLB:NA\tPL:Illumina\tSM:HCC1187_1M",
            "output_basename": run_id,
        }
        app_id = "orca-service/test-project/kfdrc-rnaseq-workflow"
        return hook.ops.create_task(run_id, app_id, task_inputs)

    def monitor_task(task_name):
        """Fetch the task status using a fresh hook instance."""
        return SevenBridgesHook().ops.get_task_status(task_name)

    created_task = create_task()
    status_result = monitor_task(created_task)
    # The status call returns a pair; only the first element is checked.
    task_status, _ = status_result
    assert hasattr(TaskStatus, str(task_status))
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
from datetime import datetime, timezone
from getpass import getuser
from uuid import uuid4

import pytest

# Components of a per-session run identifier shared by the tests.
UUID = str(uuid4())
USER = getuser()
# Use the real UTC clock: datetime.now() without a tz returns local time,
# which contradicts the name. tzinfo is stripped before formatting so the
# ISO string carries no "+00:00" offset ("+" is not a valid character),
# and ":" is replaced with "." for the same reason.
UTCTIME = (
    datetime.now(timezone.utc)
    .replace(tzinfo=None)
    .isoformat("T", "seconds")
    .replace(":", ".")
)
RUN_ID = f"{USER}--{UTCTIME}--{UUID}"  # Valid characters: [A-Za-z0-9._-]


@pytest.fixture
def run_id():
    """Provide the module-level ``RUN_ID`` string to tests."""
    identifier = RUN_ID
    return identifier


@pytest.fixture
def patch_os_environ(mocker):
Expand Down
Loading

0 comments on commit d668790

Please sign in to comment.