diff --git a/client/python/armada_client/client.py b/client/python/armada_client/client.py index c9d1e3db703..440ebebd4cb 100644 --- a/client/python/armada_client/client.py +++ b/client/python/armada_client/client.py @@ -21,6 +21,7 @@ from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 from armada_client.permissions import Permissions from armada_client.typings import JobState +import tenacity class ArmadaClient: @@ -38,6 +39,7 @@ def __init__(self, channel): self.event_stub = event_pb2_grpc.EventStub(channel) self.usage_stub = usage_pb2_grpc.UsageStub(channel) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_job_events_stream( self, queue: str, @@ -86,21 +88,24 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event: """ return Event(event) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def submit_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Submit Service. :return: A HealthCheckResponse object. """ return self.submit_stub.Health(request=empty_pb2.Empty()) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def event_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Event Service. :return: A HealthCheckResponse object. """ return self.event_stub.Health(request=empty_pb2.Empty()) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def submit_jobs( self, queue: str, job_set_id: str, job_request_items ) -> submit_pb2.JobSubmitResponse: @@ -119,7 +124,8 @@ def submit_jobs( ) response = self.submit_stub.SubmitJobs(request) return response - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def cancel_jobs( self, queue: Optional[str] = None, @@ -152,7 +158,8 @@ def cancel_jobs( response = self.submit_stub.CancelJobs(request) return response - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def cancel_jobset( self, queue: str, @@ -181,6 +188,7 @@ def cancel_jobset( response = self.submit_stub.CancelJobSet(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def reprioritize_jobs( self, new_priority: float, @@ -222,6 +230,7 @@ def reprioritize_jobs( response = self.submit_stub.ReprioritizeJobs(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the CreateQueue RPC to create a queue. @@ -232,6 +241,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.CreateQueue(queue) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the UpdateQueue RPC to update a queue. @@ -242,6 +252,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.UpdateQueue(queue) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def create_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueCreateResponse: @@ -255,6 +266,7 @@ def create_queues( response = self.submit_stub.CreateQueues(queue_list) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def update_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueUpdateResponse: @@ -268,6 +280,7 @@ def update_queues( response = self.submit_stub.UpdateQueues(queue_list) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def delete_queue(self, name: str) -> None: """Delete an empty queue by name. @@ -279,6 +292,7 @@ def delete_queue(self, name: str) -> None: request = submit_pb2.QueueDeleteRequest(name=name) self.submit_stub.DeleteQueue(request) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_queue(self, name: str) -> submit_pb2.Queue: """Get the queue by name. @@ -291,6 +305,7 @@ def get_queue(self, name: str) -> submit_pb2.Queue: response = self.submit_stub.GetQueue(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_queue_info(self, name: str) -> submit_pb2.QueueInfo: """Get the queue info by name. diff --git a/third_party/airflow/armada/operators/jobservice.py b/third_party/airflow/armada/operators/jobservice.py index 2d5b4248e42..266c6ed1506 100644 --- a/third_party/airflow/armada/operators/jobservice.py +++ b/third_party/airflow/armada/operators/jobservice.py @@ -2,6 +2,7 @@ from google.protobuf import empty_pb2 +import tenacity class JobServiceClient: """ @@ -18,6 +19,7 @@ class JobServiceClient: def __init__(self, channel): self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_job_status( self, queue: str, job_set_id: str, job_id: str ) -> jobservice_pb2.JobServiceResponse: @@ -30,11 +32,13 @@ def get_job_status( :param job_id: The id of the job :return: A Job Service Request (State, Error) """ + print("Hello world") job_service_request = jobservice_pb2.JobServiceRequest( queue=queue, job_set_id=job_set_id, job_id=job_id ) return self.job_stub.GetJobStatus(job_service_request) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def health(self) -> jobservice_pb2.HealthCheckResponse: """Health Check for GRPC Request""" return self.job_stub.Health(request=empty_pb2.Empty())