Skip to content

Commit

Permalink
wip: add wags tails
Browse files Browse the repository at this point in the history
  • Loading branch information
jsstevenson committed Oct 26, 2023
1 parent eec896b commit d5794d6
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 545 deletions.
2 changes: 0 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ disease-normalizer = ">=0.3.1"
owlready2 = "*"
rdflib = "*"
wikibaseintegrator = ">=0.12.0"
chembl-downloader = "*"
bioversions = ">=0.4.3"
ipykernel = "*"
pytest = "*"
pytest-cov = "*"
Expand Down
4 changes: 1 addition & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ install_requires =
click
uvicorn
boto3
ga4gh.vrsatile.pydantic >= 0.1.dev3
ga4gh.vrsatile.pydantic ~= 0.1.dev3

[options.package_data]
therapy =
Expand All @@ -36,8 +36,6 @@ dev =
owlready2
rdflib
wikibaseintegrator >= 0.12.0
chembl-downloader
bioversions >= 0.4.3
ipykernel
ipython >= 8.10
pre-commit
Expand Down
181 changes: 19 additions & 162 deletions therapy/etl/base.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
"""A base class for extraction, transformation, and loading of data."""
from abc import ABC, abstractmethod
import ftplib
from pathlib import Path
import logging
from typing import List, Dict, Optional, Callable
import os
import zipfile
import tempfile
import re
from typing import List, Dict, Optional, Type
import json
from functools import lru_cache

from pydantic import ValidationError
import requests
import bioversions
from disease.query import QueryHandler as DiseaseNormalizer
from wags_tails.base_source import DataSource

from therapy import APP_ROOT, ITEM_TYPES, DownloadException
from therapy import ITEM_TYPES
from therapy.schemas import Drug, SourceName
from therapy.database import Database
from therapy.etl.rules import Rules
Expand All @@ -25,8 +19,6 @@
logger = logging.getLogger("therapy")
logger.setLevel(logging.DEBUG)

DEFAULT_DATA_PATH: Path = APP_ROOT / "data"


class Base(ABC):
"""The ETL base class.
Expand All @@ -38,15 +30,17 @@ class Base(ABC):
needed.
"""

def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH) -> None:
_DataSourceClass: Type[DataSource]

def __init__(self, database: Database, data_path: Optional[Path] = None) -> None:
"""Extract from sources.
:param Database database: application database object
:param Path data_path: path to app data directory
:param database: application database object
:param data_path: preferred Thera-Py data directory location, if necessary
"""
self._therapy_data_dir = data_path
self._name = self.__class__.__name__
self.database = database
self._src_dir: Path = Path(data_path / self._name.lower())
self._added_ids: List[str] = []
self._rules = Rules(SourceName(self._name))

Expand All @@ -55,7 +49,7 @@ def perform_etl(self, use_existing: bool = False) -> List[str]:
Returned concept IDs can be passed to Merge method for computing
merged concepts.
:param bool use_existing: if True, don't try to retrieve latest source data
:param use_existing: if True, don't try to retrieve latest source data
:return: list of concept IDs which were successfully processed and
uploaded.
"""
Expand All @@ -64,153 +58,16 @@ def perform_etl(self, use_existing: bool = False) -> List[str]:
self._transform_data()
return self._added_ids

def get_latest_version(self) -> str:
    """Fetch the most recent available version of this source's data.

    Sources that Bioversions doesn't yet track, and other special-case
    sources, should override this method.

    :return: most recent version, as a str
    """
    return bioversions.get_version(type(self).__name__)
def _extract_data(self, use_existing: bool) -> None:
"""Acquire source data.
@abstractmethod
def _download_data(self) -> None:
"""Acquire source data and deposit in a usable form with correct file
naming conventions (generally, `<source>_<version>.<filetype>`, or
`<source>_<subset>_<version>.<filetype>` if sources require multiple
files). Shouldn't set any instance attributes.
"""
raise NotImplementedError

def _zip_handler(self, dl_path: Path, outfile_path: Path) -> None:
"""Provide simple callback function to extract the largest file within a given
zipfile and save it within the appropriate data directory.
:param Path dl_path: path to temp data file
:param Path outfile_path: path to save file within
"""
with zipfile.ZipFile(dl_path, "r") as zip_ref:
if len(zip_ref.filelist) > 1:
files = sorted(
zip_ref.filelist, key=lambda z: z.file_size, reverse=True
)
target = files[0]
else:
target = zip_ref.filelist[0]
target.filename = outfile_path.name
zip_ref.extract(target, path=outfile_path.parent)
os.remove(dl_path)

@staticmethod
def _http_download(
    url: str,
    outfile_path: Path,
    headers: Optional[Dict] = None,
    handler: Optional[Callable[[Path, Path], None]] = None,
) -> None:
    """Perform HTTP download of remote data file.

    :param url: URL to retrieve file from
    :param outfile_path: path to where file should be saved. Must be an
        actual Path instance rather than merely a pathlike string.
    :param headers: any needed HTTP headers to include in request
    :param handler: provide if downloaded file requires additional action,
        e.g. it's a zip file.
    :raises DownloadException: if the HTTP request fails
    """
    if handler:
        # download to a scratch location; handler is responsible for
        # producing the final file at outfile_path
        # NOTE(review): fixed temp filename isn't safe for concurrent
        # downloads — confirm single-process usage
        dl_path = Path(tempfile.gettempdir()) / "therapy_dl_tmp"
    else:
        dl_path = outfile_path
    # use stream to avoid saving download completely to memory
    with requests.get(url, stream=True, headers=headers) as r:
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # chain the original HTTP error for easier debugging
            raise DownloadException(
                f"Failed to download {outfile_path.name} from {url}."
            ) from e
        with open(dl_path, "wb") as h:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    h.write(chunk)
    if handler:
        handler(dl_path, outfile_path)

def _ftp_download(self, host: str, host_dir: str, host_fn: str) -> None:
    """Download data file from FTP site.

    :param host: Source's FTP host name
    :param host_dir: Data directory located on FTP site
    :param host_fn: Filename on FTP site to be downloaded
    :raises Exception: if any FTP operation (connect, login, cwd, retrieve)
        fails
    """
    try:
        with ftplib.FTP(host) as ftp:
            ftp.login()
            logger.debug(f"FTP login to {host} was successful")
            ftp.cwd(host_dir)
            with open(self._src_dir / host_fn, "wb") as fp:
                ftp.retrbinary(f"RETR {host_fn}", fp.write)
    except ftplib.all_errors as e:
        logger.error(f"FTP download failed: {e}")
        # chain the underlying ftplib error so the traceback keeps the cause
        raise Exception(e) from e

def _parse_version(
self, file_path: Path, pattern: Optional[re.Pattern] = None
) -> str:
"""Get version number from provided file path.
:param Path file_path: path to located source data file
:param Optional[re.Pattern] pattern: regex pattern to use
:return: source data version
:raises: FileNotFoundError if version parsing fails
"""
if pattern is None:
pattern = re.compile(type(self).__name__.lower() + r"_(.+)\..+")
matches = re.match(pattern, file_path.name)
if matches is None:
raise FileNotFoundError
else:
return matches.groups()[0]

def _get_existing_files(self) -> List[Path]:
"""Get existing source files from data directory.
:return: sorted list of file objects
"""
return list(sorted(self._src_dir.glob(f"{self._name.lower()}_*.*")))

def _extract_data(self, use_existing: bool = False) -> None:
"""Get source file from data directory.
This method should ensure the source data directory exists, acquire source data,
set the source version value, and assign the source file location to
`self._src_file`. Child classes needing additional functionality (like setting
up a DB cursor, or managing multiple source files) will need to reimplement
this method. If `use_existing` is True, the version number will be parsed from
the existing filename; otherwise, it will be retrieved from the data source,
and if the local file is out-of-date, the newest version will be acquired.
This method is responsible for initializing an instance of
``self._DataSourceClass``, and, in most cases, setting ``self._src_file``.
:param bool use_existing: if True, don't try to fetch latest source data
"""
self._src_dir.mkdir(exist_ok=True, parents=True)
src_name = type(self).__name__.lower()
if use_existing:
files = self._get_existing_files()
if len(files) < 1:
raise FileNotFoundError(f"No source data found for {src_name}")
self._src_file: Path = files[-1]
try:
self._version = self._parse_version(self._src_file)
except FileNotFoundError:
raise FileNotFoundError(
f"Unable to parse version value from {src_name} source data file "
f"located at {self._src_file.absolute().as_uri()} -- "
"check filename against schema defined in README: "
"https://github.com/cancervariants/therapy-normalization#update-sources" # noqa: E501
)
else:
self._version = self.get_latest_version()
fglob = f"{src_name}_{self._version}.*"
latest = list(self._src_dir.glob(fglob))
if not latest:
self._download_data()
latest = list(self._src_dir.glob(fglob))
assert len(latest) != 0 # probably unnecessary, but just to be safe
self._src_file = latest[0]
data_source = self._DataSourceClass(data_dir=self._therapy_data_dir)
self._src_file, self._version = data_source.get_latest(from_local=use_existing)

@abstractmethod
def _load_meta(self) -> None:
Expand Down Expand Up @@ -311,11 +168,11 @@ def _load_therapy(self, therapy: Dict) -> None:
class DiseaseIndicationBase(Base):
"""Base class for sources that require disease normalization capabilities."""

def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH):
def __init__(self, database: Database, data_path: Optional[Path] = None):
"""Initialize source ETL instance.
:param therapy.database.Database database: application database
:param Path data_path: path to normalizer data directory
:param database: application database
:param data_path: preferred Thera-Py data directory location, if necessary
"""
super().__init__(database, data_path)
self.disease_normalizer = DiseaseNormalizer(self.database.endpoint_url)
Expand Down
29 changes: 7 additions & 22 deletions therapy/etl/chembl.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
"""This module defines the ChEMBL ETL methods."""
import logging
import os
import shutil
import sqlite3
from typing import Optional, List, Dict

import chembl_downloader
from wags_tails import ChemblData
import bioversions

from therapy.etl.base import DiseaseIndicationBase
Expand All @@ -20,25 +18,7 @@
class ChEMBL(DiseaseIndicationBase):
"""Class for ChEMBL ETL methods."""

def _download_data(self) -> None:
    """Download latest ChEMBL database file from EBI."""
    logger.info("Retrieving source data for ChEMBL")
    # point pystow's storage root at our data directory so the download
    # lands under the app's data path rather than the user's home directory
    # NOTE(review): assumes chembl_downloader honors PYSTOW_HOME — confirm
    os.environ["PYSTOW_HOME"] = str(self._src_dir.parent.absolute())
    tmp_path = chembl_downloader.download_extract_sqlite()
    # relocate the extracted SQLite file into the ChEMBL source directory
    shutil.move(tmp_path, self._src_dir)
    # remove the leftover download tree three levels up from the file
    # NOTE(review): the triple .parent assumes pystow's nested
    # <module>/<version>/<file> layout — verify against chembl_downloader
    shutil.rmtree(tmp_path.parent.parent.parent)
    logger.info("Successfully retrieved source data for ChEMBL")

def _extract_data(self, use_existing: bool = False) -> None:
    """Extract data from the ChEMBL source.

    Acquires the source file via the base implementation, then opens a
    SQLite connection and cursor against it for downstream queries.

    :param use_existing: if True, don't try to fetch latest source data
    """
    super()._extract_data(use_existing)
    database_connection = sqlite3.connect(self._src_file)
    # return rows as sqlite3.Row so columns are addressable by name
    database_connection.row_factory = sqlite3.Row
    self._conn = database_connection
    self._cursor = database_connection.cursor()
_DataSourceClass = ChemblData

@staticmethod
def _unwrap_group_concat(value: Optional[str]) -> List[str]:
Expand Down Expand Up @@ -108,6 +88,11 @@ def _get_indications(self, value: Optional[str]) -> List[Dict]:

def _transform_data(self) -> None:
"""Transform SQLite data and load to DB."""
conn = sqlite3.connect(self._src_file)
conn.row_factory = sqlite3.Row
self._conn = conn
self._cursor = conn.cursor()

query = """
SELECT
md.chembl_id,
Expand Down
23 changes: 3 additions & 20 deletions therapy/etl/chemidplus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
Courtesy of the U.S. National Library of Medicine.
"""
import logging
import zipfile
from os import remove
from shutil import move
from typing import Generator
from pathlib import Path
import xml.etree.ElementTree as ET
import re

from wags_tails import ChemIDplusData

from therapy.etl.base import Base
from therapy.schemas import NamespacePrefix, SourceMeta, SourceName, \
DataLicenseAttributes, RecordParams
Expand All @@ -26,23 +25,7 @@
class ChemIDplus(Base):
"""Class for ChemIDplus ETL methods."""

def _download_data(self) -> None:
    """Download source data from default location.

    Fetches the complete ChemIDplus release zip over FTP, extracts the XML
    member, renames it per the ``<source>_<version>`` convention, and
    removes the zip archive.

    :raises FileNotFoundError: if the downloaded archive contains no XML
        member
    """
    logger.info("Retrieving source data for ChemIDplus")
    file = "currentchemid.zip"
    self._ftp_download("ftp.nlm.nih.gov", "nlmdata/.chemidlease", file)
    zip_path = (self._src_dir / file).absolute()
    outfile = self._src_dir / f"chemidplus_{self._version}.xml"
    # use a context manager so the zip handle is closed even on failure
    with zipfile.ZipFile(zip_path, "r") as zip_file:
        xml_name = next(
            (
                info.filename
                for info in zip_file.infolist()
                if re.match(r".*\.xml", info.filename)
            ),
            None,
        )
        if xml_name is None:
            # explicit error instead of an assert/unbound-name failure when
            # the archive unexpectedly lacks an XML file
            raise FileNotFoundError(
                f"No XML member found in downloaded archive {zip_path}"
            )
        zip_file.extract(xml_name, path=self._src_dir)
    move(str(self._src_dir / xml_name), outfile)
    remove(zip_path)
    logger.info("Successfully retrieved source data for ChemIDplus")
_DataSourceClass = ChemIDplusData

@staticmethod
def parse_xml(path: Path, tag: str) -> Generator:
Expand Down
10 changes: 3 additions & 7 deletions therapy/etl/drugbank.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import csv

from wags_tails import DrugBankData

from therapy.schemas import SourceName, SourceMeta, NamespacePrefix
from therapy.etl.base import Base

Expand All @@ -13,13 +15,7 @@
class DrugBank(Base):
"""Class for DrugBank ETL methods."""

def _download_data(self) -> None:
    """Download DrugBank source data.

    Retrieves the zipped drug vocabulary for the current version and unpacks
    it into the source data directory as a CSV.
    """
    logger.info("Retrieving source data for DrugBank")
    version_slug = self._version.replace(".", "-")
    url = (
        f"https://go.drugbank.com/releases/{version_slug}"
        "/downloads/all-drugbank-vocabulary"
    )
    outfile = self._src_dir / f"drugbank_{self._version}.csv"
    self._http_download(url, outfile, handler=self._zip_handler)
    logger.info("Successfully retrieved source data for DrugBank")
_DataSourceClass = DrugBankData

def _load_meta(self) -> None:
"""Add DrugBank metadata."""
Expand Down
Loading

0 comments on commit d5794d6

Please sign in to comment.