Skip to content

Commit

Permalink
Expire SRA and GEO metadata after 14 days
Browse files Browse the repository at this point in the history
  • Loading branch information
arteymix committed Mar 5, 2024
1 parent b5aa676 commit 90985ba
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 6 deletions.
10 changes: 8 additions & 2 deletions rnaseq_pipeline/sources/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This module contains all the logic to retrieve RNA-Seq data from GEO.
"""

from datetime import timedelta
import gzip
import logging
from subprocess import Popen
Expand All @@ -21,6 +22,7 @@
from ..config import rnaseq_pipeline
from ..miniml_utils import collect_geo_samples, collect_geo_samples_info
from ..platforms import Platform, BgiPlatform, IlluminaPlatform
from ..targets import ExpirableLocalTarget
from ..utils import RerunnableTaskMixin
from .sra import DownloadSraExperiment

Expand Down Expand Up @@ -67,13 +69,15 @@ class DownloadGeoSampleMetadata(RerunnableTaskMixin, luigi.Task):
retry_count = 3

def run(self):
if self.output().is_stale():
logger.info('%s is stale, redownloading...', self.output())
res = requests.get('https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi', params=dict(acc=self.gsm, form='xml'))
res.raise_for_status()
with self.output().open('w') as f:
f.write(res.text)

def output(self):
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)))
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)), ttl=timedelta(days=14))

@requires(DownloadGeoSampleMetadata)
class DownloadGeoSample(TaskWithMetadataMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask):
Expand Down Expand Up @@ -113,14 +117,16 @@ class DownloadGeoSeriesMetadata(RerunnableTaskMixin, luigi.Task):
retry_count = 3

def run(self):
if self.output().is_stale():
logger.info('%s is stale, redownloading...', self.output())
res = requests.get('https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi', params=dict(acc=self.gse, form='xml', targ='gsm'))
res.raise_for_status()
with self.output().open('w') as f:
f.write(res.text)

def output(self):
# TODO: remove the _family suffix
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)))
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)), ttl=timedelta(days=14))

@requires(DownloadGeoSeriesMetadata)
class DownloadGeoSeries(DynamicTaskWithOutputMixin, DynamicWrapperTask):
Expand Down
8 changes: 6 additions & 2 deletions rnaseq_pipeline/sources/sra.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
import gzip
import logging
import os
Expand All @@ -14,6 +15,7 @@
import pandas as pd

from ..config import rnaseq_pipeline
from ..targets import ExpirableLocalTarget
from ..utils import remove_task_output, RerunnableTaskMixin

class sra(luigi.Config):
Expand Down Expand Up @@ -115,11 +117,13 @@ class DownloadSraExperimentRunInfo(TaskWithMetadataMixin, RerunnableTaskMixin, l
retry_count = 1

def run(self):
if self.output().is_stale():
logger.info('%s is stale, redownloading...', self.output())
with self.output().open('w') as f:
f.write(retrieve_runinfo(self.srx))

def output(self):
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srx)))
return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srx)), ttl=timedelta(days=14))

@requires(DownloadSraExperimentRunInfo)
class DownloadSraExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
Expand Down Expand Up @@ -171,7 +175,7 @@ def run(self):
f.write(retrieve_runinfo(self.srp))

def output(self):
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srp)))
return luigi.ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'sra', '{}.runinfo'.format(self.srp)), ttl=timedelta(days=14))

@requires(DownloadSraProjectRunInfo)
class DownloadSraProject(DynamicTaskWithOutputMixin, DynamicWrapperTask):
Expand Down
34 changes: 33 additions & 1 deletion rnaseq_pipeline/targets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
from os.path import join, exists
from datetime import timedelta
from os.path import join, exists, getctime, getmtime
from time import time

import luigi
import requests
Expand Down Expand Up @@ -52,3 +54,33 @@ def __init__(self, dataset_short_name):

def exists(self):
return self._gemma_api.dataset_has_batch(self.dataset_short_name)

class ExpirableLocalTarget(luigi.LocalTarget):
"""
A local target that can expire according to a TTL value
The TTL can either be a timedelta of a float representing the number of
seconds past the creation time of the target that it will be considered
fresh. Once that delay expired, the target will not be considered as
existing.
By default, creation time is used as per os.path.getctime. Use the
`use_mtime` parameter to use the modification time instead.
"""
def __init__(self, path, ttl, use_mtime=False):
super().__init__(path)
if not isinstance(ttl, timedelta):
self._ttl = timedelta(seconds=ttl)
else:
self._ttl = ttl
self._use_mtime = use_mtime

def is_stale(self):
try:
creation_time = getmtime(self.path) if self._use_mtime else getctime(self.path)
except OSError:
return False # file is missing, assume non-stale
return creation_time + self._ttl.total_seconds() < time()

def exists(self):
return super().exists() and not self.is_stale()
26 changes: 25 additions & 1 deletion tests/test_targets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
from rnaseq_pipeline.targets import GemmaDatasetPlatform, GemmaDatasetHasBatch
import tempfile

from datetime import timedelta
from time import sleep
from rnaseq_pipeline.targets import GemmaDatasetPlatform, GemmaDatasetHasBatch, ExpirableLocalTarget

def test_gemma_targets():
assert GemmaDatasetHasBatch('GSE110256').exists()
assert GemmaDatasetPlatform('GSE110256', 'Generic_mouse_ncbiIds').exists()

def test_expirable_local_target():
with tempfile.TemporaryDirectory() as tmp_dir:
t = ExpirableLocalTarget(tmp_dir + '/test', ttl=timedelta(seconds=1))
assert not t.exists()
with t.open('w') as f:
pass
assert t.exists()
sleep(1)
assert not t.exists()

def test_expirable_local_target_with_float_ttl():
with tempfile.TemporaryDirectory() as tmp_dir:
t = ExpirableLocalTarget(tmp_dir + '/test', ttl=1.0)
assert not t.exists()
with t.open('w') as f:
pass
assert t.exists()
sleep(1)
assert not t.exists()

0 comments on commit 90985ba

Please sign in to comment.