From 7abbcfbdb36cbbf78908f8ab251a4e5db774789f Mon Sep 17 00:00:00 2001 From: Gordon Watts Date: Sat, 19 Mar 2022 04:31:45 +0100 Subject: [PATCH] Parse a `files` count thing --- src/servicex_did_finder_lib/communication.py | 46 ++++++++++--- src/servicex_did_finder_lib/util_uri.py | 52 +++++++++++++++ .../test_communication.py | 63 ++++++++++++++++++ .../servicex_did_finder_lib/test_util_uri.py | 65 +++++++++++++++++++ 4 files changed, 218 insertions(+), 8 deletions(-) create mode 100644 src/servicex_did_finder_lib/util_uri.py create mode 100644 tests/servicex_did_finder_lib/test_util_uri.py diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py index c0d76f4..24647de 100644 --- a/src/servicex_did_finder_lib/communication.py +++ b/src/servicex_did_finder_lib/communication.py @@ -3,7 +3,7 @@ import json import logging import time -from typing import Any, AsyncGenerator, Callable, Dict, Optional +from typing import Any, AsyncGenerator, Callable, Dict, List, Optional import sys import pika @@ -11,6 +11,7 @@ from servicex_did_finder_lib.did_summary import DIDSummary from servicex_did_finder_lib.did_logging import initialize_root_logger +from servicex_did_finder_lib.util_uri import parse_did_uri from .servicex_adaptor import ServiceXAdapter # The type for the callback method to handle DID's, supplied by the user. @@ -26,17 +27,46 @@ __logging.addHandler(logging.NullHandler()) +class _accumulator: + 'Track or cache files depending on the mode we are operating in' + def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool): + self._servicex = sx + self._summary = sum + self._hold_till_end = hold_till_end + self._file_cache: List[Dict[str, Any]] = [] + + def add(self, file_info: Dict[str, Any]): + 'Track and inject the file back into the system' + if self._hold_till_end: + self._file_cache.append(file_info) + else: + self._summary.add_file(file_info) + if self._summary.file_count == 1: + self._servicex.post_transform_start() + self._servicex.put_file_add(file_info) + + def send_on(self, count): + 'Send the accumulated files' + if self._hold_till_end: + self._hold_till_end = False + files = sorted(self._file_cache, key=lambda x: x['file_path']) + for file_info in files[0:count]: + self.add(file_info) + + async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any], user_callback: UserDIDHandler): - summary = DIDSummary(did) start_time = datetime.now() + + summary = DIDSummary(did) + did_info = parse_did_uri(did) + hold_till_end = did_info.file_count != -1 + acc = _accumulator(servicex, summary, hold_till_end) + async for file_info in user_callback(did, info): + acc.add(file_info) - # Track the file, inject back into the system - summary.add_file(file_info) - if summary.file_count == 1: - servicex.post_transform_start() - servicex.put_file_add(file_info) + acc.send_on(did_info.file_count) # Simple error checking and reporting if summary.file_count == 0: @@ -109,7 +139,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie def init_rabbit_mq(user_callback: UserDIDHandler, rabbitmq_url: str, queue_name: str, retries: int, retry_interval: float, - file_prefix: str = None): + file_prefix: str = None): # type: ignore rabbitmq = None retry_count = 0 diff --git a/src/servicex_did_finder_lib/util_uri.py b/src/servicex_did_finder_lib/util_uri.py new file mode 100644 index 0000000..d879ec4 --- /dev/null +++ b/src/servicex_did_finder_lib/util_uri.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from typing import Dict, List +import urllib + + +@dataclass +class ParsedDIDInfo: + # The did to pass into the library + did: str + + # Mode to get the files (default 'all') + get_mode: str + + # Number of files to fetch (default '-1') + file_count: int + + +def parse_did_uri(uri: str) -> ParsedDIDInfo: + '''Parse the uri that is given to us from ServiceX, pulling out + the components we care about, and keeping the DID that needs to + be passed down. + + Args: + uri (str): DID from ServiceX + + Returns: + ParsedDIDInfo: The URI parsed into parts + ''' + info = urllib.parse.urlparse(uri) # type: ignore + + params = urllib.parse.parse_qs(info.query) # type: ignore + get_string = 'all' if 'get' not in params else params['get'][-1] + file_count = -1 if 'files' not in params else int(params['files'][0]) + + if get_string not in ['all', 'available']: + raise ValueError('Bad value for "get" string in DID - must be "all" or "available", not ' + f'"{get_string}"') + + for k in ['get', 'files']: + if k in params: + del params[k] + + def unwind_params(ps: Dict[str, List[str]]): + for k, values in ps.items(): + for v in values: + yield k, v + + new_query = "&".join(f'{k}={v}' for k, v in unwind_params(params)) + if len(new_query) > 0: + new_query = "?" + new_query + + return ParsedDIDInfo(info.path + new_query, get_string, file_count) diff --git a/tests/servicex_did_finder_lib/test_communication.py b/tests/servicex_did_finder_lib/test_communication.py index 2ecbef9..3d97071 100644 --- a/tests/servicex_did_finder_lib/test_communication.py +++ b/tests/servicex_did_finder_lib/test_communication.py @@ -257,6 +257,69 @@ async def my_user_callback(did, info): assert SXAdaptor.post_status_update.called_once() +@pytest.mark.asyncio +async def test_run_file_fetch_one(SXAdaptor, mocker): + async def my_user_callback(did, info): + return_values = [ + { + 'file_path': '/tmp/foo', + 'adler32': '13e4f', + 'file_size': 1024, + 'file_events': 128 + }, + { + 'file_path': '/tmp/bar', + 'adler32': 'f33d', + 'file_size': 2046, + 'file_events': 64 + } + ] + for v in return_values: + yield v + + await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback) + SXAdaptor.post_transform_start.assert_called_once() + + assert SXAdaptor.put_file_add.call_count == 1 + assert SXAdaptor.put_file_add.call_args_list[0][0][0]['file_path'] == '/tmp/bar' + + SXAdaptor.put_fileset_complete.assert_called_once + assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1 + assert SXAdaptor.post_status_update.called_once() + + +@pytest.mark.asyncio +async def test_run_file_fetch_one_reverse(SXAdaptor, mocker): + 'The files should be sorted so they return the same' + async def my_user_callback(did, info): + return_values = [ + { + 'file_path': '/tmp/bar', + 'adler32': 'f33d', + 'file_size': 2046, + 'file_events': 64 + }, + { + 'file_path': '/tmp/foo', + 'adler32': '13e4f', + 'file_size': 1024, + 'file_events': 128 + }, + ] + for v in return_values: + yield v + + await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback) + SXAdaptor.post_transform_start.assert_called_once() + + assert SXAdaptor.put_file_add.call_count == 1 + assert SXAdaptor.put_file_add.call_args_list[0][0][0]['file_path'] == '/tmp/bar' + + SXAdaptor.put_fileset_complete.assert_called_once + assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1 + assert SXAdaptor.post_status_update.called_once() + + @pytest.mark.asyncio async def test_run_file_fetch_loop_bad_did(SXAdaptor, mocker): async def my_user_callback(did, info): diff --git a/tests/servicex_did_finder_lib/test_util_uri.py b/tests/servicex_did_finder_lib/test_util_uri.py new file mode 100644 index 0000000..f336416 --- /dev/null +++ b/tests/servicex_did_finder_lib/test_util_uri.py @@ -0,0 +1,65 @@ +import pytest +from servicex_did_finder_lib.util_uri import parse_did_uri + + +def test_plain_uri(): + r = parse_did_uri('forkit') + + assert r.did == "forkit" + assert r.get_mode == "all" + assert r.file_count == -1 + + +def test_uri_with_mode_avail(): + r = parse_did_uri('forkit?get=available') + + assert r.did == "forkit" + assert r.get_mode == "available" + assert r.file_count == -1 + + +def test_uri_with_mode_all(): + r = parse_did_uri('forkit?get=all') + + assert r.did == "forkit" + assert r.get_mode == "all" + assert r.file_count == -1 + + +def test_uri_with_mode_bad(): + with pytest.raises(ValueError) as e: + parse_did_uri('forkit?get=all_available') + + assert "all_available" in str(e.value) + + +def test_uri_with_file_count(): + r = parse_did_uri('forkit?files=10') + + assert r.did == "forkit" + assert r.get_mode == "all" + assert r.file_count == 10 + + +def test_uri_with_file_count_neg(): + r = parse_did_uri('forkit?files=-1') + + assert r.did == "forkit" + assert r.get_mode == "all" + assert r.file_count == -1 + + +def test_uri_with_file_and_get(): + r = parse_did_uri('forkit?files=10&get=available') + + assert r.did == "forkit" + assert r.get_mode == "available" + assert r.file_count == 10 + + +def test_uri_with_other_params(): + r = parse_did_uri('forkit?get=available&stuff=hi&files=10') + + assert r.did == "forkit?stuff=hi" + assert r.get_mode == "available" + assert r.file_count == 10