Skip to content

Commit

Permalink
Obey all vs avail get modes
Browse files Browse the repository at this point in the history
  • Loading branch information
gordonwatts committed Apr 2, 2022
1 parent 8447a55 commit 412cecb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
hold_till_end = did_info.file_count != -1
acc = _accumulator(servicex, summary, hold_till_end)

async for file_info in user_callback(did_info.did, info):
acc.add(file_info)
try:
async for file_info in user_callback(did_info.did, info):
acc.add(file_info)

acc.send_on(did_info.file_count)
acc.send_on(did_info.file_count)
except Exception:
if did_info.get_mode == 'all':
raise

# Simple error checking and reporting
if summary.file_count == 0:
Expand All @@ -84,7 +88,7 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
}
)

servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds')
servicex.post_status_update(f'Completed load of files in {elapsed_time} seconds')


def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body,
Expand Down
46 changes: 46 additions & 0 deletions tests/servicex_did_finder_lib/test_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,52 @@ async def my_callback(did_name: str, info: Dict[str, Any]) \
SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal')


def test_failed_file_after_good(rabbitmq, SXAdaptor):
'Test a callback that fails before any files are sent'

async def my_callback(did_name: str, info: Dict[str, Any]) \
-> AsyncGenerator[Dict[str, Any], None]:
yield {
'paths': ["fork/it/over"],
'adler32': 'no clue',
'file_size': 22323,
'file_events': 0,
}
raise Exception('that did not work')

init_rabbit_mq(my_callback, 'http://myrabbit.com', 'test_queue_name', retries=12,
retry_interval=10)
rabbitmq.send_did_request('hi-there')

# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add.assert_called_once()
SXAdaptor.put_fileset_complete.assert_not_called()
SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal')


def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor):
'Test a callback that fails before any files are sent'

async def my_callback(did_name: str, info: Dict[str, Any]) \
-> AsyncGenerator[Dict[str, Any], None]:
yield {
'paths': ["fork/it/over"],
'adler32': 'no clue',
'file_size': 22323,
'file_events': 0,
}
raise Exception('that did not work')

init_rabbit_mq(my_callback, 'http://myrabbit.com', 'test_queue_name', retries=12,
retry_interval=10)
rabbitmq.send_did_request('hi-there?get=available')

# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add.assert_called_once()
SXAdaptor.put_fileset_complete.assert_called_once()
SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")


def test_no_files_returned(rabbitmq, SXAdaptor):
'Test a callback that fails before any files are sent'

Expand Down

0 comments on commit 412cecb

Please sign in to comment.