From 412cecb1ef9649ee73ee50bd5b3f67cc0860f101 Mon Sep 17 00:00:00 2001 From: Gordon Watts Date: Sat, 2 Apr 2022 03:35:31 +0200 Subject: [PATCH] Obey all vs avail get modes --- src/servicex_did_finder_lib/communication.py | 12 +++-- .../test_communication.py | 46 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py index 8903b57..e140673 100644 --- a/src/servicex_did_finder_lib/communication.py +++ b/src/servicex_did_finder_lib/communication.py @@ -63,10 +63,14 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st hold_till_end = did_info.file_count != -1 acc = _accumulator(servicex, summary, hold_till_end) - async for file_info in user_callback(did_info.did, info): - acc.add(file_info) + try: + async for file_info in user_callback(did_info.did, info): + acc.add(file_info) - acc.send_on(did_info.file_count) + acc.send_on(did_info.file_count) + except Exception: + if did_info.get_mode == 'all': + raise # Simple error checking and reporting if summary.file_count == 0: @@ -84,7 +88,7 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st } ) - servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds') + servicex.post_status_update(f'Completed load of files in {elapsed_time} seconds') def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body, diff --git a/tests/servicex_did_finder_lib/test_communication.py b/tests/servicex_did_finder_lib/test_communication.py index c8ef41e..8260ec2 100644 --- a/tests/servicex_did_finder_lib/test_communication.py +++ b/tests/servicex_did_finder_lib/test_communication.py @@ -175,6 +175,52 @@ async def my_callback(did_name: str, info: Dict[str, Any]) \ SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal') +def test_failed_file_after_good(rabbitmq, SXAdaptor): + 'Test a callback that fails before any files are sent' + + async def my_callback(did_name: str, info: Dict[str, Any]) \ + -> AsyncGenerator[Dict[str, Any], None]: + yield { + 'paths': ["fork/it/over"], + 'adler32': 'no clue', + 'file_size': 22323, + 'file_events': 0, + } + raise Exception('that did not work') + + init_rabbit_mq(my_callback, 'http://myrabbit.com', 'test_queue_name', retries=12, + retry_interval=10) + rabbitmq.send_did_request('hi-there') + + # Make sure the file was sent along, along with the completion + SXAdaptor.put_file_add.assert_called_once() + SXAdaptor.put_fileset_complete.assert_not_called() + SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal') + + +def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor): + 'Test a callback that fails before any files are sent' + + async def my_callback(did_name: str, info: Dict[str, Any]) \ + -> AsyncGenerator[Dict[str, Any], None]: + yield { + 'paths': ["fork/it/over"], + 'adler32': 'no clue', + 'file_size': 22323, + 'file_events': 0, + } + raise Exception('that did not work') + + init_rabbit_mq(my_callback, 'http://myrabbit.com', 'test_queue_name', retries=12, + retry_interval=10) + rabbitmq.send_did_request('hi-there?get=available') + + # Make sure the file was sent along, along with the completion + SXAdaptor.put_file_add.assert_called_once() + SXAdaptor.put_fileset_complete.assert_called_once() + SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds") + + def test_no_files_returned(rabbitmq, SXAdaptor): 'Test a callback that fails before any files are sent'