Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rucio replica sorting service using geolocation #896

Merged
merged 21 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,176 changes: 921 additions & 255 deletions did_finder_rucio/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions did_finder_rucio/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ python = "^3.9"
rucio-clients = "^34.2.0"
xmltodict = "^0.13.0"
servicex-did-finder-lib = "^3.0.0"
geoip2 = "^4.7.0"
requests = ">=2.25.0,<3.0.0"

[tool.poetry.group.test]
Expand Down
15 changes: 14 additions & 1 deletion did_finder_rucio/src/rucio_did_finder/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from rucio_did_finder.lookup_request import LookupRequest
from rucio_did_finder.rucio_adapter import RucioAdapter
from servicex_did_finder_lib import DIDFinderApp
from .replica_distance import ReplicaSorter

__log = logging.getLogger(__name__)

Expand All @@ -43,14 +44,26 @@
replica_client = ReplicaClient()
# Third argument is RucioAdapter's report_logical_files flag.
rucio_adapter = RucioAdapter(did_client, replica_client, report_logical_files=False)

# Geolocation-based replica sorting is opt-in: it is only switched on when the
# environment provides both coordinates of this service and USE_REPLICA_SORTER.
# Otherwise both names stay None and LookupRequest leaves replica order alone.
location = None
replica_sorter = None
if all(name in os.environ
       for name in ('RUCIO_LATITUDE', 'RUCIO_LONGITUDE', 'USE_REPLICA_SORTER')):
    location = {
        'latitude': float(os.environ['RUCIO_LATITUDE']),
        'longitude': float(os.environ['RUCIO_LONGITUDE']),
    }
    # Constructing the sorter triggers the GeoIP database download.
    replica_sorter = ReplicaSorter()

app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adapter})


def find_files(did_name, info, did_finder_args):
    """
    Generator handed to the DID finder framework: looks up the files of a DID.

    did_name: the dataset identifier to resolve
    info: request metadata; only info['dataset-id'] is read here
    did_finder_args: dict of shared arguments; 'rucio_adapter' must be present

    Yields whatever LookupRequest.lookup_files() yields. The module-level
    replica_sorter and location are passed through; both are None when
    geolocation sorting is not configured.
    """
    lookup_request = LookupRequest(
        did=did_name,
        rucio_adapter=did_finder_args['rucio_adapter'],
        dataset_id=info['dataset-id'],
        replica_sorter=replica_sorter,
        location=location,
    )
    yield from lookup_request.lookup_files()
Expand Down
14 changes: 13 additions & 1 deletion did_finder_rucio/src/rucio_did_finder/lookup_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
import logging
from datetime import datetime
from rucio_did_finder.rucio_adapter import RucioAdapter
from .replica_distance import ReplicaSorter
from typing import Optional, Mapping


class LookupRequest:
def __init__(self, did: str,
rucio_adapter: RucioAdapter,
dataset_id: str = 'bogus-id'):
dataset_id: str = 'bogus-id',
replica_sorter: Optional[ReplicaSorter] = None,
location: Optional[Mapping[str, float]] = None):
'''Create the `LookupRequest` object that is responsible for returning
lists of files. Processes things in chunks.

Expand All @@ -51,6 +55,9 @@ def __init__(self, did: str,
self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.NullHandler())

self.location = location
self.replica_sorter = replica_sorter

def lookup_files(self):
"""
lookup files.
Expand All @@ -68,6 +75,11 @@ def lookup_files(self):
n_files += 1
ds_size += af['file_size']
total_paths += len(af['paths'])
ipaths = af['paths'].copy()
self.logger.debug(f'path before {ipaths}')
if self.replica_sorter is not None and self.location is not None:
af['paths'] = self.replica_sorter.sort_replicas(ipaths, self.location)
self.logger.debug(f'path after {af["paths"]}')
full_file_list.append(af)
yield full_file_list

Expand Down
167 changes: 167 additions & 0 deletions did_finder_rucio/src/rucio_did_finder/replica_distance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright (c) 2024, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import os

from typing import List, Mapping, Optional, Tuple
from socket import gethostbyname
import math
from functools import lru_cache
import tempfile
from urllib.parse import urlparse
import geoip2.database
import geoip2.errors
from collections import namedtuple


Replica_distance = namedtuple('Replica_distance', 'replica distance')
logger = logging.getLogger('ReplicaDistanceService')


def _haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Great-circle angular distance between two points on a sphere.

    Inputs are latitudes and longitudes in signed degrees; they are converted
    to radians internally. Returns the central angle between the two points
    in radians, in the range [0, pi].
    """
    half_dellat = math.radians(lat2 - lat1) / 2
    half_dellon = math.radians(lon2 - lon1) / 2
    # hav(x) = sin^2(x/2). This form is used instead of (1 - cos(x))/2 because
    # the latter cancels to exactly 0 for small separations, which would make
    # nearby-but-distinct sites indistinguishable.
    hav_theta = (math.sin(half_dellat) ** 2
                 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
                 * math.sin(half_dellon) ** 2)
    # Rounding can push the value marginally outside [0, 1] for (near-)antipodal
    # points, which would make asin() raise ValueError; clamp to the valid domain.
    hav_theta = min(1.0, max(0.0, hav_theta))

    return 2 * math.asin(math.sqrt(hav_theta))


@lru_cache
def _get_distance(database: Optional[geoip2.database.Reader],
                  fqdn: Optional[str], my_lat: float, my_lon: float) -> float:
    """
    Determine angular distance between server at fqdn and (my_lat, my_lon).
    If there is a failure of fqdn location lookup, will return pi
    (the largest possible physical result).

    database: GeoIP reader, or None if no database is available
    fqdn: host name of the replica server (None if the URL had no host part)
    my_lat, my_lon: reference position in signed degrees

    Results are memoized by lru_cache on the full argument tuple, so a failed
    lookup for a host is also remembered for as long as it stays in the cache.
    """
    # Without a database or a host name there is nothing to look up.
    if database is None or not fqdn:
        return math.pi
    try:
        loc_data = database.city(gethostbyname(fqdn)).location
    except (geoip2.errors.AddressNotFoundError, OSError) as e:
        # OSError covers socket.gaierror, raised when the host name cannot be
        # resolved; AddressNotFoundError means the IP is not in the database.
        logger.warning(f'Cannot geolocate {fqdn}, returning maximum distance.\nError: {e}')
        return math.pi
    site_lat, site_lon = loc_data.latitude, loc_data.longitude
    # The database record may exist without coordinates.
    if site_lat is None or site_lon is None:
        return math.pi
    return _haversine_distance(site_lat, site_lon, my_lat, my_lon)


class ReplicaSorter(object):
    """
    Sorts replica URLs by the angular distance between the replica's host
    (geolocated through a MaxMind GeoIP database) and a reference location.

    The database is downloaded when the object is constructed. If no download
    URL is configured, or the download/initialization fails, the object still
    works but sort_replicas() returns its input unchanged.
    """
    _database: Optional[geoip2.database.Reader] = None
    # we keep the temporary directory around so it won't get randomly deleted by the GC
    _tmpdir: Optional[tempfile.TemporaryDirectory] = None

    def __init__(self, db_url_tuple: Optional[Tuple[str, bool]] = None):
        """
        Argument is an optional tuple of (URL, bool).
        The URL is assumed to be a file to download; the bool indicates whether
        it is ready to be used (True) or needs unpacking (False).
        If omitted, the tuple is derived from environment variables
        (see get_download_url_from_environment).
        """
        if db_url_tuple is None:
            db_url_tuple = self.get_download_url_from_environment()
        self._download_data(db_url_tuple)

    def sort_replicas(self, replicas: List[str], location: Mapping[str, float]) -> List[str]:
        """
        Main method of this class.
        replicas: list of strings which are the URLs for the replicas for a file
        location: dict of the form {'latitude': xxx, 'longitude': yyy} where xxx and yyy are floats
                  giving the latitude and longitude in signed degrees

        Returns the replicas ordered from nearest to farthest. Replicas whose host
        cannot be geolocated get the maximum distance and therefore sort last.
        The sort is stable, so replicas at equal distance keep their input order.
        """
        if self._database is None:
            return replicas
        # Nothing to order; avoid needless DNS/GeoIP lookups.
        if len(replicas) < 2:
            return replicas
        database = self._database
        my_lat, my_lon = location['latitude'], location['longitude']
        return sorted(replicas,
                      key=lambda replica: _get_distance(database,
                                                        urlparse(replica).hostname,
                                                        my_lat, my_lon))

    @classmethod
    def get_download_url_from_key_and_edition(cls, license_key: str,
                                              edition: str) -> Tuple[str, bool]:
        """
        Construct the (url, unpacked) tuple to feed to the constructor from a license key
        and an edition of the MaxMind database. The download is a tar.gz archive, so the
        second element is always False (needs unpacking).
        """
        return (('https://download.maxmind.com/app/geoip_download?'
                 f'edition_id={edition}&license_key={license_key}&suffix=tar.gz'),
                False)

    @classmethod
    def get_download_url_from_environment(cls) -> Optional[Tuple[str, bool]]:
        """
        Based on environment variables, this will give a tuple of the URL and a bool which is
        True if the file from the URL is ready to use as is, False if needs to be unpacked.

        GEOIP_DB_URL takes precedence; otherwise both GEOIP_DB_LICENSE_KEY and
        GEOIP_DB_EDITION must be set. Returns None if neither option is configured.
        """
        if url := os.environ.get('GEOIP_DB_URL', ''):
            return (url, True)
        key = os.environ.get('GEOIP_DB_LICENSE_KEY', '')
        edition = os.environ.get('GEOIP_DB_EDITION', '')
        if key and edition:
            return cls.get_download_url_from_key_and_edition(key, edition)
        return None

    @staticmethod
    def _redact_url(url: str) -> str:
        """
        Return url with its query string masked, for use in log messages.
        The MaxMind download URL carries the license key as a query parameter,
        which must not end up in the logs.
        """
        try:
            parts = urlparse(url)
        except ValueError:
            return '<unparseable URL>'
        if not parts.query:
            return url
        return parts._replace(query='<redacted>').geturl()

    def _download_data(self, db_url_tuple: Optional[Tuple[str, bool]]) -> None:
        """
        Retrieves and unpacks the MaxMind databases and initializes the GeoIP reader.

        Failures are logged rather than raised (best effort): on any error
        self._database is left as None and sorting is effectively disabled.
        """
        from urllib.request import urlretrieve
        import tarfile
        import glob
        if db_url_tuple is None:
            return
        url, unpacked = db_url_tuple
        try:
            fname, _ = urlretrieve(url)
        except Exception as e:
            logger.error(f'Failure retrieving GeoIP database {self._redact_url(url)}.'
                         f'\nError: {e}')
            return
        try:
            if unpacked:
                self._database = geoip2.database.Reader(fname)
            else:
                # Context manager guarantees the archive handle is closed.
                with tarfile.open(fname) as tarball:
                    self._tmpdir = tempfile.TemporaryDirectory()
                    # The archive comes from the network: use the 'data' extraction
                    # filter (blocks path traversal, special files) when this
                    # Python version provides it.
                    if hasattr(tarfile, 'data_filter'):
                        tarball.extractall(self._tmpdir.name, filter='data')
                    else:
                        tarball.extractall(self._tmpdir.name)
                candidates = glob.glob(os.path.join(self._tmpdir.name, '*/*mmdb'))
                if not candidates:
                    raise FileNotFoundError('no mmdb file found in downloaded archive')
                self._database = geoip2.database.Reader(candidates[0])
        except Exception as e:
            logger.error(f'Failure initializing the GeoIP database reader.\nError: {e}')
            self._database = None
            return
1 change: 0 additions & 1 deletion did_finder_rucio/src/rucio_did_finder/rucio_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(self, did_client, replica_client, report_logical_files=False):

def client_location(self):
client_location = {}
# setting the site actually seems to work
if 'SITE_NAME' in os.environ:
client_location['site'] = os.environ['SITE_NAME']
latitude = os.environ.get('RUCIO_LATITUDE')
Expand Down
78 changes: 78 additions & 0 deletions did_finder_rucio/tests/did_finder/test_replica_sorter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright (c) 2024, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

GEOIP_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City.mmdb'
GEOIP_TGZ_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City_20241015.tar.gz'

# some URLs (real FQDNs, not real file paths)
REPLICAS = ['https://ccxrootdatlas.in2p3.fr:1094//pnfs/DAOD_PHYSLITE.37020764._000004.pool.root.1',
'root://fax.mwt2.org:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
'root://atlasdcache-kit.gridka.de:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1']

SORTED_REPLICAS = ['root://fax.mwt2.org:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
'https://ccxrootdatlas.in2p3.fr:1094//pnfs/DAOD_PHYSLITE.37020764._000004.pool.root.1', # noqa: E501
'root://atlasdcache-kit.gridka.de:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1'] # noqa: E501

JUNK_REPLICAS = ['https://junk.does.not.exist.org/',
'root://fax.mwt2.org:1094//pnfs/uchicago.edu/']
SORTED_JUNK_REPLICAS = ['root://fax.mwt2.org:1094//pnfs/uchicago.edu/',
'https://junk.does.not.exist.org/']

LOCATION = {'latitude': 41.78, 'longitude': -87.7}


def test_sorting():
    """Sort using a database fetched as a tgz archive, which also covers the unpacking path"""
    from rucio_did_finder.replica_distance import ReplicaSorter
    sorter = ReplicaSorter((GEOIP_TGZ_URL, False))
    # LOCATION is Chicago, so the expected order is US, FR, DE
    ordered = sorter.sort_replicas(REPLICAS, LOCATION)
    assert ordered == SORTED_REPLICAS
    # a replica on a nonexistent FQDN must end up last
    ordered_junk = sorter.sort_replicas(JUNK_REPLICAS, LOCATION)
    assert ordered_junk == SORTED_JUNK_REPLICAS


def test_envvars():
    """Only tests unpacked DB download, configured through GEOIP_DB_URL"""
    from rucio_did_finder.replica_distance import ReplicaSorter
    import os
    previous = os.environ.get('GEOIP_DB_URL')
    os.environ['GEOIP_DB_URL'] = GEOIP_URL
    try:
        rs = ReplicaSorter()
        ordered = rs.sort_replicas(REPLICAS, LOCATION)
        assert ordered == SORTED_REPLICAS
    finally:
        # Restore the environment even if the assertion fails, so that the
        # setting cannot leak into other tests.
        if previous is None:
            del os.environ['GEOIP_DB_URL']
        else:
            os.environ['GEOIP_DB_URL'] = previous


def test_bad_geodb():
    """With an unreachable DB URL the sorter has no database and leaves the order untouched"""
    from rucio_did_finder.replica_distance import ReplicaSorter
    sorter = ReplicaSorter(('https://junk.does.not.exist.org', False))
    # the failed download must leave the reader unset
    assert sorter._database is None
    result = sorter.sort_replicas(REPLICAS, LOCATION)
    assert result == REPLICAS
1 change: 1 addition & 0 deletions helm/example_secrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ data:
rabbitmq-password: << rabbitMQ password >>
rabbitmq-erlang-cookie: << rabbitMQ erlang cookie >>
postgresql-password: << postgresql password for postgres user >>
geoip-license-key: << MaxMind license key for GeoIP database download >>
20 changes: 20 additions & 0 deletions helm/servicex/templates/did-finder-rucio/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ spec:
- name: SITE_NAME
value: "{{ .Values.didFinder.rucio.site }}"
{{- end }}
{{- if .Values.didFinder.rucio.replicaSorterEnabled }}
- name: USE_REPLICA_SORTER
value: "1"
{{- if .Values.didFinder.rucio.geoIPURL }}
- name: GEOIP_DB_URL
value: {{ .Values.didFinder.rucio.geoIPURL }}
{{- else }}
{{- if .Values.didFinder.rucio.geoIPEdition -}}
- name: GEOIP_DB_EDITION
value: {{ .Values.didFinder.rucio.geoIPEdition }}
{{- end -}}
{{- if .Values.secrets -}}
- name: GEOIP_DB_LICENSE_KEY
valueFrom:
secretKeyRef:
name: {{ .Values.secrets }}
key: geoip-license-key
{{- end }}
{{- end }}
{{- end }}
- name: INSTANCE_NAME
value: {{ .Release.Name }}
volumeMounts:
Expand Down
5 changes: 4 additions & 1 deletion helm/servicex/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ codeGen:
defaultScienceContainerImage: sslhep/servicex_func_adl_cms_aod_transformer
defaultScienceContainerTag: cmssw-5-3-32


didFinder:
CERNOpenData:
enabled: true
Expand All @@ -98,6 +97,10 @@ didFinder:
servicex_latitude: 41.78
servicex_longitude: -87.7
tag: develop
replicaSorterEnabled: false
geoIPURL: null
geoIPEdition: null

gridAccount: <your account>
logging:
logstash:
Expand Down
Loading