Skip to content

Commit

Permalink
[dagster-airlift] KubernetesPodOperator guide
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 22, 2024
1 parent c6068ad commit ee6dba1
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 1 deletion.
6 changes: 5 additions & 1 deletion docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,11 @@
{
"title": "Migrating a BashOperator for dbt",
"path": "/integrations/airlift/operator-migration/bash-operator-dbt"
}
},
{
"title": "Migrating a KubernetesPodOperator",
"path": "/integrations/airlift/operator-migration/kubernetes-pod-operator"
},
]
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Operator migration guides: Migrating usage of `KubernetesPodOperator`

In this page, we'll explain migrating an Airflow `KubernetesPodOperator` to Dagster.

### Background

The KubernetesPodOperator in Apache Airflow enables users to execute containerized tasks within Kubernetes pods as part of their data pipelines.

```python file=/integrations/airlift/operator_migration/kubernetes_pod_operator.py
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k8s_hello_world = KubernetesPodOperator(
task_id="hello_world_task",
name="hello-world-pod",
image="bash:latest",
cmds=["bash", "-cx"],
arguments=['echo "Hello World!"'],
)
```

### Dagster equivalent

The Dagster equivalent is to use the <PyObject object="PipesK8sClient" module="dagster_k8s"/> to execute a task within a Kubernetes pod.

```python file=/integrations/airlift/operator_migration/using_k8s_pipes.py
from dagster_k8s import PipesK8sClient

from dagster import AssetExecutionContext, asset

container_cfg = {
"name": "hello-world-pod",
"image": "bash:latest",
"command": ["bash", "-cx"],
"args": ['echo "Hello World!"'],
}


@asset
def execute_hello_world_task(context: AssetExecutionContext):
return (
PipesK8sClient()
.run(
context=context.op_execution_context,
base_pod_meta={"name": "hello-world-pod"},
base_pod_spec={"containers": [container_cfg]},
)
.get_results()
)
```

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Ensure that your Dagster deployment has access to the Kubernetes cluster.
2. Write an <PyObject object="asset" module="dagster"/> that executes the task within a Kubernetes pod using the <PyObject object="PipesK8sClient" module="dagster_k8s"/>.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.

### Step 1: Ensure access to the Kubernetes cluster

First, you need to ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The <PyObject object="PipesK8sClient" module="dagster_k8s"/> accepts `kubeconfig` and `kubecontext`, and `env` arguments to configure the Kubernetes client.

Here's an example of what this might look like when configuring the client to access an EKS cluster:

```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_client endbefore=end_client
from dagster_k8s import PipesK8sClient

eks_client = PipesK8sClient(
# The client will have automatic access to all
# environment variables in the execution context.
env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
kubeconfig_file="path/to/kubeconfig",
kube_context="my-eks-cluster",
)
```

### Step 2: Writing an asset that executes the task within a Kubernetes pod

Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the <PyObject object="PipesK8sClient" module="dagster_k8s"/>. In comparison to the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code.

In the [parameter comparison](/integrations/airlift/kubernetes-pod-operator#parameter-comparison) section of this doc, you'll find a detailed comparison describing how to map the KubernetesPodOperator parameters to the PipesK8sClient parameters.

```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_asset endbefore=end_asset
from dagster import AssetExecutionContext, asset

container_cfg = {
"name": "hello-world-pod",
"image": "bash:latest",
"command": ["bash", "-cx"],
"args": ['echo "Hello World!"'],
}


@asset
def execute_hello_world_task(context: AssetExecutionContext):
return eks_client.run(
context=context.op_execution_context,
base_pod_meta={"name": "hello-world-pod"},
base_pod_spec={"containers": [container_cfg]},
).get_results()
```

This is just a snippet of what the PipesK8sClient can do. Take a look at our full guide on the [dagster-k8s PipesK8sClient](/dagster-pipes/kubernetes) for more information.

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.

### Parameter comparison

Here's a comparison of the parameters between the KubernetesPodOperator and the PipesK8sClient: Directly supported arguments:

- in_cluster (named load_incluster_config here)
- cluster_context (named kube_context here)
- config_file (named kubeconfig_file here)

Many arguments are supported indirectly via the pod_spec argument.

- volumes: Volumes to be used by the Pod (key `volumes`)
- affinity: Node affinity/anti-affinity rules for the Pod (key `affinity`)
- node_selector: Node selection constraints for the Pod (key `nodeSelector`)
- hostnetwork: Enable host networking for the Pod (key `hostNetwork`)
- dns_config: DNS settings for the Pod (key `dnsConfig`)
- dnspolicy: DNS policy for the Pod (key `dnsPolicy`)
- hostname: Hostname of the Pod (key `hostname`)
- subdomain: Subdomain for the Pod (key `subdomain`)
- schedulername: Scheduler to be used for the Pod (key `schedulerName`)
- service_account_name: Service account to be used by the Pod (key `serviceAccountName`)
- priority_class_name: Priority class for the Pod (key `priorityClassName`)
- security_context: Security context for the entire Pod (key `securityContext`)
- tolerations: Tolerations for the Pod (key `tolerations`)
- image_pull_secrets: Secrets for pulling container images (key `imagePullSecrets`)
- termination_grace_period: Grace period for Pod termination (key `terminationGracePeriodSeconds`)
- active_deadline_seconds: Deadline for the Pod's execution (key `activeDeadlineSeconds`)
- host_aliases: Additional entries for the Pod's /etc/hosts (key `hostAliases`)
- init_containers: Initialization containers for the Pod (key `initContainers`)

The following arguments are supported under the nested `containers` key of the pod_spec argument:

- image: Docker image for the container (key 'image')
- cmds: Entrypoint command for the container (key `command`)
- arguments: Arguments for the entrypoint command (key `args`)
- ports: List of ports to expose from the container (key `ports`)
- volume_mounts: List of volume mounts for the container (key `volumeMounts`)
- env_vars: Environment variables for the container (key `env`)
- env_from: List of sources to populate environment variables (key `envFrom`)
- image_pull_policy: Policy for pulling the container image (key `imagePullPolicy`)
- container_resources: Resource requirements for the container (key `resources`)
- container_security_context: Security context for the container (key `securityContext`)
- termination_message_policy: Policy for the termination message (key `terminationMessagePolicy`)

For a full list, see the [kubernetes container spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#container-v1-core). The following arguments are supported under the pod_metadata argument, which configures the metadata of the pod:

- name: `name`
- namespace: `namespace`
- labels: `labels`
- annotations: `annotations`

For a full list, see the [kubernetes objectmeta spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#objectmeta-v1-meta).
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ This page contains a collection of reference materials for migrating usage of co
title="Bash Operator for dbt"
href="/integrations/airlift/operator-migration/bash-operator-dbt"
></ArticleListItem>
<ArticleListItem
title="Kubernetes Pod Operator"
href="/integrations/airlift/operator-migration/kubernetes-pod-operator"
></ArticleListItem>
</ArticleList>
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
AWS_CREDENTIALS = {}

# start_client
from dagster_k8s import PipesK8sClient

eks_client = PipesK8sClient(
# The client will have automatic access to all
# environment variables in the execution context.
env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
kubeconfig_file="path/to/kubeconfig",
kube_context="my-eks-cluster",
)
# end_client

# start_asset
from dagster import AssetExecutionContext, asset

container_cfg = {
"name": "hello-world-pod",
"image": "bash:latest",
"command": ["bash", "-cx"],
"args": ['echo "Hello World!"'],
}


@asset
def execute_hello_world_task(context: AssetExecutionContext):
return eks_client.run(
context=context.op_execution_context,
base_pod_meta={"name": "hello-world-pod"},
base_pod_spec={"containers": [container_cfg]},
).get_results()


# end_asset
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k8s_hello_world = KubernetesPodOperator(
task_id="hello_world_task",
name="hello-world-pod",
image="bash:latest",
cmds=["bash", "-cx"],
arguments=['echo "Hello World!"'],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dagster_k8s import PipesK8sClient

from dagster import AssetExecutionContext, asset

container_cfg = {
"name": "hello-world-pod",
"image": "bash:latest",
"command": ["bash", "-cx"],
"args": ['echo "Hello World!"'],
}


@asset
def execute_hello_world_task(context: AssetExecutionContext):
return (
PipesK8sClient()
.run(
context=context.op_execution_context,
base_pod_meta={"name": "hello-world-pod"},
base_pod_spec={"containers": [container_cfg]},
)
.get_results()
)

0 comments on commit ee6dba1

Please sign in to comment.