diff --git a/Pipfile b/Pipfile index 5b4dcda1..85cb7c43 100644 --- a/Pipfile +++ b/Pipfile @@ -17,8 +17,6 @@ disease-normalizer = ">=0.3.1" owlready2 = "*" rdflib = "*" wikibaseintegrator = ">=0.12.0" -chembl-downloader = "*" -bioversions = ">=0.4.3" ipykernel = "*" pytest = "*" pytest-cov = "*" diff --git a/setup.cfg b/setup.cfg index 6ce846c5..9a89f254 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,7 +20,7 @@ install_requires = click uvicorn boto3 - ga4gh.vrsatile.pydantic >= 0.1.dev3 + ga4gh.vrsatile.pydantic ~= 0.1.dev3 [options.package_data] therapy = @@ -36,8 +36,6 @@ dev = owlready2 rdflib wikibaseintegrator >= 0.12.0 - chembl-downloader - bioversions >= 0.4.3 ipykernel ipython >= 8.10 pre-commit diff --git a/therapy/etl/base.py b/therapy/etl/base.py index 09c94c01..178dd37a 100644 --- a/therapy/etl/base.py +++ b/therapy/etl/base.py @@ -1,22 +1,16 @@ """A base class for extraction, transformation, and loading of data.""" from abc import ABC, abstractmethod -import ftplib from pathlib import Path import logging -from typing import List, Dict, Optional, Callable -import os -import zipfile -import tempfile -import re +from typing import List, Dict, Optional, Type import json from functools import lru_cache from pydantic import ValidationError -import requests -import bioversions from disease.query import QueryHandler as DiseaseNormalizer +from wags_tails.base_source import DataSource -from therapy import APP_ROOT, ITEM_TYPES, DownloadException +from therapy import ITEM_TYPES from therapy.schemas import Drug, SourceName from therapy.database import Database from therapy.etl.rules import Rules @@ -25,8 +19,6 @@ logger = logging.getLogger("therapy") logger.setLevel(logging.DEBUG) -DEFAULT_DATA_PATH: Path = APP_ROOT / "data" - class Base(ABC): """The ETL base class. @@ -38,15 +30,17 @@ class Base(ABC): needed. """ - def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH) -> None: + _DataSourceClass: Type[DataSource] + + def __init__(self, database: Database, data_path: Optional[Path] = None) -> None: """Extract from sources. - :param Database database: application database object - :param Path data_path: path to app data directory + :param database: application database object + :param data_path: preferred Thera-Py data directory location, if necessary """ + self._therapy_data_dir = data_path self._name = self.__class__.__name__ self.database = database - self._src_dir: Path = Path(data_path / self._name.lower()) self._added_ids: List[str] = [] self._rules = Rules(SourceName(self._name)) @@ -55,7 +49,7 @@ def perform_etl(self, use_existing: bool = False) -> List[str]: Returned concept IDs can be passed to Merge method for computing merged concepts. - :param bool use_existing: if True, don't try to retrieve latest source data + :param use_existing: if True, don't try to retrieve latest source data :return: list of concept IDs which were successfully processed and uploaded. """ @@ -64,153 +58,16 @@ def perform_etl(self, use_existing: bool = False) -> List[str]: self._transform_data() return self._added_ids - def get_latest_version(self) -> str: - """Get most recent version of source data. Should be overriden by - sources not added to Bioversions yet, or other special-case sources. - :return: most recent version, as a str - """ - return bioversions.get_version(self.__class__.__name__) + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. 
- @abstractmethod - def _download_data(self) -> None: - """Acquire source data and deposit in a usable form with correct file - naming conventions (generally, `_.`, or - `__.` if sources require multiple - files). Shouldn't set any instance attributes. - """ - raise NotImplementedError - - def _zip_handler(self, dl_path: Path, outfile_path: Path) -> None: - """Provide simple callback function to extract the largest file within a given - zipfile and save it within the appropriate data directory. - :param Path dl_path: path to temp data file - :param Path outfile_path: path to save file within - """ - with zipfile.ZipFile(dl_path, "r") as zip_ref: - if len(zip_ref.filelist) > 1: - files = sorted( - zip_ref.filelist, key=lambda z: z.file_size, reverse=True - ) - target = files[0] - else: - target = zip_ref.filelist[0] - target.filename = outfile_path.name - zip_ref.extract(target, path=outfile_path.parent) - os.remove(dl_path) - - @staticmethod - def _http_download( - url: str, - outfile_path: Path, - headers: Optional[Dict] = None, - handler: Optional[Callable[[Path, Path], None]] = None, - ) -> None: - """Perform HTTP download of remote data file. - :param str url: URL to retrieve file from - :param Path outfile_path: path to where file should be saved. Must be an actual - Path instance rather than merely a pathlike string. - :param Optional[Dict] headers: Any needed HTTP headers to include in request - :param Optional[Callable[[Path, Path], None]] handler: provide if downloaded - file requires additional action, e.g. it's a zip file. - """ - if handler: - dl_path = Path(tempfile.gettempdir()) / "therapy_dl_tmp" - else: - dl_path = outfile_path - # use stream to avoid saving download completely to memory - with requests.get(url, stream=True, headers=headers) as r: - try: - r.raise_for_status() - except requests.HTTPError: - raise DownloadException( - f"Failed to download {outfile_path.name} from {url}." - ) - with open(dl_path, "wb") as h: - for chunk in r.iter_content(chunk_size=8192): - if chunk: - h.write(chunk) - if handler: - handler(dl_path, outfile_path) - - def _ftp_download(self, host: str, host_dir: str, host_fn: str) -> None: - """Download data file from FTP site. - :param str host: Source's FTP host name - :param str host_dir: Data directory located on FTP site - :param str host_fn: Filename on FTP site to be downloaded - """ - try: - with ftplib.FTP(host) as ftp: - ftp.login() - logger.debug(f"FTP login to {host} was successful") - ftp.cwd(host_dir) - with open(self._src_dir / host_fn, "wb") as fp: - ftp.retrbinary(f"RETR {host_fn}", fp.write) - except ftplib.all_errors as e: - logger.error(f"FTP download failed: {e}") - raise Exception(e) - - def _parse_version( - self, file_path: Path, pattern: Optional[re.Pattern] = None - ) -> str: - """Get version number from provided file path. - - :param Path file_path: path to located source data file - :param Optional[re.Pattern] pattern: regex pattern to use - :return: source data version - :raises: FileNotFoundError if version parsing fails - """ - if pattern is None: - pattern = re.compile(type(self).__name__.lower() + r"_(.+)\..+") - matches = re.match(pattern, file_path.name) - if matches is None: - raise FileNotFoundError - else: - return matches.groups()[0] - - def _get_existing_files(self) -> List[Path]: - """Get existing source files from data directory. 
- :return: sorted list of file objects - """ - return list(sorted(self._src_dir.glob(f"{self._name.lower()}_*.*"))) - - def _extract_data(self, use_existing: bool = False) -> None: - """Get source file from data directory. - - This method should ensure the source data directory exists, acquire source data, - set the source version value, and assign the source file location to - `self._src_file`. Child classes needing additional functionality (like setting - up a DB cursor, or managing multiple source files) will need to reimplement - this method. If `use_existing` is True, the version number will be parsed from - the existing filename; otherwise, it will be retrieved from the data source, - and if the local file is out-of-date, the newest version will be acquired. + This method is responsible for initializing an instance of + ``self._DataSourceClass``, and, in most cases, setting ``self._src_file``. :param bool use_existing: if True, don't try to fetch latest source data """ - self._src_dir.mkdir(exist_ok=True, parents=True) - src_name = type(self).__name__.lower() - if use_existing: - files = self._get_existing_files() - if len(files) < 1: - raise FileNotFoundError(f"No source data found for {src_name}") - self._src_file: Path = files[-1] - try: - self._version = self._parse_version(self._src_file) - except FileNotFoundError: - raise FileNotFoundError( - f"Unable to parse version value from {src_name} source data file " - f"located at {self._src_file.absolute().as_uri()} -- " - "check filename against schema defined in README: " - "https://github.com/cancervariants/therapy-normalization#update-sources" # noqa: E501 - ) - else: - self._version = self.get_latest_version() - fglob = f"{src_name}_{self._version}.*" - latest = list(self._src_dir.glob(fglob)) - if not latest: - self._download_data() - latest = list(self._src_dir.glob(fglob)) - assert len(latest) != 0 # probably unnecessary, but just to be safe - self._src_file = latest[0] + data_source = self._DataSourceClass(data_dir=self._therapy_data_dir) + self._src_file, self._version = data_source.get_latest(from_local=use_existing) @abstractmethod def _load_meta(self) -> None: @@ -311,11 +168,11 @@ def _load_therapy(self, therapy: Dict) -> None: class DiseaseIndicationBase(Base): """Base class for sources that require disease normalization capabilities.""" - def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH): + def __init__(self, database: Database, data_path: Optional[Path] = None): """Initialize source ETL instance. 
- :param therapy.database.Database database: application database - :param Path data_path: path to normalizer data directory + :param database: application database + :param data_path: preferred Thera-Py data directory location, if necessary """ super().__init__(database, data_path) self.disease_normalizer = DiseaseNormalizer(self.database.endpoint_url) diff --git a/therapy/etl/chembl.py b/therapy/etl/chembl.py index c3fe4322..0bac515a 100644 --- a/therapy/etl/chembl.py +++ b/therapy/etl/chembl.py @@ -1,11 +1,9 @@ """This module defines the ChEMBL ETL methods.""" import logging -import os -import shutil import sqlite3 from typing import Optional, List, Dict -import chembl_downloader +from wags_tails import ChemblData import bioversions from therapy.etl.base import DiseaseIndicationBase @@ -20,25 +18,7 @@ class ChEMBL(DiseaseIndicationBase): """Class for ChEMBL ETL methods.""" - def _download_data(self) -> None: - """Download latest ChEMBL database file from EBI.""" - logger.info("Retrieving source data for ChEMBL") - os.environ["PYSTOW_HOME"] = str(self._src_dir.parent.absolute()) - tmp_path = chembl_downloader.download_extract_sqlite() - shutil.move(tmp_path, self._src_dir) - shutil.rmtree(tmp_path.parent.parent.parent) - logger.info("Successfully retrieved source data for ChEMBL") - - def _extract_data(self, use_existing: bool = False) -> None: - """Extract data from the ChEMBL source. - - :param bool use_existing: if True, don't try to fetch latest source data - """ - super()._extract_data(use_existing) - conn = sqlite3.connect(self._src_file) - conn.row_factory = sqlite3.Row - self._conn = conn - self._cursor = conn.cursor() + _DataSourceClass = ChemblData @staticmethod def _unwrap_group_concat(value: Optional[str]) -> List[str]: @@ -108,6 +88,11 @@ def _get_indications(self, value: Optional[str]) -> List[Dict]: def _transform_data(self) -> None: """Transform SQLite data and load to DB.""" + conn = sqlite3.connect(self._src_file) + conn.row_factory = sqlite3.Row + self._conn = conn + self._cursor = conn.cursor() + query = """ SELECT md.chembl_id, diff --git a/therapy/etl/chemidplus.py b/therapy/etl/chemidplus.py index 2e534489..c4462b0c 100644 --- a/therapy/etl/chemidplus.py +++ b/therapy/etl/chemidplus.py @@ -3,14 +3,13 @@ Courtesy of the U.S. National Library of Medicine. 
""" import logging -import zipfile -from os import remove -from shutil import move from typing import Generator from pathlib import Path import xml.etree.ElementTree as ET import re +from wags_tails import ChemIDplusData + from therapy.etl.base import Base from therapy.schemas import NamespacePrefix, SourceMeta, SourceName, \ DataLicenseAttributes, RecordParams @@ -26,23 +25,7 @@ class ChemIDplus(Base): """Class for ChemIDplus ETL methods.""" - def _download_data(self) -> None: - """Download source data from default location.""" - logger.info("Retrieving source data for ChemIDplus") - file = "currentchemid.zip" - self._ftp_download("ftp.nlm.nih.gov", "nlmdata/.chemidlease", file) - zip_path = (self._src_dir / file).absolute() - zip_file = zipfile.ZipFile(zip_path, "r") - outfile = self._src_dir / f"chemidplus_{self._version}.xml" - for info in zip_file.infolist(): - if re.match(r".*\.xml", info.filename): - xml_filename = info.filename - zip_file.extract(info, path=self._src_dir) - move(str(self._src_dir / xml_filename), outfile) - break - remove(zip_path) - assert outfile.exists() - logger.info("Successfully retrieved source data for ChemIDplus") + _DataSourceClass = ChemIDplusData @staticmethod def parse_xml(path: Path, tag: str) -> Generator: diff --git a/therapy/etl/drugbank.py b/therapy/etl/drugbank.py index accadc98..a57559e7 100644 --- a/therapy/etl/drugbank.py +++ b/therapy/etl/drugbank.py @@ -3,6 +3,8 @@ import logging import csv +from wags_tails import DrugBankData + from therapy.schemas import SourceName, SourceMeta, NamespacePrefix from therapy.etl.base import Base @@ -13,13 +15,7 @@ class DrugBank(Base): """Class for DrugBank ETL methods.""" - def _download_data(self) -> None: - """Download DrugBank source data.""" - logger.info("Retrieving source data for DrugBank") - url = f"https://go.drugbank.com/releases/{self._version.replace('.', '-')}/downloads/all-drugbank-vocabulary" # noqa: E501 - csv_file = self._src_dir / f"drugbank_{self._version}.csv" - self._http_download(url, csv_file, handler=self._zip_handler) - logger.info("Successfully retrieved source data for DrugBank") + _DataSourceClass = DrugBankData def _load_meta(self) -> None: """Add DrugBank metadata.""" diff --git a/therapy/etl/drugsatfda.py b/therapy/etl/drugsatfda.py index b54bde1e..09fd7c33 100644 --- a/therapy/etl/drugsatfda.py +++ b/therapy/etl/drugsatfda.py @@ -2,9 +2,9 @@ from typing import List, Optional import json -import requests +from wags_tails import DrugsAtFdaData -from therapy import DownloadException, logger +from therapy import logger from therapy.schemas import SourceMeta, SourceName, NamespacePrefix, ApprovalRating, \ RecordParams from therapy.etl.base import Base @@ -13,32 +13,7 @@ class DrugsAtFDA(Base): """Class for Drugs@FDA ETL methods.""" - def _download_data(self) -> None: - """Download source data from instance-provided source URL.""" - logger.info("Retrieving source data for Drugs@FDA") - url = "https://download.open.fda.gov/drug/drugsfda/drug-drugsfda-0001-of-0001.json.zip" # noqa: E501 - outfile_path = self._src_dir / f"drugsatfda_{self._version}.json" - self._http_download(url, outfile_path, handler=self._zip_handler) - logger.info("Successfully retrieved source data for Drugs@FDA") - - def get_latest_version(self) -> str: - """Retrieve latest version of source data. 
- :return: version as a str -- expected formatting is YYYY-MM-DD - """ - r = requests.get("https://api.fda.gov/download.json") - if r.status_code == 200: - r_json = r.json() - try: - date = r_json["results"]["drug"]["drugsfda"]["export_date"] - except KeyError: - msg = "Unable to parse OpenFDA version API - check for breaking changes" - logger.error(msg) - raise DownloadException(msg) - return date - else: - raise requests.HTTPError( - "Unable to retrieve version from FDA API", response=requests.Response() - ) + _DataSourceClass = DrugsAtFdaData def _load_meta(self) -> None: """Add Drugs@FDA metadata.""" diff --git a/therapy/etl/guidetopharmacology.py b/therapy/etl/guidetopharmacology.py index 054d217c..b7db90c4 100644 --- a/therapy/etl/guidetopharmacology.py +++ b/therapy/etl/guidetopharmacology.py @@ -2,10 +2,9 @@ from typing import Optional, Dict, Any, List, Union import csv import html -from pathlib import Path import re -import requests +from wags_tails.guide_to_pharmacology import GToPLigandData, GtoPLigandPaths from therapy import logger from therapy.schemas import SourceMeta, SourceName, NamespacePrefix, ApprovalRating @@ -19,81 +18,21 @@ class GuideToPHARMACOLOGY(Base): """Class for Guide to PHARMACOLOGY ETL methods.""" - def _download_data(self) -> None: - """Download the latest version of Guide to PHARMACOLOGY.""" - logger.info("Retrieving source data for Guide to PHARMACOLOGY") - if not self._ligands_file.exists(): - self._http_download("https://www.guidetopharmacology.org/DATA/ligands.tsv", - self._ligands_file) - assert self._ligands_file.exists() - if not self._mapping_file.exists(): - self._http_download("https://www.guidetopharmacology.org/DATA/ligand_id_mapping.tsv", # noqa: E501 - self._mapping_file) - assert self._mapping_file.exists() - logger.info("Successfully retrieved source data for Guide to PHARMACOLOGY") + _DataSourceClass = GToPLigandData - def _download_file(self, file_url: str, fn: str) -> None: - """Download individual data file. + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. - :param str file_url: Data url for file - :param str fn: File name - """ - r = requests.get(file_url) - if r.status_code == 200: - prefix = SourceName.GUIDETOPHARMACOLOGY.value.lower() - path = self._src_dir / f"{prefix}_{fn}_{self._version}.tsv" - if fn == "ligands": - self._ligands_file: Path = path - else: - self._mapping_file: Path = path - with open(str(path), "wb") as f: - f.write(r.content) - - def _extract_data(self, use_existing: bool = False) -> None: - """Gather GtoPdb source files. + This method is responsible for initializing an instance of + ``self._DataSourceClass``, and, in most cases, setting ``self._src_file``. 
:param bool use_existing: if True, don't try to fetch latest source data """ - self._src_dir.mkdir(exist_ok=True, parents=True) - prefix = SourceName.GUIDETOPHARMACOLOGY.value.lower() - if use_existing: - ligands_files = list(sorted(self._src_dir.glob(f"{prefix}_ligands_*.tsv"))) - if len(ligands_files) < 1: - raise FileNotFoundError("No GtoPdb ligands files found") - - for ligands_file in ligands_files[::-1]: - try: - version = self._parse_version( - ligands_file, - re.compile(prefix + r"_ligands_(.+)\.tsv") - ) - except FileNotFoundError: - raise FileNotFoundError( - "Unable to parse GtoPdb version value from ligands file " - f"located at {ligands_file.absolute().as_uri()} -- check " - "filename against schema defined in README: " - "https://github.com/cancervariants/therapy-normalization#update-sources" # noqa: E501 - ) - check_mapping_file = self._src_dir / f"{prefix}_ligand_id_mapping_{version}.tsv" # noqa: E501 - if check_mapping_file.exists(): - self._version = version - self._ligands_file = ligands_file - self._mapping_file = check_mapping_file - break - if self._mapping_file is None: - raise FileNotFoundError( - "Unable to find complete GtoPdb data set with matching version " - "values. Check filenames against schema defined in README: " - "https://github.com/cancervariants/therapy-normalization#update-sources" # noqa: E501 - ) - else: - self._version = self.get_latest_version() - self._ligands_file = self._src_dir / f"{prefix}_ligands_{self._version}.tsv" - self._mapping_file = self._src_dir / f"{prefix}_ligand_id_mapping_{self._version}.tsv" # noqa: E501 - if not (self._ligands_file.exists() and self._mapping_file.exists()): - self._download_data() - assert self._ligands_file.exists() - assert self._mapping_file.exists() + data_source: GToPLigandData = self._DataSourceClass( + data_dir=self._therapy_data_dir + ) # type: ignore + src_files, self._version = data_source.get_latest(from_local=use_existing) + self._src_files: GtoPLigandPaths = src_files def _transform_data(self) -> None: """Transform Guide To PHARMACOLOGY data.""" @@ -117,7 +56,7 @@ def _transform_ligands(self, data: Dict) -> None: :param dict data: Transformed data """ - with open(self._ligands_file, "r") as f: + with open(self._src_files.ligands, "r") as f: rows = csv.reader(f, delimiter="\t") # check that file structure is the same @@ -200,7 +139,7 @@ def _transform_ligand_id_mappings(self, data: Dict) -> None: :param dict data: Transformed data """ - with open(self._mapping_file.absolute(), "r") as f: + with open(self._src_files.ligand_id_mapping.absolute(), "r") as f: rows = csv.reader(f, delimiter="\t") next(rows) if next(rows) != [ diff --git a/therapy/etl/hemonc.py b/therapy/etl/hemonc.py index 2d4b61b3..c274f1c9 100644 --- a/therapy/etl/hemonc.py +++ b/therapy/etl/hemonc.py @@ -1,16 +1,10 @@ """Provide ETL methods for HemOnc.org data.""" import logging -from pathlib import Path -from typing import Dict, Tuple, Optional +from typing import Dict, Tuple import csv -import os -import zipfile -import re -import requests -import isodate +from wags_tails.hemonc import HemOncData, HemOncPaths -from therapy import DownloadException from therapy.schemas import NamespacePrefix, SourceMeta, SourceName, RecordParams, \ ApprovalRating from therapy.etl.base import DiseaseIndicationBase @@ -23,120 +17,19 @@ class HemOnc(DiseaseIndicationBase): """Class for HemOnc.org ETL methods.""" - def get_latest_version(self) -> str: - """Retrieve latest version of source data. 
- :raise: Exception if retrieval is unsuccessful - """ - response = requests.get("https://dataverse.harvard.edu/api/datasets/export?persistentId=doi:10.7910/DVN/9CY9C6&exporter=dataverse_json") # noqa: E501 - try: - response.raise_for_status() - except requests.HTTPError as e: - logger.error("Unable to retrieve HemOnc version from Harvard Dataverse") - raise e - iso_datetime = isodate.parse_datetime( - response.json()["datasetVersion"]["releaseTime"] - ) - return iso_datetime.strftime(isodate.isostrf.DATE_EXT_COMPLETE) - - def _zip_handler(self, dl_path: Path, outfile_path: Path) -> None: - """Extract concepts, rels, and synonyms files from tmp zip file and save to - data directory. - :param Path dl_path: path to temp data zipfile - :param Path outfile_path: directory to save data within - """ - file_terms = ("concepts", "rels", "synonyms") - with zipfile.ZipFile(dl_path, "r") as zip_ref: - for file in zip_ref.filelist: - for term in file_terms: - if term in file.filename: - file.filename = f"hemonc_{term}_{self._version}.csv" - zip_ref.extract(file, outfile_path) + _DataSourceClass = HemOncData - os.remove(dl_path) - - def _download_data(self) -> None: - """Download HemOnc.org source data. Requires Harvard Dataverse API key to be set - as environment variable DATAVERSE_API_KEY. Instructions for generating an API - key are available here: https://guides.dataverse.org/en/latest/user/account.html - - :raises: DownloadException if API key environment variable isn't set - """ - api_key = os.environ.get("DATAVERSE_API_KEY") - if api_key is None: - raise DownloadException( - "Must provide Harvard Dataverse API key in environment variable " - "DATAVERSE_API_KEY. See " - "https://guides.dataverse.org/en/latest/user/account.html" - ) - url = "https://dataverse.harvard.edu//api/access/dataset/:persistentId/?persistentId=doi:10.7910/DVN/9CY9C6" # noqa: E501 - headers = {"X-Dataverse-key": api_key} - self._http_download(url, self._src_dir, headers, self._zip_handler) + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. - def _extract_data(self, use_existing: bool = False) -> None: - """Get source files from data directory. - - The following files are necessary for data processing: - hemonc_concepts_.csv - hemonc_rels_.csv - hemonc_synonyms_.csv - This method will attempt to retrieve their latest versions if they are - unavailable locally. + This method is responsible for initializing an instance of + ``self._DataSourceClass``, and, in most cases, setting ``self._src_file``. 
+
+        :param bool use_existing: if True, don't try to fetch latest source data
+        """
-        self._src_dir.mkdir(exist_ok=True, parents=True)
-
-        if use_existing:
-            concepts = list(sorted(self._src_dir.glob("hemonc_concepts_*.csv")))
-            if len(concepts) < 1:
-                raise FileNotFoundError("No HemOnc concepts file found")
-
-            src_files: Optional[Tuple] = None
-            for concepts_file in concepts[::-1]:
-                try:
-                    version = self._parse_version(
-                        concepts_file,
-                        re.compile(r"hemonc_concepts_(.+)\.csv")
-                    )
-                except FileNotFoundError:
-                    raise FileNotFoundError(
-                        f"Unable to parse HemOnc version value from concepts file "
-                        f"located at {concepts_file.absolute().as_uri()} -- check "
-                        "filename against schema defined in README: "
-                        "https://github.com/cancervariants/therapy-normalization#update-sources"  # noqa: E501
-                    )
-                other_files = (
-                    self._src_dir / f"hemonc_rels_{version}.csv",
-                    self._src_dir / f"hemonc_synonyms_{version}.csv"
-                )
-                if other_files[0].exists() and other_files[1].exists():
-                    self._version = version
-                    src_files = (
-                        concepts_file,
-                        other_files[0],
-                        other_files[1]
-                    )
-                    break
-            if src_files is None:
-                raise FileNotFoundError(
-                    "Unable to find complete HemOnc data set with matching version "
-                    "values. Check filenames against schema defined in README: "
-                    "https://github.com/cancervariants/therapy-normalization#update-sources"  # noqa: E501
-                )
-            else:
-                self._src_files = src_files
-        else:
-            self._version = self.get_latest_version()
-            data_filenames = (
-                self._src_dir / f"hemonc_concepts_{self._version}.csv",
-                self._src_dir / f"hemonc_rels_{self._version}.csv",
-                self._src_dir / f"hemonc_synonyms_{self._version}.csv"
-            )
-            if not all((f.exists() for f in data_filenames)):
-                self._download_data()
-            self._src_files = data_filenames
-        for file in self._src_files:
-            assert file.exists()
+        data_source: HemOncData = self._DataSourceClass(data_dir=self._therapy_data_dir)  # type: ignore # noqa: E501
+        src_files, self._version = data_source.get_latest(from_local=use_existing)
+        self._src_files: HemOncPaths = src_files
 
     def _load_meta(self) -> None:
         """Add HemOnc metadata."""
@@ -164,7 +57,7 @@ def _get_concepts(self) -> Tuple[Dict, Dict, Dict]:
         brand_names: Dict[str, str] = {}  # hemonc id -> brand name
         conditions: Dict[str, str] = {}  # hemonc id -> condition name
 
-        concepts_file = open(self._src_files[0], "r")
+        concepts_file = open(self._src_files.concepts, "r")
         concepts_reader = csv.reader(concepts_file)
         next(concepts_reader)  # skip header
         for row in concepts_reader:
@@ -217,7 +110,7 @@ def _get_rels(self, therapies: Dict, brand_names: Dict,
         :param dict conditions: mapping from IDs to disease conditions
         :return: therapies dict updated with brand names and conditions
         """
-        rels_file = open(self._src_files[1], "r")
+        rels_file = open(self._src_files.rels, "r")
         rels_reader = csv.reader(rels_file)
         next(rels_reader)  # skip header
 
@@ -282,7 +175,7 @@ def _get_synonyms(self, therapies: Dict) -> Dict:
         :param dict therapies: mapping of IDs to therapy objects
         :return: therapies dict with synonyms added as aliases
         """
-        synonyms_file = open(self._src_files[2], "r")
+        synonyms_file = open(self._src_files.synonyms, "r")
         synonyms_reader = csv.reader(synonyms_file)
         next(synonyms_reader)
         for row in synonyms_reader:
diff --git a/therapy/etl/ncit.py b/therapy/etl/ncit.py
index 46fb4c56..4ee82811 100644
--- a/therapy/etl/ncit.py
+++ b/therapy/etl/ncit.py
@@ -2,11 +2,10 @@
 import logging
 from typing import Set
 
-import requests
 import owlready2 as owl
 from owlready2.entity import ThingClass
+from wags_tails import NcitData
 
-from therapy import DownloadException
 from therapy.schemas import SourceName, NamespacePrefix, SourceMeta, RecordParams
 from therapy.etl.base import Base
@@ -23,41 +22,7 @@ class NCIt(Base):
 
     * NCIt classes that are subclasses of C1909 (Pharmacologic Substance)
     """
 
-    def _download_data(self) -> None:
-        """Download NCI thesaurus source file.
-        The NCI directory structure can be a little tricky, so this method attempts to
-        retrieve a file matching the latest version number from both the subdirectory
-        root (where the current version is typically posted) as well as the year-by-year
-        archives if that fails.
-        """
-        logger.info("Retrieving source data for NCIt")
-        base_url = "https://evs.nci.nih.gov/ftp1/NCI_Thesaurus"
-        # ping base NCIt directory
-        release_fname = f"Thesaurus_{self._version}.OWL.zip"
-        src_url = f"{base_url}/{release_fname}"
-        r_try = requests.get(src_url)
-        if r_try.status_code != 200:
-            # ping NCIt archive directories
-            archive_url = f"{base_url}/archive/{self._version}_Release/{release_fname}"
-            archive_try = requests.get(archive_url)
-            if archive_try.status_code != 200:
-                old_archive_url = f"{base_url}/archive/20{self._version[0:2]}/{self._version}_Release/{release_fname}"  # noqa: E501
-                old_archive_try = requests.get(old_archive_url)
-                if old_archive_try.status_code != 200:
-                    msg = (
-                        f"NCIt download failed: tried {src_url}, {archive_url}, and "
-                        f"{old_archive_url}"
-                    )
-                    logger.error(msg)
-                    raise DownloadException(msg)
-                else:
-                    src_url = old_archive_url
-            else:
-                src_url = archive_url
-
-        self._http_download(src_url, self._src_dir / f"ncit_{self._version}.owl",
-                            handler=self._zip_handler)
-        logger.info("Successfully retrieved source data for NCIt")
+    _DataSourceClass = NcitData
 
     def _get_desc_nodes(self, node: ThingClass,
                         uq_nodes: Set[ThingClass]) -> Set[ThingClass]:
diff --git a/therapy/etl/rxnorm.py b/therapy/etl/rxnorm.py
index 7497dd37..4abf0114 100644
--- a/therapy/etl/rxnorm.py
+++ b/therapy/etl/rxnorm.py
@@ -7,18 +7,16 @@
 """
 import csv
 import logging
-import shutil
-import zipfile
+from pathlib import Path
 import re
-from os import environ, remove
 from typing import List, Dict
-from pathlib import Path
+from wags_tails import CustomData, RxNormData
 import yaml
 import bioversions
 from boto3.dynamodb.table import BatchWriter
 
-from therapy import DownloadException, XREF_SOURCES, ASSOC_WITH_SOURCES, ITEM_TYPES
+from therapy import XREF_SOURCES, ASSOC_WITH_SOURCES, ITEM_TYPES
 from therapy.schemas import SourceName, NamespacePrefix, SourceMeta, ApprovalRating, \
     RecordParams
 from therapy.etl.base import Base
@@ -44,77 +42,43 @@ class RxNorm(Base):
     """Class for RxNorm ETL methods."""
 
-    def _create_drug_form_yaml(self) -> None:
+    @staticmethod
+    def _create_drug_form_yaml(file: Path, version: str, rxnorm_file: Path) -> None:
         """Create a YAML file containing RxNorm drug form values."""
-        self._drug_forms_file = self._src_dir / f"rxnorm_drug_forms_{self._version}.yaml"  # noqa: E501
         dfs = []
-        with open(self._src_file) as f:  # type: ignore
+        with open(rxnorm_file, "r") as f:
             data = csv.reader(f, delimiter="|")
             for row in data:
                 if row[12] == "DF" and row[11] == "RXNORM":
                     if row[14] not in dfs:
                         dfs.append(row[14])
-        with open(self._drug_forms_file, "w") as file:
-            yaml.dump(dfs, file)
-
-    def _zip_handler(self, dl_path: Path, outfile_path: Path) -> None:
-        """Extract required files from RxNorm zip. This method should be passed to
-        the base class's _http_download method.
- :param Path dl_path: path to RxNorm zip file in tmp directory - :param Path outfile_path: path to RxNorm data directory - """ - rrf_path = outfile_path / f"rxnorm_{self._version}.RRF" - with zipfile.ZipFile(dl_path, "r") as zf: - rrf = zf.open("rrf/RXNCONSO.RRF") - target = open(rrf_path, "wb") - with rrf, target: - shutil.copyfileobj(rrf, target) - remove(dl_path) - self._src_file = rrf_path - self._create_drug_form_yaml() - logger.info("Successfully retrieved source data for RxNorm") - - def _download_data(self) -> None: - """Download latest RxNorm data file. - - :raises DownloadException: if API Key is not defined in the environment. - """ - logger.info("Retrieving source data for RxNorm") - api_key = environ.get("RXNORM_API_KEY") - if api_key is None: - logger.error("Could not find RXNORM_API_KEY in environment variables.") - raise DownloadException("RXNORM_API_KEY not found.") - - url = bioversions.resolve("rxnorm").homepage - if not url: - raise DownloadException("Could not resolve RxNorm homepage") - - self._http_download( - f"https://uts-ws.nlm.nih.gov/download?url={url}&apiKey={api_key}", - self._src_dir, - handler=self._zip_handler - ) + with open(file, "w") as f: + yaml.dump(dfs, f) - def _get_existing_files(self) -> List[Path]: - """Get existing source RRF files from data directory. - :return: sorted list of file objects - """ - return list(sorted(self._src_dir.glob("rxnorm_*.RRF"))) + _DataSourceClass = RxNormData + + def _extract_data(self, use_existing: bool) -> None: + """Acquire source data. - def _extract_data(self, use_existing: bool = False) -> None: - """Get source files from RxNorm data directory. - This class expects a file named `rxnorm_.RRF` and a file named - `rxnorm_drug_forms_.yaml`. This method will download and - generate them if they are unavailable. + This method is responsible for initializing an instance of + ``self._DataSourceClass``, and, in most cases, setting ``self._src_file``. 
+
+        :param bool use_existing: if True, don't try to fetch latest source data
+        """
-        super()._extract_data(use_existing)
-        drug_forms_path = self._src_dir / f"rxnorm_drug_forms_{self._version}.yaml"
-        if not drug_forms_path.exists():
-            self._create_drug_form_yaml()
-        else:
-            self._drug_forms_file = drug_forms_path
+        data_source = self._DataSourceClass(data_dir=self._therapy_data_dir)
+        self._src_file, self._version = data_source.get_latest(from_local=use_existing)
+
+        drug_form_data_handler = CustomData(
+            src_name="rxnorm",
+            file_suffix="yaml",
+            latest_version_cb=lambda: self._version,
+            download_cb=lambda file, version: self._create_drug_form_yaml(
+                file, version, self._src_file
+            ),
+            data_dir=self._therapy_data_dir,
+            file_name="rxnorm_drug_forms",
+        )
+        self._drug_forms_file, _ = drug_form_data_handler.get_latest()
 
     def _transform_data(self) -> None:
         """Transform the RxNorm source."""
diff --git a/therapy/etl/wikidata.py b/therapy/etl/wikidata.py
index bde19de4..a2879dc8 100644
--- a/therapy/etl/wikidata.py
+++ b/therapy/etl/wikidata.py
@@ -2,9 +2,11 @@
 import json
 import logging
 import datetime
+from pathlib import Path
 from typing import Dict, Any
 
 from wikibaseintegrator.wbi_helpers import execute_sparql_query
+from wags_tails.custom import CustomData
 
 from therapy import XREF_SOURCES, DownloadException
 from therapy.schemas import SourceName, NamespacePrefix, RecordParams, SourceMeta
@@ -79,10 +81,14 @@ class Wikidata(Base):
     """Class for Wikidata ETL methods."""
 
-    def _download_data(self) -> None:
-        """Download latest Wikidata source dump."""
-        logger.info("Retrieving source data for Wikidata")
+    @staticmethod
+    def _download_data(file: Path, version: str) -> None:
+        """Download latest Wikidata source dump.
+
+        :param file: location to save data dump at
+        :param version: unused, but required by the wags-tails download callback signature
+        :raise DownloadException: if SPARQL query fails
+        """
         medicine_query_results = execute_sparql_query(SPARQL_QUERY)
         if medicine_query_results is None:
             raise DownloadException("Wikidata medicine SPARQL query failed")
@@ -94,16 +100,35 @@
             for attr in item:
                 params[attr] = item[attr]["value"]
             transformed_data.append(params)
-        with open(f"{self._src_dir}/wikidata_{self._version}.json", "w+") as f:
+        with open(file, "w+") as f:
             json.dump(transformed_data, f)
-        logger.info("Successfully retrieved source data for Wikidata")
 
-    def get_latest_version(self) -> str:
+    @staticmethod
+    def _get_latest_version() -> str:
         """Wikidata is constantly, immediately updated, so source data has no
         strict versioning. We use the current date as a pragmatic way to
         indicate the version.
         """
         return datetime.datetime.today().strftime("%Y-%m-%d")
 
+    _DataSourceClass = CustomData
+
+    def _extract_data(self, use_existing: bool) -> None:
+        """Acquire source data.
+
+        This method is responsible for initializing an instance of
+        ``self._DataSourceClass``, and, in most cases, setting ``self._src_file``.
+
+        :param bool use_existing: if True, don't try to fetch latest source data
+        """
+        data_source: CustomData = self._DataSourceClass(  # type: ignore
+            src_name="wikidata_drugs",
+            file_suffix="json",
+            latest_version_cb=self._get_latest_version,
+            download_cb=self._download_data,
+            data_dir=self._therapy_data_dir
+        )
+        self._src_file, self._version = data_source.get_latest(from_local=use_existing)
+
     def _load_meta(self) -> None:
         """Add Wikidata metadata."""
         metadata = SourceMeta(
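
Taken as a whole, the patch replaces per-source download code with wags-tails: the base class instantiates the declared `_DataSourceClass` and calls `get_latest`, which returns a `(path, version)` pair. A minimal sketch of that contract, using ChEMBL and an illustrative data directory:

```python
from pathlib import Path

from wags_tails import ChemblData

# Fetch (or locate) the newest ChEMBL SQLite file; wags-tails handles
# version discovery, download, and on-disk naming.
data_source = ChemblData(data_dir=Path("/tmp/therapy_data"))  # illustrative path
src_file, version = data_source.get_latest(from_local=False)
print(f"ChEMBL {version} at {src_file}")
```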
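For single-file sources (ChemIDplus, DrugBank, Drugs@FDA, NCIt), the per-source boilerplate collapses to one class attribute; the inherited `_extract_data` does the rest. A sketch of the shape, with a hypothetical source class and stubbed abstract members, assuming the `therapy` package is importable:

```python
from wags_tails import DrugBankData

from therapy.etl.base import Base


class MySource(Base):  # hypothetical source class
    """Single-file source: the inherited _extract_data sets _src_file/_version."""

    _DataSourceClass = DrugBankData

    def _load_meta(self) -> None:  # abstract in Base; stubbed for the sketch
        ...

    def _transform_data(self) -> None:
        ...
```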
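ChEMBL's SQLite connection now opens lazily in `_transform_data` rather than during extraction. The row-factory idiom it relies on, shown in isolation with an in-memory stand-in database and an illustrative schema:

```python
import sqlite3

# In-memory stand-in for the downloaded ChEMBL SQLite file.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
conn.execute("CREATE TABLE molecule_dictionary (chembl_id TEXT)")  # illustrative schema
conn.execute("INSERT INTO molecule_dictionary VALUES ('CHEMBL25')")
for row in conn.execute("SELECT chembl_id FROM molecule_dictionary"):
    print(row["chembl_id"])  # -> CHEMBL25
conn.close()
```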
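Multi-file sources (HemOnc, Guide to PHARMACOLOGY) swap positional tuples for named path containers, which is what accesses like `self._src_files.concepts` above depend on. A sketch against previously downloaded files, assuming the default data directory:

```python
from wags_tails.hemonc import HemOncData

data_source = HemOncData()  # default data directory
# from_local=True reuses existing files rather than downloading.
paths, version = data_source.get_latest(from_local=True)
for path in (paths.concepts, paths.rels, paths.synonyms):
    print(path)
```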
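Where no stock wags-tails class exists — the Wikidata SPARQL dump and RxNorm's derived drug-forms file — `CustomData` adapts user-supplied callbacks to the same interface. A condensed sketch with placeholder callbacks and version scheme:

```python
from pathlib import Path

from wags_tails import CustomData


def write_dump(file: Path, version: str) -> None:
    """Placeholder download callback: write source data to ``file``."""
    file.write_text("[]")


data_source = CustomData(
    src_name="wikidata_drugs",
    file_suffix="json",
    latest_version_cb=lambda: "2023-10-04",  # placeholder version scheme
    download_cb=write_dump,
)
src_file, version = data_source.get_latest()
```

As in the RxNorm handler above, the version callback can close over state computed earlier in `_extract_data`, which is how the drug-forms file stays pinned to the version of the RRF file it was derived from.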