From 3e38cd28452335cae1c290a7ae64b231de83f3ca Mon Sep 17 00:00:00 2001 From: paer Date: Mon, 21 Aug 2023 11:21:20 +0800 Subject: [PATCH 1/4] job msg mismatch error handle(pb3) Signed-off-by: paer --- fed/grpc/fed.proto | 3 +- fed/grpc/pb3/fed_pb2.py | 161 ++++++++++++++---- fed/grpc/pb3/fed_pb2_grpc.py | 14 -- fed/proxy/grpc/grpc_proxy.py | 20 ++- tests/multi-jobs/test_ignore_other_job_msg.py | 5 +- tests/test_transport_proxy.py | 2 +- 6 files changed, 155 insertions(+), 50 deletions(-) diff --git a/fed/grpc/fed.proto b/fed/grpc/fed.proto index d0e2ee6..418cc0f 100644 --- a/fed/grpc/fed.proto +++ b/fed/grpc/fed.proto @@ -14,5 +14,6 @@ message SendDataRequest { }; message SendDataResponse { - string result = 1; + int32 code = 1; + string result = 2; }; diff --git a/fed/grpc/pb3/fed_pb2.py b/fed/grpc/pb3/fed_pb2.py index 8f7d992..490205c 100644 --- a/fed/grpc/pb3/fed_pb2.py +++ b/fed/grpc/pb3/fed_pb2.py @@ -1,23 +1,8 @@ -# Copyright 2023 The RayFed Team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: fed.proto """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database @@ -28,12 +13,113 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tfed.proto\"e\n\x0fSendDataRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x17\n\x0fupstream_seq_id\x18\x02 \x01(\t\x12\x19\n\x11\x64ownstream_seq_id\x18\x03 \x01(\t\x12\x10\n\x08job_name\x18\x04 \x01(\t\"\"\n\x10SendDataResponse\x12\x0e\n\x06result\x18\x01 \x01(\t2@\n\x0bGrpcService\x12\x31\n\x08SendData\x12\x10.SendDataRequest\x1a\x11.SendDataResponse\"\x00\x42\x03\x80\x01\x01\x62\x06proto3') +DESCRIPTOR = _descriptor.FileDescriptor( + name='fed.proto', + package='', + syntax='proto3', + serialized_options=b'\200\001\001', + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\tfed.proto\"e\n\x0fSendDataRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x17\n\x0fupstream_seq_id\x18\x02 \x01(\t\x12\x19\n\x11\x64ownstream_seq_id\x18\x03 \x01(\t\x12\x10\n\x08job_name\x18\x04 \x01(\t\"0\n\x10SendDataResponse\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0e\n\x06result\x18\x02 \x01(\t2@\n\x0bGrpcService\x12\x31\n\x08SendData\x12\x10.SendDataRequest\x1a\x11.SendDataResponse\"\x00\x42\x03\x80\x01\x01\x62\x06proto3' +) + + + + +_SENDDATAREQUEST = _descriptor.Descriptor( + name='SendDataRequest', + full_name='SendDataRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='data', full_name='SendDataRequest.data', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='upstream_seq_id', full_name='SendDataRequest.upstream_seq_id', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='downstream_seq_id', full_name='SendDataRequest.downstream_seq_id', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='job_name', full_name='SendDataRequest.job_name', index=3, + number=4, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=13, + serialized_end=114, +) +_SENDDATARESPONSE = _descriptor.Descriptor( + name='SendDataResponse', + full_name='SendDataResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='code', full_name='SendDataResponse.code', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='result', full_name='SendDataResponse.result', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=116, + serialized_end=164, +) + +DESCRIPTOR.message_types_by_name['SendDataRequest'] = _SENDDATAREQUEST +DESCRIPTOR.message_types_by_name['SendDataResponse'] = _SENDDATARESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) -_SENDDATAREQUEST = DESCRIPTOR.message_types_by_name['SendDataRequest'] -_SENDDATARESPONSE = DESCRIPTOR.message_types_by_name['SendDataResponse'] SendDataRequest = _reflection.GeneratedProtocolMessageType('SendDataRequest', (_message.Message,), { 'DESCRIPTOR' : _SENDDATAREQUEST, '__module__' : 'fed_pb2' @@ -48,15 +134,32 @@ }) _sym_db.RegisterMessage(SendDataResponse) -_GRPCSERVICE = DESCRIPTOR.services_by_name['GrpcService'] -if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - DESCRIPTOR._serialized_options = b'\200\001\001' - _SENDDATAREQUEST._serialized_start=13 - _SENDDATAREQUEST._serialized_end=114 - _SENDDATARESPONSE._serialized_start=116 - _SENDDATARESPONSE._serialized_end=150 - _GRPCSERVICE._serialized_start=152 - _GRPCSERVICE._serialized_end=216 + +DESCRIPTOR._options = None + +_GRPCSERVICE = _descriptor.ServiceDescriptor( + name='GrpcService', + full_name='GrpcService', + file=DESCRIPTOR, + index=0, + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_start=166, + serialized_end=230, + methods=[ + _descriptor.MethodDescriptor( + name='SendData', + full_name='GrpcService.SendData', + index=0, + containing_service=None, + input_type=_SENDDATAREQUEST, + output_type=_SENDDATARESPONSE, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), +]) +_sym_db.RegisterServiceDescriptor(_GRPCSERVICE) + +DESCRIPTOR.services_by_name['GrpcService'] = _GRPCSERVICE + # @@protoc_insertion_point(module_scope) diff --git a/fed/grpc/pb3/fed_pb2_grpc.py b/fed/grpc/pb3/fed_pb2_grpc.py index 55c674d..830c8b8 100644 --- a/fed/grpc/pb3/fed_pb2_grpc.py +++ b/fed/grpc/pb3/fed_pb2_grpc.py @@ -1,17 +1,3 @@ -# Copyright 2023 The RayFed Team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc diff --git a/fed/proxy/grpc/grpc_proxy.py b/fed/proxy/grpc/grpc_proxy.py index 743fe29..f1dfc71 100644 --- a/fed/proxy/grpc/grpc_proxy.py +++ b/fed/proxy/grpc/grpc_proxy.py @@ -136,7 +136,8 @@ async def send( timeout=timeout, metadata=grpc_metadata, ) - return response + self.handle_response_error(response) + return response.result def get_grpc_config_by_party(self, dest_party): """Overide global config by party specific config @@ -167,6 +168,17 @@ async def get_proxy_config(self, dest_party=None): proxy_config.update({'grpc_options': grpc_options}) return proxy_config + def handle_response_error(self, response): + if response.code == 200: + return + + if 400 <= response.code < 500: + # Request error should also be identified as a sending failure, + # though the request was physically sent. + logger.warning(f"Request was successfully sent but got error response, " + f"code: {response.code}, message: {response.result}.") + raise RuntimeError(response.result) + async def send_data_grpc( data, @@ -192,9 +204,10 @@ async def send_data_grpc( ) logger.debug( f'Received data response from seq_id {downstream_seq_id}, ' + f'code: {response.code}, ' f'result: {response.result}.' ) - return response.result + return response class GrpcReceiverProxy(ReceiverProxy): @@ -287,6 +300,7 @@ async def SendData(self, request, context): f"The reason may be that the ReceiverProxy is listening " f"on the same address with that job.") return fed_pb2.SendDataResponse( + code=417, result=f"JobName mis-match, expected {self._job_name}, got {job_name}.") upstream_seq_id = request.upstream_seq_id downstream_seq_id = request.downstream_seq_id @@ -309,7 +323,7 @@ async def SendData(self, request, context): event = get_from_two_dim_dict(self._events, upstream_seq_id, downstream_seq_id) event.set() logger.debug(f"Event set for {upstream_seq_id}") - return fed_pb2.SendDataResponse(result="OK") + return fed_pb2.SendDataResponse(code=200, result="OK") async def _run_grpc_server( diff --git a/tests/multi-jobs/test_ignore_other_job_msg.py b/tests/multi-jobs/test_ignore_other_job_msg.py index cbb45ac..c81689e 100644 --- a/tests/multi-jobs/test_ignore_other_job_msg.py +++ b/tests/multi-jobs/test_ignore_other_job_msg.py @@ -52,9 +52,10 @@ async def send( timeout=timeout, metadata=grpc_metadata, ) - assert "JobName mis-match" in response + assert response.code is 417 + assert "JobName mis-match" in response.result # So that process can exit - raise RuntimeError() + raise RuntimeError(response.result) @fed.remote diff --git a/tests/test_transport_proxy.py b/tests/test_transport_proxy.py index 368223a..2bca022 100644 --- a/tests/test_transport_proxy.py +++ b/tests/test_transport_proxy.py @@ -114,7 +114,7 @@ async def SendData(self, request, context): assert v == metadata[k] event = asyncio.Event() event.set() - return fed_pb2.SendDataResponse(result="OK") + return fed_pb2.SendDataResponse(code=200,result="OK") async def _test_run_grpc_server( From b48c9be31c75bfccc38f44d469d4a7c972691454 Mon Sep 17 00:00:00 2001 From: paer Date: Mon, 21 Aug 2023 11:49:34 +0800 Subject: [PATCH 2/4] add license Signed-off-by: paer --- fed/grpc/pb3/fed_pb2.py | 14 ++++++++++++++ fed/grpc/pb3/fed_pb2_grpc.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/fed/grpc/pb3/fed_pb2.py b/fed/grpc/pb3/fed_pb2.py index 490205c..5db0463 100644 --- a/fed/grpc/pb3/fed_pb2.py +++ b/fed/grpc/pb3/fed_pb2.py @@ -1,3 +1,17 @@ +# Copyright 2023 The RayFed Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: fed.proto diff --git a/fed/grpc/pb3/fed_pb2_grpc.py b/fed/grpc/pb3/fed_pb2_grpc.py index 830c8b8..55c674d 100644 --- a/fed/grpc/pb3/fed_pb2_grpc.py +++ b/fed/grpc/pb3/fed_pb2_grpc.py @@ -1,3 +1,17 @@ +# Copyright 2023 The RayFed Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc From 4b53aac01261e20eaf7d2a43638539a7f828d746 Mon Sep 17 00:00:00 2001 From: NKcqx <892670992@qq.com> Date: Mon, 21 Aug 2023 11:53:00 +0800 Subject: [PATCH 3/4] pb4 response error code Signed-off-by: NKcqx <892670992@qq.com> --- fed/grpc/pb4/fed_pb2.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fed/grpc/pb4/fed_pb2.py b/fed/grpc/pb4/fed_pb2.py index bd69d26..329cc07 100644 --- a/fed/grpc/pb4/fed_pb2.py +++ b/fed/grpc/pb4/fed_pb2.py @@ -27,7 +27,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tfed.proto\"e\n\x0fSendDataRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x17\n\x0fupstream_seq_id\x18\x02 \x01(\t\x12\x19\n\x11\x64ownstream_seq_id\x18\x03 \x01(\t\x12\x10\n\x08job_name\x18\x04 \x01(\t\"\"\n\x10SendDataResponse\x12\x0e\n\x06result\x18\x01 \x01(\t2@\n\x0bGrpcService\x12\x31\n\x08SendData\x12\x10.SendDataRequest\x1a\x11.SendDataResponse\"\x00\x42\x03\x80\x01\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tfed.proto\"e\n\x0fSendDataRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x17\n\x0fupstream_seq_id\x18\x02 \x01(\t\x12\x19\n\x11\x64ownstream_seq_id\x18\x03 \x01(\t\x12\x10\n\x08job_name\x18\x04 \x01(\t\"0\n\x10SendDataResponse\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0e\n\x06result\x18\x02 \x01(\t2@\n\x0bGrpcService\x12\x31\n\x08SendData\x12\x10.SendDataRequest\x1a\x11.SendDataResponse\"\x00\x42\x03\x80\x01\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -39,7 +39,7 @@ _globals['_SENDDATAREQUEST']._serialized_start=13 _globals['_SENDDATAREQUEST']._serialized_end=114 _globals['_SENDDATARESPONSE']._serialized_start=116 - _globals['_SENDDATARESPONSE']._serialized_end=150 - _globals['_GRPCSERVICE']._serialized_start=152 - _globals['_GRPCSERVICE']._serialized_end=216 + _globals['_SENDDATARESPONSE']._serialized_end=164 + _globals['_GRPCSERVICE']._serialized_start=166 + _globals['_GRPCSERVICE']._serialized_end=230 # @@protoc_insertion_point(module_scope) From 72010cb409e418e8312171f769e0d8c0b29b5ad0 Mon Sep 17 00:00:00 2001 From: paer Date: Mon, 21 Aug 2023 14:56:28 +0800 Subject: [PATCH 4/4] lint codes Signed-off-by: paer --- tests/multi-jobs/test_ignore_other_job_msg.py | 2 +- tests/test_transport_proxy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/multi-jobs/test_ignore_other_job_msg.py b/tests/multi-jobs/test_ignore_other_job_msg.py index c81689e..a75d8f7 100644 --- a/tests/multi-jobs/test_ignore_other_job_msg.py +++ b/tests/multi-jobs/test_ignore_other_job_msg.py @@ -52,7 +52,7 @@ async def send( timeout=timeout, metadata=grpc_metadata, ) - assert response.code is 417 + assert response.code == 417 assert "JobName mis-match" in response.result # So that process can exit raise RuntimeError(response.result) diff --git a/tests/test_transport_proxy.py b/tests/test_transport_proxy.py index 2bca022..9ebb787 100644 --- a/tests/test_transport_proxy.py +++ b/tests/test_transport_proxy.py @@ -114,7 +114,7 @@ async def SendData(self, request, context): assert v == metadata[k] event = asyncio.Event() event.set() - return fed_pb2.SendDataResponse(code=200,result="OK") + return fed_pb2.SendDataResponse(code=200, result="OK") async def _test_run_grpc_server(