-
Hi, I am trying to build a web application that interacts with Kubernetes to launch a series of data-science jobs and monitor them. I think the code below will work, but I would like your input on whether I am handling the error statuses properly.
using k8s;
using k8s.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Test
{
    /// <summary>
    /// Console sample that creates a single-pod Kubernetes Job in the "default" namespace,
    /// waits for its container to start, streams the pod log to standard output and then
    /// reports whether the Job reached the Complete or the Failed condition.
    /// </summary>
    internal class TestK8sJobWatcher
    {
        private const string JobNamespace = "default";

        // Server-side timeout applied to every watch request issued below.
        private const int WatchTimeoutSeconds = 300;

        /// <summary>
        /// Creates the Job, follows it through start-up, log streaming and completion.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        /// <exception cref="InvalidOperationException">
        /// The Job reported a Failed condition, or a container terminated with a non-zero exit code.
        /// </exception>
        /// <exception cref="TimeoutException">
        /// A watch ended before the pod started or before the Job reached a terminal condition.
        /// </exception>
        private static async Task Main(string[] args)
        {
            var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
            IKubernetes client = new Kubernetes(config);
            Console.WriteLine("Starting Request!");
            var jobId = "t" + Guid.NewGuid();

            // create the job
            var jobObject = new V1Job()
            {
                ApiVersion = "batch/v1",
                Kind = V1Job.KubeKind,
                Metadata = new V1ObjectMeta() { Name = jobId },
                Spec = new V1JobSpec()
                {
                    TtlSecondsAfterFinished = 240,
                    Template = new V1PodTemplateSpec()
                    {
                        Spec = new V1PodSpec()
                        {
                            Containers = new List<V1Container>()
                            {
                                new V1Container()
                                {
                                    Image = "container:1.0.0",
                                    Name = "some-name",
                                    ImagePullPolicy = "Never",
                                    Command = new List<string>() { "/bin/bash", "-c", "./long-running-command.sh" },
                                },
                            },
                            RestartPolicy = "Never",
                        },
                    },
                    BackoffLimit = 0
                },
            };
            var job = await client.BatchV1.CreateNamespacedJobAsync(jobObject, JobNamespace);
            var controllerUid = job.Labels()["controller-uid"];
            var podSelector = $"job-name={jobId},controller-uid={controllerUid}";

            // Wait for the job to be active
            var jobs = client.BatchV1.ListNamespacedJobWithHttpMessagesAsync(JobNamespace, labelSelector: podSelector, watch: true, timeoutSeconds: WatchTimeoutSeconds);
            await foreach (var (_, item) in jobs.WatchAsync<V1Job, V1JobList>())
            {
                if (item.Status?.Active > 0)
                {
                    Console.WriteLine("Job is active");
                    break;
                }

                // A very short job can finish before an "active" event is ever observed;
                // without this check the loop would idle until the watch times out.
                if (HasCondition(item, "Complete"))
                {
                    break;
                }

                if (HasCondition(item, "Failed"))
                {
                    throw new InvalidOperationException("The Job failed");
                }
            }

            V1Pod jobPod = null;
            var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync(JobNamespace, labelSelector: podSelector, watch: true, timeoutSeconds: WatchTimeoutSeconds);

            // wait for container to be running
            await foreach (var (_, item) in podlistResp.WatchAsync<V1Pod, V1PodList>())
            {
                // ContainerStatuses is not populated on the first events of a pending pod;
                // calling Any() on it directly would throw ArgumentNullException.
                var statuses = item.Status?.ContainerStatuses;
                if (statuses == null)
                {
                    continue;
                }

                if (statuses.Any(c => c.State?.Running != null))
                {
                    jobPod = item;
                    Console.WriteLine("The container is running");
                    break;
                }

                var terminated = statuses
                    .Where(c => c.State?.Terminated != null)
                    .Select(c => c.State.Terminated)
                    .ToList();

                if (terminated.Any(t => t.ExitCode != 0))
                {
                    throw new InvalidOperationException("The container is terminated");
                }

                if (terminated.Count > 0)
                {
                    // The container already exited successfully before a Running state was
                    // observed. That is not an error; its log can still be read below.
                    jobPod = item;
                    break;
                }
            }

            if (jobPod == null)
            {
                // The watch expired (or the stream closed) without the pod ever starting.
                throw new TimeoutException($"No pod of job '{jobId}' started within {WatchTimeoutSeconds} seconds.");
            }

            // Stream the log asynchronously so the async Main never blocks a thread while
            // following a long-running container; dispose the HTTP response afterwards.
            using (var response = await client.CoreV1.ReadNamespacedPodLogWithHttpMessagesAsync(jobPod.Metadata.Name, JobNamespace, follow: true))
            {
                var stdout = Console.OpenStandardOutput();
                await response.Body.CopyToAsync(stdout);
                await stdout.FlushAsync();
            }

            var jobFinished = false;
            var jobCompletionWatcher = client.BatchV1.ListNamespacedJobWithHttpMessagesAsync(JobNamespace, labelSelector: $"controller-uid={controllerUid}", watch: true, timeoutSeconds: WatchTimeoutSeconds);
            await foreach (var (_, item) in jobCompletionWatcher.WatchAsync<V1Job, V1JobList>())
            {
                if (HasCondition(item, "Complete"))
                {
                    Console.WriteLine("THE JOB Succeeded");
                    jobFinished = true;
                    break;
                }

                if (HasCondition(item, "Failed"))
                {
                    throw new InvalidOperationException("The Job failed");
                }
            }

            if (!jobFinished)
            {
                // Do not exit silently: an expired watch is not the same as a successful job.
                throw new TimeoutException($"Job '{jobId}' did not report Complete or Failed within {WatchTimeoutSeconds} seconds.");
            }
        }

        /// <summary>
        /// Returns true when the job carries a condition of the given type whose status is "True".
        /// </summary>
        /// <param name="job">Job object received from the API server.</param>
        /// <param name="conditionType">Condition type to look for, e.g. "Complete" or "Failed".</param>
        private static bool HasCondition(V1Job job, string conditionType)
        {
            return job.Status?.Conditions?.Any(c => c.Type == conditionType && c.Status == "True") == true;
        }
    }
}
|
Beta Was this translation helpful? Give feedback.
Answered by
tg123
Dec 1, 2022
Replies: 1 comment 2 replies
-
any issue? you do not have to watch, seems |
Beta Was this translation helpful? Give feedback.
2 replies
Answer selected by
jptissot
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there any issue?
It looks good to me after going through it roughly.
You do not have to watch; it seems that
a `for(;;)` loop that polls with get
is easier in your case.