Skip to content

Commit

Permalink
Update dags_airflow_fede_k8sexecutor.py
Browse files Browse the repository at this point in the history
  • Loading branch information
leggerf authored Jan 26, 2024
1 parent 4045f3b commit 0fdbbb9
Showing 1 changed file with 99 additions and 37 deletions.
136 changes: 99 additions & 37 deletions dags_airflow_fede_k8sexecutor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
"""
This is an example dag for using a Kubernetes Executor Configuration.
This is an example dag for using a Kubernetes Executor Configuration.
It starts the following tasks:
1) pod with annotation
2) pod with mounted volume
3) pod with sidecar and shared volume
4) pod with label
5) pod with other namespace
6) pod with image
7) pod with resource limits
"""
from __future__ import annotations

import logging
import os

import pendulum

from airflow.configuration import conf
Expand All @@ -15,55 +22,70 @@

log = logging.getLogger(__name__)

# Check k8s is there
try:
from kubernetes.client import models as k8s
except ImportError:
log.warning(
"The example_kubernetes_executor example DAG requires the kubernetes provider."
"This DAG requires the kubernetes provider."
" Please install it with: pip install apache-airflow[cncf.kubernetes]"
)
k8s = None


if k8s:
with DAG(
dag_id="example_kubernetes_executor",
dag_id="kubernetes_executor",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
tags=["fede"],
) as dag:
# You can use annotations on your kubernetes pods!
start_task_executor_config = {
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))

#############################################################
# Define config for task with pod annotation
#############################################################
executor_config_annotation = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(annotations={"test": "fede"})
)
}

@task(executor_config=start_task_executor_config)
def start_task():
#############################################################
# 1) task with pod annotation
#############################################################
@task(executor_config=executor_config_annotation)
def test_annotation():
print_stuff()

# [START task_with_volume]
annotation_task = test_annotation()

#############################################################
# Define config for task with volume, mount host /tmp/ to /foo/
#############################################################
executor_config_volume_mount = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
k8s.V1VolumeMount(mount_path="/foo/", name="test-volume")
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
name="test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
}

###########################################################
# 2) task with mount volume
###########################################################
@task(executor_config=executor_config_volume_mount)
def test_volume_mount():
"""
Expand All @@ -78,32 +100,44 @@ def test_volume_mount():
raise ValueError(f"Error when checking volume mount. Return code {return_code}")

volume_task = test_volume_mount()
# [END task_with_volume]

# [START task_with_sidecar]
#############################################################
# Define config for task with sidecar and shared volume /shared/
#############################################################
executor_config_sidecar = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/shared/", name="shared-empty-dir")],
),
k8s.V1Container(
name="sidecar",
image="ubuntu",
args=['echo "retrieved from mount" > /shared/test.txt'],
command=["bash", "-cx"],
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/shared/", name="shared-empty-dir"
)
],
),
],
volumes=[
k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
k8s.V1Volume(
name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()
),
],
)
),
}

###########################################################
# 3) pod with sidecar and shared volumes
###########################################################
@task(executor_config=executor_config_sidecar)
def test_sharedvolume_mount():
"""
Expand All @@ -119,38 +153,53 @@ def test_sharedvolume_mount():
raise e

sidecar_task = test_sharedvolume_mount()
# [END task_with_sidecar]

# You can add labels to pods
executor_config_non_root = {
#############################################################
# Define config for task: pod with label
#############################################################
executor_config_label = {
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
}

@task(executor_config=executor_config_non_root)
def non_root_task():

#############################################################
# 4) pod with label
#############################################################
@task(executor_config=executor_config_label)
def test_label():
print_stuff()

third_task = non_root_task()

label_task = test_label()

#############################################################
# Define config for task: pod with namespace
#############################################################
executor_config_other_ns = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={"release": "stable"})
)
}

#############################################################
# 5) pod with other namespace
#############################################################
@task(executor_config=executor_config_other_ns)
def other_namespace_task():
print_stuff()

other_ns_task = other_namespace_task()

#############################################################
# Define config for task: pod with image
#############################################################

worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

# You can also change the base image, here we used the worker image for demonstration.
# Note that the image must have the same configuration as the
# worker image. Could be that you want to run this task in a special docker image that has a zip
# Note that the image must have the same configuration as the worker image.
# Could be that you want to run this task in a special docker image that has a zip
# library built-in. You build the special docker image on top your worker image.
kube_exec_config_special = {
kube_exec_config_image = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
Expand All @@ -162,12 +211,19 @@ def other_namespace_task():
)
}

@task(executor_config=kube_exec_config_special)
def base_image_override_task():
#############################################################
# 6) pod with image
#############################################################
@task(executor_config=kube_exec_config_image)
def image_override_task():
print_stuff()

base_image_task = base_image_override_task()
image_task = image_override_task()

#############################################################
# Define config for task: pod with resource limits
#############################################################

# Use k8s_client.V1Affinity to define node affinity
k8s_affinity = k8s.V1Affinity(
pod_anti_affinity=k8s.V1PodAntiAffinity(
Expand Down Expand Up @@ -207,15 +263,21 @@ def base_image_override_task():
)
}

#############################################################
# 7) pod with resource limits
#############################################################
@task(executor_config=kube_exec_config_resource_limits)
def task_with_resource_limits():
print_stuff()

four_task = task_with_resource_limits()
resource_task = task_with_resource_limits()

#############################################################
# Define DAG execution
#############################################################
(
start_task()
annotation_task
>> [volume_task, other_ns_task, sidecar_task]
>> third_task
>> [base_image_task, four_task]
>> label_task
>> [image_task, resource_task]
)

0 comments on commit 0fdbbb9

Please sign in to comment.