Skip to content

Commit

Permalink
now getting request_id too.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivukotic committed Sep 21, 2023
1 parent 2ce29ac commit 5d467a1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
10 changes: 8 additions & 2 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,16 @@ def rabbit_mq_callback(
did_request = json.loads(body)
did = did_request["did"]
dataset_id = did_request["dataset_id"]
request_id = did_request["request_id"]
endpoint = did_request["endpoint"]
__logging.info(
f"Received DID request {did_request}", extra={"dataset_id": dataset_id}
f"Received DID request {did_request}",
extra={"request_id": request_id, "dataset_id": dataset_id}
)
servicex = ServiceXAdapter(did_request["service-endpoint"], file_prefix)
servicex = ServiceXAdapter(request_id=request_id,
dataset_id=dataset_id,
endpoint=endpoint,
file_prefix=file_prefix)
servicex.post_status_update("DID Request received")

info = {
Expand Down
17 changes: 8 additions & 9 deletions src/servicex_did_finder_lib/servicex_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@


class ServiceXAdapter:
def __init__(self, endpoint, file_prefix=None):
def __init__(self, endpoint, dataset_id, request_id=None, file_prefix=None):
self.endpoint = endpoint
self.request_id = request_id
self.dataset_id = dataset_id
self.file_prefix = file_prefix

self.logger = logging.getLogger(__name__)
Expand All @@ -47,7 +49,7 @@ def post_status_update(self, status_msg, severity="info"):
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.post(self.endpoint + "/status", data={
requests.post(f"{self.endpoint}{self.request_id}/status", data={
"timestamp": datetime.now().isoformat(),
"source": "DID Finder",
"severity": severity,
Expand Down Expand Up @@ -82,7 +84,7 @@ def put_file_add(self, file_info):
while not success and attempts < MAX_RETRIES:
try:
mesg = self._create_json(file_info)
requests.put(self.endpoint + "/files", json=mesg)
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
self.logger.info(f"Metric: {json.dumps(mesg)}")
success = True
except requests.exceptions.ConnectionError:
Expand All @@ -105,7 +107,7 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
mesg.append(self._create_json(fi))
while not success and attempts < MAX_RETRIES:
try:
requests.put(self.endpoint + "/files", json=mesg)
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
self.logger.info(f"Metric: {json.dumps(mesg)}")
success = True
except requests.exceptions.ConnectionError:
Expand All @@ -116,15 +118,12 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
self.logger.error(f'After {attempts} tries, failed to send ServiceX App '
f'a put_file_bulk message: {mesg} - Ignoring error.')

# should be removed...
# transforms can start as soon as request has been made. No need to wait for this.
# whole endpoint '/start' is not needed.
def post_transform_start(self):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.post(self.endpoint + "/start")
requests.post(f"{self.endpoint}{self.request_id}/start")
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
Expand All @@ -139,7 +138,7 @@ def put_fileset_complete(self, summary):
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.put(self.endpoint + "/complete", json=summary)
requests.put(f"{self.endpoint}{self.dataset_id}/complete", json=summary)
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
Expand Down

0 comments on commit 5d467a1

Please sign in to comment.