-
-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rucio replica sorting service using geolocation (#896)
- Loading branch information
Showing
11 changed files
with
1,606 additions
and
11 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
167 changes: 167 additions & 0 deletions
167
did_finder_rucio/src/rucio_did_finder/replica_distance.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
# Copyright (c) 2024, IRIS-HEP | ||
# All rights reserved. | ||
# | ||
# Redistribution and use in source and binary forms, with or without | ||
# modification, are permitted provided that the following conditions are met: | ||
# | ||
# * Redistributions of source code must retain the above copyright notice, this | ||
# list of conditions and the following disclaimer. | ||
# | ||
# * Redistributions in binary form must reproduce the above copyright notice, | ||
# this list of conditions and the following disclaimer in the documentation | ||
# and/or other materials provided with the distribution. | ||
# | ||
# * Neither the name of the copyright holder nor the names of its | ||
# contributors may be used to endorse or promote products derived from | ||
# this software without specific prior written permission. | ||
# | ||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | ||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
import logging | ||
import os | ||
|
||
from typing import List, Mapping, Optional, Tuple | ||
from socket import gethostbyname | ||
import math | ||
from functools import lru_cache | ||
import tempfile | ||
from urllib.parse import urlparse | ||
import geoip2.database | ||
import geoip2.errors | ||
from collections import namedtuple | ||
|
||
|
||
# Pairs a replica URL with the angular distance computed for it, so a list of
# these can be ordered on ``distance`` and the URLs read back afterwards.
Replica_distance = namedtuple('Replica_distance', ['replica', 'distance'])
# Logger used by the functions and classes in this module.
logger = logging.getLogger('ReplicaDistanceService')
|
||
|
||
def _haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Angular great-circle distance between two points given as (lat, lon).

    All four inputs are assumed to be in signed degrees and are converted to
    radians internally. The return value is the central angle between the two
    points in radians (0 for identical points, pi for antipodal points).
    """
    dellat = math.radians(lat2 - lat1)
    dellon = math.radians(lon2 - lon1)
    # Haversine formula, using hav(x) = (1 - cos(x)) / 2
    hav_theta = ((1 - math.cos(dellat)) / 2 +
                 math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
                 * (1 - math.cos(dellon)) / 2)
    # Rounding error can push the value marginally outside [0, 1], which would
    # make sqrt()/asin() raise ValueError. Clamp it. The argument order keeps a
    # NaN input propagating as NaN rather than being silently turned into 0.
    hav_theta = min(max(hav_theta, 0.0), 1.0)

    return 2 * math.asin(math.sqrt(hav_theta))
|
||
|
||
@lru_cache
def _get_distance(database: Optional[geoip2.database.Reader],
                  fqdn: str, my_lat: float, my_lon: float) -> float:
    """
    Determine angular distance between server at fqdn and (my_lat, my_lon).

    database: open GeoIP reader, or None if no database is available
    fqdn: host name of the server to locate
    my_lat, my_lon: reference position in signed degrees

    Returns the angular distance in radians. If the location of fqdn cannot be
    determined for any reason (no database, no host name, DNS resolution
    failure, address not in the database, or no coordinates stored for the
    address) the result is pi, the largest possible physical result, so that
    such hosts sort last.

    Results are memoized by lru_cache on the full argument tuple; this
    includes the pi returned after a failed lookup.
    """
    if database is None:
        return math.pi
    if not fqdn:
        # urlparse(...).hostname is None for URLs without a network location
        logger.warning('No host name available to geolocate, returning maximum distance.')
        return math.pi
    try:
        loc_data = database.city(gethostbyname(fqdn)).location
    # socket.gaierror (unresolvable host) is a subclass of OSError; UnicodeError
    # is raised for host names that cannot be IDNA-encoded (e.g. overlong labels)
    except (geoip2.errors.AddressNotFoundError, OSError, UnicodeError) as e:
        logger.warning(f'Cannot geolocate {fqdn}, returning maximum distance.\nError: {e}')
        return math.pi
    site_lat, site_lon = loc_data.latitude, loc_data.longitude
    if site_lat is None or site_lon is None:
        return math.pi
    return _haversine_distance(site_lat, site_lon, my_lat, my_lon)
|
||
|
||
class ReplicaSorter(object):
    """
    Orders the replica URLs of a file by angular distance from a given location.

    On construction a MaxMind GeoIP database is downloaded (and unpacked if
    necessary) and opened. If no download URL is available, or the download or
    initialization fails, the problem is logged and sort_replicas() returns
    its input unchanged.
    """
    _database: Optional[geoip2.database.Reader] = None
    # we keep the temporary directory around so it won't get randomly deleted by the GC
    _tmpdir: Optional[tempfile.TemporaryDirectory] = None

    def __init__(self, db_url_tuple: Optional[Tuple[str, bool]] = None):
        """
        Argument is an optional tuple of (URL, bool).
        The URL is assumed to be a file to download; the bool indicates whether
        it is ready to be used (True) or needs unpacking (False).
        If the argument is omitted, the tuple is derived from environment
        variables (see get_download_url_from_environment).
        """
        if db_url_tuple is None:
            db_url_tuple = self.get_download_url_from_environment()
        self._download_data(db_url_tuple)

    def sort_replicas(self, replicas: List[str], location: Mapping[str, float]) -> List[str]:
        """
        Main method of this class.
        replicas: list of strings which are the URLs for the replicas for a file
        location: dict of the form {'latitude': xxx, 'longitude': yyy} where xxx and yyy are floats
                  giving the latitude and longitude in signed degrees

        Returns the replica URLs ordered by increasing distance from location.
        The sort is stable, so replicas at equal distance keep their input
        order. Without a database, or with fewer than two replicas, the input
        list itself is returned.
        """
        if not self._database or len(replicas) < 2:
            return replicas
        # the reference position is the same for every replica; look it up once
        my_lat, my_lon = location['latitude'], location['longitude']
        distances = [Replica_distance(replica=replica,
                                      distance=_get_distance(self._database,
                                                             urlparse(replica).hostname,
                                                             my_lat, my_lon))
                     for replica in replicas]
        distances.sort(key=lambda x: x.distance)
        return [entry.replica for entry in distances]

    @classmethod
    def get_download_url_from_key_and_edition(cls, license_key: str,
                                              edition: str) -> Tuple[str, bool]:
        """
        Construct the (url, unpacked) tuple to feed to the constructor from a license key
        and an edition of the MaxMind database. The URL points at a tar.gz
        archive, so the second element is always False.
        """
        return (('https://download.maxmind.com/app/geoip_download?'
                 f'edition_id={edition}&license_key={license_key}&suffix=tar.gz'),
                False)

    @classmethod
    def get_download_url_from_environment(cls) -> Optional[Tuple[str, bool]]:
        """
        Based on environment variables, this will give a tuple of the URL and a bool which is
        True if the file from the URL is ready to use as is, False if needs to be unpacked.

        GEOIP_DB_URL takes precedence and is treated as ready to use. Otherwise
        GEOIP_DB_LICENSE_KEY and GEOIP_DB_EDITION must both be set. Returns
        None if neither option is configured.
        """
        if url := os.environ.get('GEOIP_DB_URL', ''):
            return (url, True)
        key = os.environ.get('GEOIP_DB_LICENSE_KEY', '')
        edition = os.environ.get('GEOIP_DB_EDITION', '')
        if key and edition:
            return cls.get_download_url_from_key_and_edition(key, edition)
        return None

    @staticmethod
    def _mask_license_key(url: str) -> str:
        """ Return url with the value of a license_key query parameter replaced by '***' """
        head, marker, tail = url.partition('license_key=')
        if not marker:
            return url
        _, amp, rest = tail.partition('&')
        return f'{head}{marker}***{amp}{rest}'

    def _download_data(self, db_url_tuple: Optional[Tuple[str, bool]]) -> None:
        """
        Retrieves and unpacks the MaxMind databases and initializes the GeoIP reader.

        Does nothing if db_url_tuple is None. Any failure is logged and leaves
        self._database as None; no exception is propagated to the caller.
        """
        from urllib.request import urlretrieve
        import tarfile
        import glob
        if db_url_tuple is None:
            return
        url, unpacked = db_url_tuple
        try:
            fname, _ = urlretrieve(url)
        except Exception as e:
            # the URL may embed the MaxMind license key; keep it out of the logs
            safe_url = self._mask_license_key(url)
            logger.error(f'Failure retrieving GeoIP database {safe_url}.\nError: {e}')
            return
        try:
            if unpacked:
                self._database = geoip2.database.Reader(fname)
            else:
                # context manager closes the archive even if extraction fails
                with tarfile.open(fname) as tarball:
                    self._tmpdir = tempfile.TemporaryDirectory()
                    if hasattr(tarfile, 'data_filter'):
                        # the archive is downloaded content: refuse members that
                        # would escape the target directory
                        tarball.extractall(self._tmpdir.name, filter='data')
                    else:
                        # older Python without extraction filters
                        tarball.extractall(self._tmpdir.name)
                # the database file sits one directory level below the archive root
                mmdb_files = glob.glob(os.path.join(self._tmpdir.name, '*/*mmdb'))
                self._database = geoip2.database.Reader(mmdb_files[0])
        except Exception as e:
            logger.error(f'Failure initializing the GeoIP database reader.\nError: {e}')
            self._database = None
            if self._tmpdir is not None:
                # nothing usable was unpacked; don't keep the directory around
                self._tmpdir.cleanup()
                self._tmpdir = None
            return
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Copyright (c) 2024, IRIS-HEP | ||
# All rights reserved. | ||
# | ||
# Redistribution and use in source and binary forms, with or without | ||
# modification, are permitted provided that the following conditions are met: | ||
# | ||
# * Redistributions of source code must retain the above copyright notice, this | ||
# list of conditions and the following disclaimer. | ||
# | ||
# * Redistributions in binary form must reproduce the above copyright notice, | ||
# this list of conditions and the following disclaimer in the documentation | ||
# and/or other materials provided with the distribution. | ||
# | ||
# * Neither the name of the copyright holder nor the names of its | ||
# contributors may be used to endorse or promote products derived from | ||
# this software without specific prior written permission. | ||
# | ||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | ||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
|
||
# Locations of the GeoIP database used by the tests: a ready-to-use .mmdb file
# and a tar.gz archive that has to be unpacked first.
GEOIP_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City.mmdb'
GEOIP_TGZ_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City_20241015.tar.gz'

# some URLs (real FQDNs, not real file paths)
REPLICAS = [
    'https://ccxrootdatlas.in2p3.fr:1094//pnfs/DAOD_PHYSLITE.37020764._000004.pool.root.1',
    'root://fax.mwt2.org:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
    'root://atlasdcache-kit.gridka.de:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
]

# expected order of REPLICAS when sorted relative to LOCATION: mwt2, in2p3, gridka
SORTED_REPLICAS = [REPLICAS[1], REPLICAS[0], REPLICAS[2]]

# one host name that does not exist plus one real one
JUNK_REPLICAS = [
    'https://junk.does.not.exist.org/',
    'root://fax.mwt2.org:1094//pnfs/uchicago.edu/',
]
# expected order: the real host first, the nonexistent one last
SORTED_JUNK_REPLICAS = JUNK_REPLICAS[::-1]

# reference position in signed degrees
LOCATION = dict(latitude=41.78, longitude=-87.7)
|
||
|
||
def test_sorting():
    """Check replica ordering; the database comes from a tgz, so unpacking is tested too"""
    from rucio_did_finder.replica_distance import ReplicaSorter
    sorter = ReplicaSorter((GEOIP_TGZ_URL, False))
    # Given location (Chicago) replicas should sort US, FR, DE
    by_distance = sorter.sort_replicas(REPLICAS, LOCATION)
    assert by_distance == SORTED_REPLICAS
    # the nonexistent FQDN should sort at end
    junk_by_distance = sorter.sort_replicas(JUNK_REPLICAS, LOCATION)
    assert junk_by_distance == SORTED_JUNK_REPLICAS
|
||
|
||
def test_envvars():
    """
    Configure the sorter through the GEOIP_DB_URL environment variable.

    Only tests unpacked DB download. The variable is restored to its previous
    state even if the test fails, so it cannot leak into other tests.
    """
    from rucio_did_finder.replica_distance import ReplicaSorter
    import os
    previous = os.environ.get('GEOIP_DB_URL')
    os.environ['GEOIP_DB_URL'] = GEOIP_URL
    try:
        rs = ReplicaSorter()
        ordered = rs.sort_replicas(REPLICAS, LOCATION)
        assert ordered == SORTED_REPLICAS
    finally:
        if previous is None:
            del os.environ['GEOIP_DB_URL']
        else:
            os.environ['GEOIP_DB_URL'] = previous
|
||
|
||
def test_bad_geodb():
    """With a DB URL that cannot be retrieved, no reader is set and the order is unchanged"""
    from rucio_did_finder.replica_distance import ReplicaSorter
    sorter = ReplicaSorter(('https://junk.does.not.exist.org', False))
    assert sorter._database is None
    # without a database the replicas must come back exactly as given
    result = sorter.sort_replicas(REPLICAS, LOCATION)
    assert result == REPLICAS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.