Skip to content

Commit

Permalink
Merge pull request #46 from shanduur/dev-batch-job
Browse files Browse the repository at this point in the history
feat(batch/jobs): jobs emulation
  • Loading branch information
deviantony authored Nov 9, 2024
2 parents e57a7ff + e4d2b03 commit 6bd9e56
Show file tree
Hide file tree
Showing 17 changed files with 650 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/k2d.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func main() {
container.Add(apis.APIs())
// /apis/apps
container.Add(apis.Apps())
// /apis/batch
container.Add(apis.Batch())
// /apis/events.k8s.io
container.Add(apis.Events())
// /apis/authorization.k8s.io
Expand Down
4 changes: 4 additions & 0 deletions internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/apis/apps"
appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1"
"k8s.io/kubernetes/pkg/apis/batch"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/core"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/apis/storage"
Expand Down Expand Up @@ -200,6 +202,8 @@ func initConversionScheme() *runtime.Scheme {

apps.AddToScheme(scheme)
appsv1.AddToScheme(scheme)
batch.AddToScheme(scheme)
batchv1.AddToScheme(scheme)
core.AddToScheme(scheme)
corev1.AddToScheme(scheme)
storage.AddToScheme(scheme)
Expand Down
54 changes: 54 additions & 0 deletions internal/adapter/converter/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package converter

import (
"time"

"github.com/docker/docker/api/types"
k2dtypes "github.com/portainer/k2d/internal/adapter/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/batch"
)

func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, container types.Container, json types.ContainerJSON) {
job.TypeMeta = metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
}

job.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Unix(container.Created, 0))
if job.ObjectMeta.Annotations == nil {
job.ObjectMeta.Annotations = make(map[string]string)
}

job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = container.Labels[k2dtypes.LastAppliedConfigLabelKey]

containerState := container.State

startTime, _ := time.Parse(time.RFC3339Nano, json.State.StartedAt)

metaStartTime := &metav1.Time{
Time: startTime,
}

job.Status.StartTime = metaStartTime

job.Status.Active = 0

if containerState == "running" {
job.Status.Active = 1
} else {
if json.State.ExitCode == 0 {
job.Status.Succeeded = 1

completionTime, _ := time.Parse(time.RFC3339Nano, json.State.FinishedAt)

metaCompletionTime := &metav1.Time{
Time: completionTime,
}

job.Status.CompletionTime = metaCompletionTime
} else {
job.Status.Failed = 1
}
}
}
2 changes: 2 additions & 0 deletions internal/adapter/converter/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (converter *DockerAPIConverter) ConvertContainerToPod(container types.Conta
},
}
} else {
// TODO: handle exited containers, so Jobs can determine the status
// /containers/<container ID>/json ? This will allow getting Exit Code.
pod.Status.Phase = core.PodUnknown

// this is to mark the pod's condition as unknown
Expand Down
40 changes: 40 additions & 0 deletions internal/adapter/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ func AllDeployments(namespace string) filters.Args {
return filter
}

// AllJobs creates a Docker filter argument for Kubernetes Jobs within a given namespace.
// The function filters Docker resources based on the Workload and Namespace labels, specifically for Jobs.
//
// Parameters:
// - namespace: The Kubernetes namespace to filter by.
//
// Returns:
// - filters.Args: A Docker filter object that can be used to filter Docker API calls based on the namespace and Workload type labels.
//
// Usage Example:
//
// filter := AllJobs("default")
// // Now 'filter' can be used in Docker API calls to filter Job resources in the 'default' Kubernetes namespace.
func AllJobs(namespace string) filters.Args {
filter := filters.NewArgs()
filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadTypeLabelKey, types.JobWorkloadType))
filter.Add("label", fmt.Sprintf("%s=%s", types.NamespaceNameLabelKey, namespace))
return filter
}

// AllNamespaces creates a Docker filter argument that targets resources labeled with a Kubernetes namespace.
// This function uses the types.NamespaceLabelKey constant as the base label key to filter Docker resources.
//
Expand Down Expand Up @@ -81,6 +101,26 @@ func ByDeployment(namespace, deploymentName string) filters.Args {
return filter
}

// ByJob creates a Docker filter argument for a specific Kubernetes Job within a given namespace.
// The function builds upon the JobsFilter by further narrowing down the filter to match a specific Job name.
//
// Parameters:
// - namespace: The Kubernetes namespace to filter by.
// - jobName: The name of the specific Kubernetes Job to filter by.
//
// Returns:
// - filters.Args: A Docker filter object that can be used to filter Docker API calls based on the namespace and Job name labels.
//
// Usage Example:
//
// filter := ByJob("default", "my-job")
// // Now 'filter' can be used in Docker API calls to filter resources in the 'default' Kubernetes namespace that are part of 'my-job'.
func ByJob(namespace, jobName string) filters.Args {
filter := AllJobs(namespace)
filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadNameLabelKey, jobName))
return filter
}

// ByNamespace creates a Docker filter argument to target all Docker resources within a specific Kubernetes namespace.
// If an empty string is provided, it will return a filter that targets all namespaces.
//
Expand Down
186 changes: 186 additions & 0 deletions internal/adapter/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package adapter

import (
"context"
"encoding/json"
"fmt"

"github.com/docker/docker/api/types"
adaptererr "github.com/portainer/k2d/internal/adapter/errors"
"github.com/portainer/k2d/internal/adapter/filters"
k2dtypes "github.com/portainer/k2d/internal/adapter/types"
"github.com/portainer/k2d/internal/k8s"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/batch"
)

func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, job *batchv1.Job) error {
opts := ContainerCreationOptions{
containerName: job.Name,
namespace: job.Namespace,
podSpec: job.Spec.Template.Spec,
labels: job.Spec.Template.Labels,
}

if opts.labels == nil {
opts.labels = make(map[string]string)
}

opts.labels[k2dtypes.WorkloadTypeLabelKey] = k2dtypes.JobWorkloadType

if job.Labels["app.kubernetes.io/managed-by"] == "Helm" {
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("unable to marshal job: %w", err)
}
job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = string(jobData)
}

// kubectl create job does not pass the last-applied-configuration annotation
// so we need to add it manually
if job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] == "" {
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("unable to marshal job: %w", err)
}
opts.labels[k2dtypes.LastAppliedConfigLabelKey] = string(jobData)
}

opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"]

return adapter.createContainerFromPodSpec(ctx, opts)
}

func (adapter *KubeDockerAdapter) getContainerFromJobName(ctx context.Context, jobName, namespace string) (types.Container, error) {
filter := filters.ByJob(namespace, jobName)
containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter})
if err != nil {
return types.Container{}, fmt.Errorf("unable to list containers: %w", err)
}

if len(containers) == 0 {
return types.Container{}, adaptererr.ErrResourceNotFound
}

if len(containers) > 1 {
return types.Container{}, fmt.Errorf("multiple containers were found with the associated job name %s", jobName)
}

return containers[0], nil
}

func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, namespace string) (*batchv1.Job, error) {
container, err := adapter.getContainerFromJobName(ctx, jobName, namespace)
if err != nil {
return nil, fmt.Errorf("unable to get container from job name: %w", err)
}

job, err := adapter.buildJobFromContainer(ctx, container)
if err != nil {
return nil, fmt.Errorf("unable to get job: %w", err)
}

versionedJob := batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
},
}

err = adapter.ConvertK8SResource(job, &versionedJob)
if err != nil {
return nil, fmt.Errorf("unable to convert internal object to versioned object: %w", err)
}

return &versionedJob, nil
}

func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace string) (*metav1.Table, error) {
jobList, err := adapter.listJobs(ctx, namespace)
if err != nil {
return &metav1.Table{}, fmt.Errorf("unable to list jobs: %w", err)
}

return k8s.GenerateTable(&jobList)
}

func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batchv1.JobList, error) {
jobList, err := adapter.listJobs(ctx, namespace)
if err != nil {
return batchv1.JobList{}, fmt.Errorf("unable to list jobs: %w", err)
}

versionedJobList := batchv1.JobList{
TypeMeta: metav1.TypeMeta{
Kind: "JobList",
APIVersion: "batch/v1",
},
}

err = adapter.ConvertK8SResource(&jobList, &versionedJobList)
if err != nil {
return batchv1.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err)
}

return versionedJobList, nil
}

func (adapter *KubeDockerAdapter) buildJobFromContainer(ctx context.Context, container types.Container) (*batch.Job, error) {
if container.Labels[k2dtypes.LastAppliedConfigLabelKey] == "" {
return nil, fmt.Errorf("unable to build job, missing %s label on container %s", k2dtypes.LastAppliedConfigLabelKey, container.Names[0])
}

jobData := container.Labels[k2dtypes.LastAppliedConfigLabelKey]

versionedJob := batchv1.Job{}

err := json.Unmarshal([]byte(jobData), &versionedJob)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal versioned job: %w", err)
}

job := batch.Job{}
err = adapter.ConvertK8SResource(&versionedJob, &job)
if err != nil {
return nil, fmt.Errorf("unable to convert versioned job spec to internal job spec: %w", err)
}

containerInspect, err := adapter.cli.ContainerInspect(ctx, container.ID)
if err != nil {
return nil, fmt.Errorf("unable to inspect the container: %w", err)
}

adapter.converter.UpdateJobFromContainerInfo(&job, container, containerInspect)

return &job, nil
}

func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batch.JobList, error) {
filter := filters.AllJobs(namespace)
containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter})
if err != nil {
return batch.JobList{}, fmt.Errorf("unable to list containers: %w", err)
}

jobs := []batch.Job{}

for _, container := range containers {
job, err := adapter.buildJobFromContainer(ctx, container)
if err != nil {
return batch.JobList{}, fmt.Errorf("unable to get job: %w", err)
}

if job != nil {
jobs = append(jobs, *job)
}
}

return batch.JobList{
TypeMeta: metav1.TypeMeta{
Kind: "JobList",
APIVersion: "batch/v1",
},
Items: jobs,
}, nil
}
7 changes: 7 additions & 0 deletions internal/adapter/types/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// StorageTypeLabelKey is the key used to store the storage type in the labels of a system configmap or a Docker volume
// It is used to differentiate between persistent volumes and config maps when listing volumes
StorageTypeLabelKey = "storage.k2d.io/type"

// JobLastAppliedConfigLabelKey is the key used to store the job specific last applied configuration in the container labels
JobLastAppliedConfigLabelKey = "job.k2d.io/last-applied-configuration"
)

const (
Expand All @@ -58,4 +61,8 @@ const (
// DeploymentWorkloadType is the label value used to identify a Deployment workload
// It is stored on a container as a label and used to filter containers when listing deployments
DeploymentWorkloadType = "deployment"

// JobWorkloadType is the label value used to identify a Job workload
// It is stored on a container as a label and used to filter containers when listing deployments
JobWorkloadType = "job"
)
9 changes: 9 additions & 0 deletions internal/api/apis/apigroups_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ func ListAPIGroups(r *restful.Request, w *restful.Response) {
},
},
},
{
Name: "batch",
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: "batch/v1",
Version: "v1",
},
},
},
{
Name: "events.k8s.io",
Versions: []metav1.GroupVersionForDiscovery{
Expand Down
Loading

0 comments on commit 6bd9e56

Please sign in to comment.