diff --git a/cmd/k2d.go b/cmd/k2d.go index 7718f57..2d55f43 100644 --- a/cmd/k2d.go +++ b/cmd/k2d.go @@ -176,6 +176,8 @@ func main() { container.Add(apis.APIs()) // /apis/apps container.Add(apis.Apps()) + // /apis/batch + container.Add(apis.Batch()) // /apis/events.k8s.io container.Add(apis.Events()) // /apis/authorization.k8s.io diff --git a/internal/adapter/adapter.go b/internal/adapter/adapter.go index c3002bd..b731d86 100644 --- a/internal/adapter/adapter.go +++ b/internal/adapter/adapter.go @@ -18,6 +18,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/apis/apps" appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1" + "k8s.io/kubernetes/pkg/apis/batch" + batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" "k8s.io/kubernetes/pkg/apis/core" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/apis/storage" @@ -200,6 +202,8 @@ func initConversionScheme() *runtime.Scheme { apps.AddToScheme(scheme) appsv1.AddToScheme(scheme) + batch.AddToScheme(scheme) + batchv1.AddToScheme(scheme) core.AddToScheme(scheme) corev1.AddToScheme(scheme) storage.AddToScheme(scheme) diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go new file mode 100644 index 0000000..7eefb49 --- /dev/null +++ b/internal/adapter/converter/job.go @@ -0,0 +1,54 @@ +package converter + +import ( + "time" + + "github.com/docker/docker/api/types" + k2dtypes "github.com/portainer/k2d/internal/adapter/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/batch" +) + +func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, container types.Container, json types.ContainerJSON) { + job.TypeMeta = metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + } + + job.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Unix(container.Created, 0)) + if job.ObjectMeta.Annotations == nil { + job.ObjectMeta.Annotations = make(map[string]string) + } + + 
+	// TODO: handle exited containers, so Jobs can determine the status
+	// /containers/{id}/json ? This will allow getting the exit code.
+// ByJob creates a Docker filter argument for a specific Kubernetes Job within a given namespace.
+// The function builds upon the AllJobs filter by further narrowing down the filter to match a specific Job name.
// diff --git a/internal/adapter/job.go b/internal/adapter/job.go new file mode 100644 index 0000000..5ad258d --- /dev/null +++ b/internal/adapter/job.go @@ -0,0 +1,186 @@ +package adapter + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/docker/docker/api/types" + adaptererr "github.com/portainer/k2d/internal/adapter/errors" + "github.com/portainer/k2d/internal/adapter/filters" + k2dtypes "github.com/portainer/k2d/internal/adapter/types" + "github.com/portainer/k2d/internal/k8s" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/batch" +) + +func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, job *batchv1.Job) error { + opts := ContainerCreationOptions{ + containerName: job.Name, + namespace: job.Namespace, + podSpec: job.Spec.Template.Spec, + labels: job.Spec.Template.Labels, + } + + if opts.labels == nil { + opts.labels = make(map[string]string) + } + + opts.labels[k2dtypes.WorkloadTypeLabelKey] = k2dtypes.JobWorkloadType + + if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("unable to marshal job: %w", err) + } + job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = string(jobData) + } + + // kubectl create job does not pass the last-applied-configuration annotation + // so we need to add it manually + if job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] == "" { + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("unable to marshal job: %w", err) + } + opts.labels[k2dtypes.LastAppliedConfigLabelKey] = string(jobData) + } + + opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] + + return adapter.createContainerFromPodSpec(ctx, opts) +} + +func (adapter *KubeDockerAdapter) getContainerFromJobName(ctx context.Context, jobName, namespace string) 
(types.Container, error) { + filter := filters.ByJob(namespace, jobName) + containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) + if err != nil { + return types.Container{}, fmt.Errorf("unable to list containers: %w", err) + } + + if len(containers) == 0 { + return types.Container{}, adaptererr.ErrResourceNotFound + } + + if len(containers) > 1 { + return types.Container{}, fmt.Errorf("multiple containers were found with the associated job name %s", jobName) + } + + return containers[0], nil +} + +func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, namespace string) (*batchv1.Job, error) { + container, err := adapter.getContainerFromJobName(ctx, jobName, namespace) + if err != nil { + return nil, fmt.Errorf("unable to get container from job name: %w", err) + } + + job, err := adapter.buildJobFromContainer(ctx, container) + if err != nil { + return nil, fmt.Errorf("unable to get job: %w", err) + } + + versionedJob := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + } + + err = adapter.ConvertK8SResource(job, &versionedJob) + if err != nil { + return nil, fmt.Errorf("unable to convert internal object to versioned object: %w", err) + } + + return &versionedJob, nil +} + +func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace string) (*metav1.Table, error) { + jobList, err := adapter.listJobs(ctx, namespace) + if err != nil { + return &metav1.Table{}, fmt.Errorf("unable to list jobs: %w", err) + } + + return k8s.GenerateTable(&jobList) +} + +func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { + jobList, err := adapter.listJobs(ctx, namespace) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to list jobs: %w", err) + } + + versionedJobList := batchv1.JobList{ + TypeMeta: metav1.TypeMeta{ + Kind: "JobList", + APIVersion: "batch/v1", + }, + } + + err = 
adapter.ConvertK8SResource(&jobList, &versionedJobList) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) + } + + return versionedJobList, nil +} + +func (adapter *KubeDockerAdapter) buildJobFromContainer(ctx context.Context, container types.Container) (*batch.Job, error) { + if container.Labels[k2dtypes.LastAppliedConfigLabelKey] == "" { + return nil, fmt.Errorf("unable to build job, missing %s label on container %s", k2dtypes.LastAppliedConfigLabelKey, container.Names[0]) + } + + jobData := container.Labels[k2dtypes.LastAppliedConfigLabelKey] + + versionedJob := batchv1.Job{} + + err := json.Unmarshal([]byte(jobData), &versionedJob) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal versioned job: %w", err) + } + + job := batch.Job{} + err = adapter.ConvertK8SResource(&versionedJob, &job) + if err != nil { + return nil, fmt.Errorf("unable to convert versioned job spec to internal job spec: %w", err) + } + + containerInspect, err := adapter.cli.ContainerInspect(ctx, container.ID) + if err != nil { + return nil, fmt.Errorf("unable to inspect the container: %w", err) + } + + adapter.converter.UpdateJobFromContainerInfo(&job, container, containerInspect) + + return &job, nil +} + +func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batch.JobList, error) { + filter := filters.AllJobs(namespace) + containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) + if err != nil { + return batch.JobList{}, fmt.Errorf("unable to list containers: %w", err) + } + + jobs := []batch.Job{} + + for _, container := range containers { + job, err := adapter.buildJobFromContainer(ctx, container) + if err != nil { + return batch.JobList{}, fmt.Errorf("unable to get job: %w", err) + } + + if job != nil { + jobs = append(jobs, *job) + } + } + + return batch.JobList{ + TypeMeta: metav1.TypeMeta{ + Kind: "JobList", + APIVersion: 
"batch/v1", + }, + Items: jobs, + }, nil +} diff --git a/internal/adapter/types/labels.go b/internal/adapter/types/labels.go index 34932b4..b9531d6 100644 --- a/internal/adapter/types/labels.go +++ b/internal/adapter/types/labels.go @@ -35,6 +35,9 @@ const ( // StorageTypeLabelKey is the key used to store the storage type in the labels of a system configmap or a Docker volume // It is used to differentiate between persistent volumes and config maps when listing volumes StorageTypeLabelKey = "storage.k2d.io/type" + + // JobLastAppliedConfigLabelKey is the key used to store the job specific last applied configuration in the container labels + JobLastAppliedConfigLabelKey = "job.k2d.io/last-applied-configuration" ) const ( @@ -58,4 +61,8 @@ const ( // DeploymentWorkloadType is the label value used to identify a Deployment workload // It is stored on a container as a label and used to filter containers when listing deployments DeploymentWorkloadType = "deployment" + + // JobWorkloadType is the label value used to identify a Job workload + // It is stored on a container as a label and used to filter containers when listing deployments + JobWorkloadType = "job" ) diff --git a/internal/api/apis/apigroups_list.go b/internal/api/apis/apigroups_list.go index 13b7e4d..d196feb 100644 --- a/internal/api/apis/apigroups_list.go +++ b/internal/api/apis/apigroups_list.go @@ -21,6 +21,15 @@ func ListAPIGroups(r *restful.Request, w *restful.Response) { }, }, }, + { + Name: "batch", + Versions: []metav1.GroupVersionForDiscovery{ + { + GroupVersion: "batch/v1", + Version: "v1", + }, + }, + }, { Name: "events.k8s.io", Versions: []metav1.GroupVersionForDiscovery{ diff --git a/internal/api/apis/apis.go b/internal/api/apis/apis.go index 15a8ed4..51416af 100644 --- a/internal/api/apis/apis.go +++ b/internal/api/apis/apis.go @@ -5,6 +5,7 @@ import ( "github.com/portainer/k2d/internal/adapter" "github.com/portainer/k2d/internal/api/apis/apps" 
"github.com/portainer/k2d/internal/api/apis/authorization.k8s.io" + "github.com/portainer/k2d/internal/api/apis/batch" "github.com/portainer/k2d/internal/api/apis/events.k8s.io" "github.com/portainer/k2d/internal/api/apis/storage.k8s.io" "github.com/portainer/k2d/internal/controller" @@ -13,6 +14,7 @@ import ( type ( ApisAPI struct { apps apps.AppsService + batch batch.BatchService events events.EventsService authorization authorization.AuthorizationService storage storage.StorageService @@ -22,6 +24,7 @@ type ( func NewApisAPI(adapter *adapter.KubeDockerAdapter, operations chan controller.Operation) *ApisAPI { return &ApisAPI{ apps: apps.NewAppsService(operations, adapter), + batch: batch.NewBatchService(operations, adapter), events: events.NewEventsService(adapter), authorization: authorization.NewAuthorizationService(), storage: storage.NewStorageService(adapter), @@ -116,3 +119,22 @@ func (api ApisAPI) Apps() *restful.WebService { api.apps.RegisterAppsAPI(routes) return routes } + +// /apis/batch +func (api ApisAPI) Batch() *restful.WebService { + routes := new(restful.WebService). + Path("/apis/batch"). + Consumes(restful.MIME_JSON, "application/yml", "application/json-patch+json", "application/merge-patch+json", "application/strategic-merge-patch+json"). + Produces(restful.MIME_JSON) + + // which versions are served by this api + routes.Route(routes.GET(""). + To(api.batch.GetAPIVersions)) + + // which resources are available under /apis/batch/v1 + routes.Route(routes.GET("/v1"). 
+ To(api.batch.ListAPIResources)) + + api.batch.RegisterBatchAPI(routes) + return routes +} diff --git a/internal/api/apis/batch/batch.go b/internal/api/apis/batch/batch.go new file mode 100644 index 0000000..df77131 --- /dev/null +++ b/internal/api/apis/batch/batch.go @@ -0,0 +1,56 @@ +package batch + +import ( + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/adapter" + "github.com/portainer/k2d/internal/api/apis/batch/jobs" + "github.com/portainer/k2d/internal/controller" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type BatchService struct { + jobs jobs.JobService +} + +func NewBatchService(operations chan controller.Operation, adapter *adapter.KubeDockerAdapter) BatchService { + return BatchService{ + jobs: jobs.NewJobService(adapter, operations), + } +} + +func (svc BatchService) GetAPIVersions(r *restful.Request, w *restful.Response) { + apiVersion := metav1.APIVersions{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIVersions", + }, + Versions: []string{"batch/v1"}, + } + + w.WriteAsJson(apiVersion) +} + +func (svc BatchService) ListAPIResources(r *restful.Request, w *restful.Response) { + resourceList := metav1.APIResourceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIResourceList", + APIVersion: "v1", + }, + GroupVersion: "batch/v1", + APIResources: []metav1.APIResource{ + { + Kind: "Job", + SingularName: "", + Name: "jobs", + Verbs: []string{"create", "list", "delete", "get", "patch"}, + Namespaced: true, + }, + }, + } + + w.WriteAsJson(resourceList) +} + +func (svc BatchService) RegisterBatchAPI(routes *restful.WebService) { + // jobs + svc.jobs.RegisterJobAPI(routes) +} diff --git a/internal/api/apis/batch/jobs/create.go b/internal/api/apis/batch/jobs/create.go new file mode 100644 index 0000000..9180944 --- /dev/null +++ b/internal/api/apis/batch/jobs/create.go @@ -0,0 +1,37 @@ +package jobs + +import ( + "fmt" + "net/http" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + 
"github.com/portainer/k2d/internal/controller" + "github.com/portainer/k2d/internal/types" + httputils "github.com/portainer/k2d/pkg/http" + batchv1 "k8s.io/api/batch/v1" +) + +func (svc JobService) CreateJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + + job := &batchv1.Job{} + + err := httputils.ParseJSONBody(r.Request, &job) + if err != nil { + utils.HttpError(r, w, http.StatusBadRequest, fmt.Errorf("unable to parse request body: %w", err)) + return + } + + job.Namespace = namespace + + dryRun := r.QueryParameter("dryRun") != "" + if dryRun { + w.WriteAsJson(job) + return + } + + svc.operations <- controller.NewOperation(job, controller.MediumPriorityOperation, r.HeaderParameter(types.RequestIDHeader)) + + w.WriteAsJson(job) +} diff --git a/internal/api/apis/batch/jobs/delete.go b/internal/api/apis/batch/jobs/delete.go new file mode 100644 index 0000000..5c87e12 --- /dev/null +++ b/internal/api/apis/batch/jobs/delete.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "net/http" + + "github.com/emicklei/go-restful/v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (svc JobService) DeleteJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + svc.adapter.DeleteContainer(r.Request.Context(), jobName, namespace) + + w.WriteAsJson(metav1.Status{ + TypeMeta: metav1.TypeMeta{ + Kind: "Status", + APIVersion: "v1", + }, + Status: "Success", + Code: http.StatusOK, + }) +} diff --git a/internal/api/apis/batch/jobs/get.go b/internal/api/apis/batch/jobs/get.go new file mode 100644 index 0000000..f62bb55 --- /dev/null +++ b/internal/api/apis/batch/jobs/get.go @@ -0,0 +1,29 @@ +package jobs + +import ( + "errors" + "fmt" + "net/http" + + "github.com/emicklei/go-restful/v3" + adaptererr "github.com/portainer/k2d/internal/adapter/errors" + "github.com/portainer/k2d/internal/api/utils" +) + +func (svc JobService) GetJob(r *restful.Request, w 
*restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + job, err := svc.adapter.GetJob(r.Request.Context(), jobName, namespace) + if err != nil { + if errors.Is(err, adaptererr.ErrResourceNotFound) { + w.WriteHeader(http.StatusNotFound) + return + } + + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to get job: %w", err)) + return + } + + w.WriteAsJson(job) +} diff --git a/internal/api/apis/batch/jobs/jobs.go b/internal/api/apis/batch/jobs/jobs.go new file mode 100644 index 0000000..e2103f5 --- /dev/null +++ b/internal/api/apis/batch/jobs/jobs.go @@ -0,0 +1,74 @@ +package jobs + +import ( + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/adapter" + "github.com/portainer/k2d/internal/controller" +) + +type JobService struct { + adapter *adapter.KubeDockerAdapter + operations chan controller.Operation +} + +func NewJobService(adapter *adapter.KubeDockerAdapter, operations chan controller.Operation) JobService { + return JobService{ + adapter: adapter, + operations: operations, + } +} + +func (svc JobService) RegisterJobAPI(ws *restful.WebService) { + jobGVKExtension := map[string]string{ + "group": "batch", + "kind": "Job", + "version": "v1", + } + + ws.Route(ws.POST("/v1/jobs"). + To(svc.CreateJob). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string"))) + + ws.Route(ws.POST("/v1/namespaces/{namespace}/jobs"). + To(svc.CreateJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string"))) + + ws.Route(ws.GET("/v1/jobs"). + To(svc.ListJobs)) + + ws.Route(ws.GET("/v1/namespaces/{namespace}/jobs"). + To(svc.ListJobs). + Param(ws.PathParameter("namespace", "namespace name").DataType("string"))) + + ws.Route(ws.DELETE("/v1/jobs/{name}"). + To(svc.DeleteJob). 
+ Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.DELETE("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.DeleteJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.GET("/v1/jobs/{name}"). + To(svc.GetJob). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.GET("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.GetJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.PATCH("/v1/jobs/{name}"). + To(svc.PatchJob). + Param(ws.PathParameter("name", "name of the job").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string")). + AddExtension("x-kubernetes-group-version-kind", jobGVKExtension)) + + ws.Route(ws.PATCH("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.PatchJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string")). 
+ AddExtension("x-kubernetes-group-version-kind", jobGVKExtension)) +} diff --git a/internal/api/apis/batch/jobs/list.go b/internal/api/apis/batch/jobs/list.go new file mode 100644 index 0000000..178d777 --- /dev/null +++ b/internal/api/apis/batch/jobs/list.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "context" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (svc JobService) ListJobs(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + + utils.ListResources( + r, + w, + func(ctx context.Context) (interface{}, error) { + return svc.adapter.ListJobs(ctx, namespace) + }, + func(ctx context.Context) (*metav1.Table, error) { + return svc.adapter.GetJobTable(ctx, namespace) + }, + ) +} diff --git a/internal/api/apis/batch/jobs/patch.go b/internal/api/apis/batch/jobs/patch.go new file mode 100644 index 0000000..2f5e5ae --- /dev/null +++ b/internal/api/apis/batch/jobs/patch.go @@ -0,0 +1,67 @@ +package jobs + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + "github.com/portainer/k2d/internal/controller" + "github.com/portainer/k2d/internal/types" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" +) + +func (svc JobService) PatchJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + patch, err := io.ReadAll(r.Request.Body) + if err != nil { + utils.HttpError(r, w, http.StatusBadRequest, fmt.Errorf("unable to parse request body: %w", err)) + return + } + + job, err := svc.adapter.GetJob(r.Request.Context(), jobName, namespace) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to get job: %w", err)) + return + } + + if job == nil { + w.WriteHeader(http.StatusNotFound) + return + } + + data, err := 
json.Marshal(job) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to marshal job: %w", err)) + return + } + + mergedData, err := strategicpatch.StrategicMergePatch(data, patch, batchv1.Job{}) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to apply patch: %w", err)) + return + } + + updatedJob := &batchv1.Job{} + + err = json.Unmarshal(mergedData, updatedJob) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to unmarshal job: %w", err)) + return + } + + dryRun := r.QueryParameter("dryRun") != "" + if dryRun { + w.WriteAsJson(updatedJob) + return + } + + svc.operations <- controller.NewOperation(updatedJob, controller.MediumPriorityOperation, r.HeaderParameter(types.RequestIDHeader)) + + w.WriteAsJson(updatedJob) +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 2cde77a..12a6ad9 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" ) @@ -184,6 +185,13 @@ func (controller *OperationController) processOperation(op Operation) { "request_id", op.RequestID, ) } + case *batchv1.Job: + err := controller.createJob(op) + if err != nil { + controller.logger.Errorw("unable to create job", + "error", err, + ) + } case *corev1.ConfigMap: err := controller.createConfigMap(op) if err != nil { @@ -227,6 +235,11 @@ func (controller *OperationController) createDeployment(op Operation) error { return controller.adapter.CreateContainerFromDeployment(context.TODO(), deployment) } +func (controller *OperationController) createJob(op Operation) error { + job := op.Operation.(*batchv1.Job) + return controller.adapter.CreateContainerFromJob(context.TODO(), job) +} + func (controller *OperationController) createService(op Operation) error { service := 
op.Operation.(*corev1.Service) return controller.adapter.CreateContainerFromService(context.TODO(), service)