Skip to content

Commit

Permalink
big cleanup. no request_did, no prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
ivukotic committed Oct 4, 2023
1 parent b9fc72d commit 0e072be
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 108 deletions.
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ Create an async callback method that `yield`s file info dictionaries. For exampl
The arguments to the method are straight forward:

* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
* `info` contains a dict of various info about the request that asked for this DID:
* `request-id` The request id that has this DID associated. For logging.
* `info` contains a dict of various info about the request that asked for this DID.

Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:

Expand Down Expand Up @@ -114,7 +113,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
__log = logger.getLogger(__name__)
async def my_callback(did_name: str, info: Dict[str, Any]):
__log.info(f'Looking up dataset {did_name}.',
extra={'requestId': info['request-id']})
                   extra={'something': info['something']})

for i in range(0, 10):
yield {
Expand All @@ -125,9 +124,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
}
```

Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier.

The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them.
The `start_did_finder` will configure the python root logger properly.

## URI Format

Expand Down
29 changes: 6 additions & 23 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,6 @@ async def run_file_fetch_loop(
# If we've been holding onto any files, we need to send them now.
acc.send_on(did_info.file_count)

# Simple error checking and reporting
if summary.file_count == 0:
servicex.post_status_update(
f"DID Finder found zero files for dataset {did}", severity="fatal"
)

elapsed_time = int((datetime.now() - start_time).total_seconds())
servicex.put_fileset_complete(
{
Expand All @@ -107,11 +101,10 @@ async def run_file_fetch_loop(
"elapsed-time": elapsed_time,
}
)
servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds")


def rabbit_mq_callback(
user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None
user_callback: UserDIDHandler, channel, method, properties, body
):
"""rabbit_mq_callback Respond to RabbitMQ Message
Expand All @@ -125,25 +118,20 @@ def rabbit_mq_callback(
method ([type]): Delivery method
properties ([type]): Properties of the message
body ([type]): The body (json for us) of the message
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
"""
dataset_id = None # set this in case we get an exception while loading request
request_id = None
try:
# Unpack the message. Really bad if we fail up here!
did_request = json.loads(body)
did = did_request["did"]
dataset_id = did_request["dataset_id"]
request_id = did_request["request_id"]
endpoint = did_request["endpoint"]
__logging.info(
f"Received DID request {did_request}",
extra={"request_id": request_id, "dataset_id": dataset_id}
extra={"dataset_id": dataset_id}
)
servicex = ServiceXAdapter(request_id=request_id,
dataset_id=dataset_id,
endpoint=endpoint,
file_prefix=file_prefix)
servicex = ServiceXAdapter(dataset_id=dataset_id,
endpoint=endpoint)

info = {
"dataset-id": dataset_id,
Expand All @@ -155,11 +143,7 @@ def rabbit_mq_callback(

except Exception as e:
_, exec_value, _ = sys.exc_info()
__logging.exception("DID Request Failed", extra={"dataset_id": dataset_id})
servicex.post_status_update(
f"DID Request Failed for id {dataset_id}: " f"{str(e)} - {exec_value}",
severity="fatal",
)
__logging.exception(f"DID Request Failed {str(e)}", extra={"dataset_id": dataset_id})
raise

except Exception as e:
Expand All @@ -177,7 +161,6 @@ def init_rabbit_mq(
queue_name: str,
retries: int,
retry_interval: float,
file_prefix: str = None,
): # type: ignore
rabbitmq = None
retry_count = 0
Expand All @@ -194,7 +177,7 @@ def init_rabbit_mq(
queue=queue_name,
auto_ack=False,
on_message_callback=lambda c, m, p, b: rabbit_mq_callback(
user_callback, c, m, p, b, file_prefix
user_callback, c, m, p, b
),
)
_channel.start_consuming()
Expand Down
14 changes: 7 additions & 7 deletions src/servicex_did_finder_lib/did_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

class DIDFormatter(logging.Formatter):
"""
Need a customer formatter to allow for logging with request ids that vary.
Normally log messages are "level instance component request_id msg" and
request_id gets set by initialize_logging but we need a handler that'll let
us pass in the request id and have that embedded in the log message
    Need a custom formatter to allow for logging with dataset ids that vary.
Normally log messages are "level instance component dataset_id msg" and
dataset_id gets set by initialize_logging but we need a handler that'll let
us pass in the dataset id and have that embedded in the log message
"""

def format(self, record: logging.LogRecord) -> str:
Expand All @@ -18,10 +18,10 @@ def format(self, record: logging.LogRecord) -> str:
:return: formatted log message
"""

if hasattr(record, "requestId"):
if hasattr(record, "datasetId"):
return super().format(record)
else:
setattr(record, "requestId", None)
setattr(record, "datasetId", None)
return super().format(record)


Expand All @@ -36,7 +36,7 @@ def initialize_root_logger(did_scheme: str):
instance = os.environ.get('INSTANCE_NAME', 'Unknown')
formatter = DIDFormatter('%(levelname)s ' +
f"{instance} {did_scheme}_did_finder " +
'%(requestId)s %(message)s')
'%(datasetId)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
Expand Down
34 changes: 3 additions & 31 deletions src/servicex_did_finder_lib/servicex_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,45 +35,17 @@


class ServiceXAdapter:
def __init__(self, endpoint, dataset_id, request_id=None, file_prefix=None):
def __init__(self, endpoint, dataset_id):
self.endpoint = endpoint
self.request_id = request_id
self.dataset_id = dataset_id
self.file_prefix = file_prefix

self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.NullHandler())

# TODO - remove. did finder should not know about request_id and call any
# endpoint with request_id.
def post_status_update(self, status_msg, severity="info"):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.post(f"{self.endpoint}{self.request_id}/status", data={
"timestamp": datetime.now().isoformat(),
"source": "DID Finder",
"severity": severity,
"info": status_msg
})
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
f'(try {attempts} out of {MAX_RETRIES}')
attempts += 1
if not success:
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
f'message: {str(status_msg)} - Ignoring error.')

# TODO - remove
def _prefix_file(self, file_path):
return file_path if not self.file_prefix else self.file_prefix+file_path

def _create_json(self, file_info):
return {
"timestamp": datetime.now().isoformat(),
"paths": [self._prefix_file(fp) for fp in file_info['paths']],
"paths": file_info['paths'],
'adler32': file_info['adler32'],
'file_size': file_info['file_size'],
'file_events': file_info['file_events']
Expand All @@ -86,7 +58,7 @@ def put_file_add(self, file_info):
try:
mesg = self._create_json(file_info)
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
self.logger.info(f"Metric: {json.dumps(mesg)}")
self.logger.info("adding file:", extra=file_info)
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
Expand Down
22 changes: 1 addition & 21 deletions tests/servicex_did_finder_lib/test_communication.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import json
from typing import Any, AsyncGenerator, Dict, Optional
from unittest.mock import ANY, MagicMock, patch
from unittest.mock import MagicMock, patch

import pika
import pytest
Expand All @@ -27,7 +27,6 @@ def send_did_request(self, did_name: str):
properties = MagicMock()
body = json.dumps(
{
"request_id": '123-456',
"did": did_name,
"dataset_id": "000-111-222-444",
"endpoint": "http://localhost:2334/",
Expand Down Expand Up @@ -315,7 +314,6 @@ async def my_callback(
# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add.assert_not_called()
SXAdaptor.put_fileset_complete.assert_not_called()
SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal")


def test_failed_file_after_good(rabbitmq, SXAdaptor):
Expand Down Expand Up @@ -344,7 +342,6 @@ async def my_callback(
# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add.assert_called_once()
SXAdaptor.put_fileset_complete.assert_not_called()
SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal")


def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor):
Expand Down Expand Up @@ -373,7 +370,6 @@ async def my_callback(
# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add.assert_called_once()
SXAdaptor.put_fileset_complete.assert_called_once()
SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")


def test_failed_file_after_good_with_avail_limited_number(rabbitmq, SXAdaptor):
Expand Down Expand Up @@ -408,7 +404,6 @@ async def my_callback(
# Make sure the file was sent along, along with the completion
SXAdaptor.put_file_add_bulk.assert_called_once()
SXAdaptor.put_fileset_complete.assert_called_once()
SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")


def test_no_files_returned(rabbitmq, SXAdaptor):
Expand All @@ -433,7 +428,6 @@ async def my_callback(
SXAdaptor.put_file_add.assert_not_called()
SXAdaptor.put_fileset_complete.assert_called_once()
assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 0
SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal")


def test_rabbitmq_connection_failure(rabbitmq_fail_once, SXAdaptor):
Expand Down Expand Up @@ -532,8 +526,6 @@ async def my_user_callback(did, info):
assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192
assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070

assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
async def test_run_file_bulk_fetch_loop(SXAdaptor, mocker):
Expand Down Expand Up @@ -570,8 +562,6 @@ async def my_user_callback(did, info):
assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192
assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070

assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
async def test_run_file_fetch_one(SXAdaptor, mocker):
Expand Down Expand Up @@ -609,7 +599,6 @@ async def my_user_callback(did, info):

SXAdaptor.put_fileset_complete.assert_called_once
assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1
assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
Expand Down Expand Up @@ -650,7 +639,6 @@ async def my_user_callback(did, info):

SXAdaptor.put_fileset_complete.assert_called_once
assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1
assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
Expand Down Expand Up @@ -689,7 +677,6 @@ async def my_user_callback(did, info):

SXAdaptor.put_fileset_complete.assert_called_once
assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1
assert SXAdaptor.post_status_update.called_once()


@pytest.mark.asyncio
Expand All @@ -703,10 +690,3 @@ async def my_user_callback(did, info):

assert SXAdaptor.put_file_add.assert_not_called
SXAdaptor.put_fileset_complete.assert_called_once()

assert (
SXAdaptor.post_status_update.call_args_list[0][0][0]
== "DID Finder found zero files for dataset 123-456"
)

assert SXAdaptor.post_status_update.call_args_list[0][1]["severity"] == "fatal"
20 changes: 0 additions & 20 deletions tests/servicex_did_finder_lib/test_servicex_did.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,3 @@ def test_put_file_add_bulk_large():
'file_events': 3141
}] * 32)
assert len(responses.calls) == 2


@responses.activate
def test_put_file_add_with_prefix():

responses.add(responses.PUT, 'http://servicex.org/12345/files', status=206)
sx = ServiceXAdapter("http://servicex.org/", '12345', file_prefix="xcache123:")
sx.put_file_add({
'paths': ['root://foo.bar.ROOT'],
'adler32': '32',
'file_size': 1024,
'file_events': 3141
})

assert len(responses.calls) == 1
submitted = json.loads(responses.calls[0].request.body)
assert submitted['paths'][0] == 'xcache123:root://foo.bar.ROOT'
assert submitted['adler32'] == '32'
assert submitted['file_events'] == 3141
assert submitted['file_size'] == 1024

0 comments on commit 0e072be

Please sign in to comment.