Skip to content

Commit

Permalink
added backoff-retry-algorithm_for_clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavan Kongara committed Jun 20, 2023
1 parent 00397cf commit c81065a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
25 changes: 20 additions & 5 deletions client/python/armada_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.permissions import Permissions
from armada_client.typings import JobState
import tenacity


class ArmadaClient:
Expand All @@ -38,6 +39,7 @@ def __init__(self, channel):
self.event_stub = event_pb2_grpc.EventStub(channel)
self.usage_stub = usage_pb2_grpc.UsageStub(channel)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_job_events_stream(
self,
queue: str,
Expand Down Expand Up @@ -86,21 +88,24 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event:
"""

return Event(event)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def submit_health(self) -> health_pb2.HealthCheckResponse:
    """
    Query the health endpoint of the Submit Service.

    The call is wrapped in ``tenacity.retry``: at most three attempts are
    made, the pause between attempts comes from
    ``tenacity.wait_exponential()``, and ``reraise=True`` is set so the
    underlying exception is what surfaces once the attempts run out.

    :return: A HealthCheckResponse object.
    """
    health_request = empty_pb2.Empty()
    return self.submit_stub.Health(request=health_request)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def event_health(self) -> health_pb2.HealthCheckResponse:
    """
    Query the health endpoint of the Event Service.

    The call is wrapped in ``tenacity.retry``: at most three attempts are
    made, the pause between attempts comes from
    ``tenacity.wait_exponential()``, and ``reraise=True`` is set so the
    underlying exception is what surfaces once the attempts run out.

    :return: A HealthCheckResponse object.
    """
    health_request = empty_pb2.Empty()
    return self.event_stub.Health(request=health_request)


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def submit_jobs(
self, queue: str, job_set_id: str, job_request_items
) -> submit_pb2.JobSubmitResponse:
Expand All @@ -119,7 +124,8 @@ def submit_jobs(
)
response = self.submit_stub.SubmitJobs(request)
return response


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def cancel_jobs(
self,
queue: Optional[str] = None,
Expand Down Expand Up @@ -152,7 +158,8 @@ def cancel_jobs(

response = self.submit_stub.CancelJobs(request)
return response


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def cancel_jobset(
self,
queue: str,
Expand Down Expand Up @@ -181,6 +188,7 @@ def cancel_jobset(
response = self.submit_stub.CancelJobSet(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def reprioritize_jobs(
self,
new_priority: float,
Expand Down Expand Up @@ -222,6 +230,7 @@ def reprioritize_jobs(
response = self.submit_stub.ReprioritizeJobs(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
"""
Uses the CreateQueue RPC to create a queue.
Expand All @@ -232,6 +241,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
response = self.submit_stub.CreateQueue(queue)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
"""
Uses the UpdateQueue RPC to update a queue.
Expand All @@ -242,6 +252,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
response = self.submit_stub.UpdateQueue(queue)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def create_queues(
self, queues: List[submit_pb2.Queue]
) -> submit_pb2.BatchQueueCreateResponse:
Expand All @@ -255,6 +266,7 @@ def create_queues(
response = self.submit_stub.CreateQueues(queue_list)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def update_queues(
self, queues: List[submit_pb2.Queue]
) -> submit_pb2.BatchQueueUpdateResponse:
Expand All @@ -268,6 +280,7 @@ def update_queues(
response = self.submit_stub.UpdateQueues(queue_list)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def delete_queue(self, name: str) -> None:
"""Delete an empty queue by name.
Expand All @@ -279,6 +292,7 @@ def delete_queue(self, name: str) -> None:
request = submit_pb2.QueueDeleteRequest(name=name)
self.submit_stub.DeleteQueue(request)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_queue(self, name: str) -> submit_pb2.Queue:
"""Get the queue by name.
Expand All @@ -291,6 +305,7 @@ def get_queue(self, name: str) -> submit_pb2.Queue:
response = self.submit_stub.GetQueue(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_queue_info(self, name: str) -> submit_pb2.QueueInfo:
"""Get the queue info by name.
Expand Down
6 changes: 5 additions & 1 deletion third_party/airflow/armada/operators/jobservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from google.protobuf import empty_pb2

import tenacity

class JobServiceClient:
"""
Expand All @@ -18,6 +19,7 @@ class JobServiceClient:
def __init__(self, channel):
self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_job_status(
self, queue: str, job_set_id: str, job_id: str
) -> jobservice_pb2.JobServiceResponse:
Expand All @@ -30,11 +32,13 @@ def get_job_status(
:param job_id: The id of the job
:return: A Job Service Request (State, Error)
"""
print("Hello world")
job_service_request = jobservice_pb2.JobServiceRequest(
queue=queue, job_set_id=job_set_id, job_id=job_id
)
return self.job_stub.GetJobStatus(job_service_request)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def health(self) -> jobservice_pb2.HealthCheckResponse:
    """
    Health check for the job service over gRPC.

    Sends an empty protobuf message to the stub's ``Health`` RPC. The call is
    wrapped in ``tenacity.retry`` with a three-attempt limit, waits taken from
    ``tenacity.wait_exponential()``, and ``reraise=True``.

    :return: The response returned by the ``Health`` RPC.
    """
    health_request = empty_pb2.Empty()
    return self.job_stub.Health(request=health_request)

0 comments on commit c81065a

Please sign in to comment.