Skip to content

Commit

Permalink
refactor: add install_dependencies to common_airflow
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Oct 24, 2023
1 parent 27874fe commit 7fcc492
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/airflow/dags/common_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def create_cluster(
)


def _submit_job(cluster_name, task_id, job_type, job_specification):
def submit_job(cluster_name, task_id, job_type, job_specification):
"""Submit an arbitrary job to a Dataproc cluster."""
return DataprocSubmitJobOperator(
task_id=task_id,
Expand All @@ -108,7 +108,7 @@ def submit_pyspark_job(cluster_name, task_id, python_module_path, args):
"""Submit a PySpark job to a Dataproc cluster."""
if isinstance(args, dict):
args = [f"--{arg}={val}" for arg, val in args.items()]
return _submit_job(
return submit_job(
cluster_name=cluster_name,
task_id=task_id,
job_type="pyspark_job",
Expand All @@ -126,11 +126,11 @@ def submit_pyspark_job(cluster_name, task_id, python_module_path, args):
)


def submit_pig_job(cluster_name, task_id):
"""Submit a Pig job to a Dataproc cluster."""
return _submit_job(
def install_dependencies(cluster_name):
"""Install dependencies on a Dataproc cluster."""
return submit_job(
cluster_name=cluster_name,
task_id=task_id,
task_id="install_dependencies",
job_type="pig_job",
job_specification={
"jar_file_uris": [
Expand Down
3 changes: 3 additions & 0 deletions src/airflow/dags/dag_genetics_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from common_airflow import (
create_cluster,
delete_cluster,
install_dependencies,
shared_dag_args,
shared_dag_kwargs,
submit_pyspark_job,
Expand All @@ -29,6 +30,7 @@
# Fail early with a readable message when the config file is absent, before
# it is opened below.
# NOTE(review): assert statements are stripped when Python runs with -O;
# confirm this check is never relied on in optimized runs.
assert (
SOURCE_CONFIG_FILE_PATH.exists()
), f"Config path {SOURCE_CONFIG_FILE_PATH} does not exist."

with open(SOURCE_CONFIG_FILE_PATH, "r") as config_file:
# Parse and define all steps and their prerequisites.
tasks = {}
Expand All @@ -54,6 +56,7 @@
# Construct the DAG with all tasks: create the cluster, install dependencies,
# run every configured step, then delete the cluster.
(
    create_cluster(CLUSTER_NAME)
    # install_dependencies is a factory function that returns the task; it must
    # be called, otherwise the bare function object is chained instead of a task.
    >> install_dependencies(CLUSTER_NAME)
    >> list(tasks.values())
    >> delete_cluster(CLUSTER_NAME)
)

0 comments on commit 7fcc492

Please sign in to comment.