Skip to content

Commit

Permalink
Parse a files count thing
Browse files Browse the repository at this point in the history
  • Loading branch information
gordonwatts committed Mar 19, 2022
1 parent 7c4e0e1 commit 7abbcfb
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 8 deletions.
46 changes: 38 additions & 8 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable, Dict, Optional
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
import sys

import pika
from make_it_sync import make_sync

from servicex_did_finder_lib.did_summary import DIDSummary
from servicex_did_finder_lib.did_logging import initialize_root_logger
from servicex_did_finder_lib.util_uri import parse_did_uri
from .servicex_adaptor import ServiceXAdapter

# The type for the callback method to handle DID's, supplied by the user.
Expand All @@ -26,17 +27,46 @@
__logging.addHandler(logging.NullHandler())


class _accumulator:
    '''Track or cache files depending on the mode we are operating in.

    When `hold_till_end` is False every file passed to `add` is recorded in
    the summary and sent to ServiceX immediately. When it is True the files
    are only cached, and nothing is sent until `send_on` is called.
    '''
    def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool):
        '''Create the accumulator.

        Args:
            sx: Adaptor used to report files back to ServiceX.
            sum: Summary object that counts the files that were sent.
            hold_till_end: True to cache files until `send_on` is called.
        '''
        self._servicex = sx
        self._summary = sum
        self._hold_till_end = hold_till_end
        self._file_cache: List[Dict[str, Any]] = []

    def add(self, file_info: Dict[str, Any]):
        'Track and inject the file back into the system'
        if self._hold_till_end:
            self._file_cache.append(file_info)
            return

        self._summary.add_file(file_info)
        # The transform is started when the first file is recorded.
        if self._summary.file_count == 1:
            self._servicex.post_transform_start()
        self._servicex.put_file_add(file_info)

    def send_on(self, count):
        '''Send the accumulated files.

        Does nothing unless files are currently being held. The cached files
        are ordered by their 'file_path' so the same subset is chosen no
        matter what order they arrived in.

        Args:
            count: Maximum number of files to send. A negative value means
                "no limit" and sends every cached file.
        '''
        if not self._hold_till_end:
            return

        # Switch to pass-through mode so `add` below really sends the files.
        self._hold_till_end = False
        files = sorted(self._file_cache, key=lambda x: x['file_path'])
        self._file_cache = []

        # A plain `files[0:count]` would treat a negative count as an offset
        # from the end of the list and silently drop files.
        selected = files if count < 0 else files[:count]
        for file_info in selected:
            self.add(file_info)


async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],
user_callback: UserDIDHandler):
summary = DIDSummary(did)
start_time = datetime.now()

summary = DIDSummary(did)
did_info = parse_did_uri(did)
hold_till_end = did_info.file_count != -1
acc = _accumulator(servicex, summary, hold_till_end)

async for file_info in user_callback(did, info):
acc.add(file_info)

# Track the file, inject back into the system
summary.add_file(file_info)
if summary.file_count == 1:
servicex.post_transform_start()
servicex.put_file_add(file_info)
acc.send_on(did_info.file_count)

# Simple error checking and reporting
if summary.file_count == 0:
Expand Down Expand Up @@ -109,7 +139,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
def init_rabbit_mq(user_callback: UserDIDHandler,
rabbitmq_url: str, queue_name: str, retries: int,
retry_interval: float,
file_prefix: str = None):
file_prefix: str = None): # type: ignore
rabbitmq = None
retry_count = 0

Expand Down
52 changes: 52 additions & 0 deletions src/servicex_did_finder_lib/util_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from dataclasses import dataclass
from typing import Dict, List
import urllib
import urllib.parse


@dataclass
class ParsedDIDInfo:
    '''The pieces of a DID URI once the `get` and `files` query options
    have been split off from it.
    '''
    # The did to pass into the library
    did: str

    # Mode to get the files (default 'all')
    get_mode: str

    # Number of files to fetch (default '-1')
    file_count: int


def parse_did_uri(uri: str) -> ParsedDIDInfo:
    '''Parse the uri that is given to us from ServiceX, pulling out
    the components we care about, and keeping the DID that needs to
    be passed down.

    The `get` and `files` query parameters are consumed here. Any other
    query parameters are appended back onto the returned DID.

    Args:
        uri (str): DID from ServiceX

    Returns:
        ParsedDIDInfo: The URI parsed into parts

    Raises:
        ValueError: If `get` is neither "all" nor "available", or if
            `files` is not an integer.
    '''
    info = urllib.parse.urlparse(uri)
    params = urllib.parse.parse_qs(info.query)

    get_string = params['get'][-1] if 'get' in params else 'all'

    file_count = -1
    if 'files' in params:
        raw_count = params['files'][0]
        try:
            file_count = int(raw_count)
        except ValueError:
            raise ValueError('Bad value for "files" in DID - must be an integer, not '
                             f'"{raw_count}"') from None

    if get_string not in ('all', 'available'):
        raise ValueError('Bad value for "get" string in DID - must be "all" or "available", not '
                         f'"{get_string}"')

    # Everything in front of the first '?' is the DID itself. Take it from
    # the raw string: urlparse() files a leading "scope:" under `scheme`, so
    # rebuilding the DID from `info.path` alone would lose it.
    did = uri.partition('?')[0]

    # Hand any parameters we do not own back to the DID untouched.
    new_query = "&".join(
        f'{k}={v}'
        for k, values in params.items()
        if k not in ('get', 'files')
        for v in values
    )
    if new_query:
        did = f'{did}?{new_query}'

    return ParsedDIDInfo(did, get_string, file_count)
63 changes: 63 additions & 0 deletions tests/servicex_did_finder_lib/test_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,69 @@ async def my_user_callback(did, info):
assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
async def test_run_file_fetch_one(SXAdaptor, mocker):
    'With files=1 only the first file, ordered by path, is sent on'
    async def my_user_callback(did, info):
        return_values = [
            {
                'file_path': '/tmp/foo',
                'adler32': '13e4f',
                'file_size': 1024,
                'file_events': 128
            },
            {
                'file_path': '/tmp/bar',
                'adler32': 'f33d',
                'file_size': 2046,
                'file_events': 64
            }
        ]
        for v in return_values:
            yield v

    await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback)
    SXAdaptor.post_transform_start.assert_called_once()

    assert SXAdaptor.put_file_add.call_count == 1
    assert SXAdaptor.put_file_add.call_args_list[0][0][0]['file_path'] == '/tmp/bar'

    # The assertion helpers must actually be called: a bare
    # `assert_called_once` reference, or `assert x.called_once()`, verifies
    # nothing.
    SXAdaptor.put_fileset_complete.assert_called_once()
    assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1
    SXAdaptor.post_status_update.assert_called_once()


@pytest.mark.asyncio
async def test_run_file_fetch_one_reverse(SXAdaptor, mocker):
    'The files should be sorted so they return the same'
    async def my_user_callback(did, info):
        return_values = [
            {
                'file_path': '/tmp/bar',
                'adler32': 'f33d',
                'file_size': 2046,
                'file_events': 64
            },
            {
                'file_path': '/tmp/foo',
                'adler32': '13e4f',
                'file_size': 1024,
                'file_events': 128
            },
        ]
        for v in return_values:
            yield v

    await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback)
    SXAdaptor.post_transform_start.assert_called_once()

    assert SXAdaptor.put_file_add.call_count == 1
    assert SXAdaptor.put_file_add.call_args_list[0][0][0]['file_path'] == '/tmp/bar'

    # The assertion helpers must actually be called: a bare
    # `assert_called_once` reference, or `assert x.called_once()`, verifies
    # nothing.
    SXAdaptor.put_fileset_complete.assert_called_once()
    assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1
    SXAdaptor.post_status_update.assert_called_once()


@pytest.mark.asyncio
async def test_run_file_fetch_loop_bad_did(SXAdaptor, mocker):
async def my_user_callback(did, info):
Expand Down
65 changes: 65 additions & 0 deletions tests/servicex_did_finder_lib/test_util_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pytest
from servicex_did_finder_lib.util_uri import parse_did_uri


def test_plain_uri():
    'A DID with no query string comes back unchanged with default options'
    parsed = parse_did_uri('forkit')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "all", -1)


def test_uri_with_mode_avail():
    'get=available is picked up and removed from the DID'
    parsed = parse_did_uri('forkit?get=available')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "available", -1)


def test_uri_with_mode_all():
    'get=all is picked up and removed from the DID'
    parsed = parse_did_uri('forkit?get=all')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "all", -1)


def test_uri_with_mode_bad():
    'An unknown get mode is rejected and named in the error message'
    with pytest.raises(ValueError) as excinfo:
        parse_did_uri('forkit?get=all_available')

    message = str(excinfo.value)
    assert "all_available" in message


def test_uri_with_file_count():
    'files=N is parsed as an integer and removed from the DID'
    parsed = parse_did_uri('forkit?files=10')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "all", 10)


def test_uri_with_file_count_neg():
    'An explicit files=-1 parses to the same value as the default'
    parsed = parse_did_uri('forkit?files=-1')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "all", -1)


def test_uri_with_file_and_get():
    'files and get can be combined in one query string'
    parsed = parse_did_uri('forkit?files=10&get=available')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit", "available", 10)


def test_uri_with_other_params():
    'Query parameters other than get and files stay attached to the DID'
    parsed = parse_did_uri('forkit?get=available&stuff=hi&files=10')

    observed = (parsed.did, parsed.get_mode, parsed.file_count)
    assert observed == ("forkit?stuff=hi", "available", 10)

0 comments on commit 7abbcfb

Please sign in to comment.