Skip to content

Commit

Permalink
Merge pull request #69 from EBISPOT/ftp_as_shared_storage
Browse files Browse the repository at this point in the history
move files with globus, store validated files on ftp
  • Loading branch information
jdhayhurst authored Jun 3, 2020
2 parents 64c16aa + 14079fa commit e5cf352
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 47 deletions.
6 changes: 3 additions & 3 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ deploy_staging:
- RABBITPWD=$(kubectl --namespace rabbitmq get secret rabbitmq -o jsonpath="{.data.rabbitmq-password}" | base64 -d)
- helm init
- helm delete --purge gwas-sumstats-service || true
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.validateSSH=false,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$SANDBOX_MONGO_URI,image.env.mongoUser=$SANDBOX_MONGO_USER,image.env.mongoPassword=$SANDBOX_MONGO_PASSWORD,image.env.mongoDB=$SANDBOX_MONGO_DB ./k8chart/ --wait
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.validateSSH=false,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$SANDBOX_MONGO_URI,image.env.mongoUser=$SANDBOX_MONGO_USER,image.env.mongoPassword=$SANDBOX_MONGO_PASSWORD,image.env.mongoDB=$SANDBOX_MONGO_DB,image.env.validatedPath=$VALIDATED_PATH ./k8chart/ --wait
environment:
name: sandbox
only:
Expand All @@ -69,7 +69,7 @@ deploy_fallback:
- RABBITPWD=$(kubectl --namespace rabbitmq get secret rabbitmq -o jsonpath="{.data.rabbitmq-password}" | base64 -d)
- helm init
- helm delete --purge gwas-sumstats-service || true
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.storagePath=$STORAGE_PATH,image.env.computeFarmLogin=$HX_LOGIN_NODE,image.env.computeFarmUser=$CLUSTER_USERNAME,image.env.httpProxy=$HX_HTTP_PROXY,image.env.httpsProxy=$HX_HTTPS_PROXY,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$PROD_MONGO_URI,image.env.mongoUser=$PROD_MONGO_USER,image.env.mongoPassword=$PROD_MONGO_PASSWORD,image.env.mongoDB=$PROD_MONGO_DB,image.env.remoteHttpProxy=$HX_HTTP_PROXY,image.env.remoteHttpsProxy=$HX_HTTPS_PROXY,image.env.stagingPath=$STAGING_PATH ./k8chart/ --wait
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.storagePath=$STORAGE_PATH,image.env.computeFarmLogin=$HX_LOGIN_NODE,image.env.computeFarmUser=$CLUSTER_USERNAME,image.env.httpProxy=$HX_HTTP_PROXY,image.env.httpsProxy=$HX_HTTPS_PROXY,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$PROD_MONGO_URI,image.env.mongoUser=$PROD_MONGO_USER,image.env.mongoPassword=$PROD_MONGO_PASSWORD,image.env.mongoDB=$PROD_MONGO_DB,image.env.remoteHttpProxy=$HX_HTTP_PROXY,image.env.remoteHttpsProxy=$HX_HTTPS_PROXY,image.env.stagingPath=$STAGING_PATH,image.env.validatedPath=$VALIDATED_PATH ./k8chart/ --wait
environment:
name: fallback
when: manual
Expand All @@ -87,7 +87,7 @@ deploy_prod:
- RABBITPWD=$(kubectl --namespace rabbitmq get secret rabbitmq -o jsonpath="{.data.rabbitmq-password}" | base64 -d)
- helm init
- helm delete --purge gwas-sumstats-service || true
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.storagePath=$STORAGE_PATH,image.env.computeFarmLogin=$HH_LOGIN_NODE,image.env.computeFarmUser=$CLUSTER_USERNAME,image.env.httpProxy=$HH_HTTP_PROXY,image.env.httpsProxy=$HH_HTTPS_PROXY,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$PROD_MONGO_URI,image.env.mongoUser=$PROD_MONGO_USER,image.env.mongoPassword=$PROD_MONGO_PASSWORD,image.env.mongoDB=$PROD_MONGO_DB,image.env.remoteHttpProxy=$PG_HTTP_PROXY,image.env.remoteHttpsProxy=$PG_HTTPS_PROXY,image.env.stagingPath=$STAGING_PATH ./k8chart/ --wait
- helm install --name gwas-sumstats-service --set k8Namespace=gwas,image.repository=$CI_REGISTRY_IMAGE,image.tag=$CI_COMMIT_SHA,image.env.celeryUser=$RABBITUSER,image.env.celeryPwd=$RABBITPWD,image.env.storagePath=$STORAGE_PATH,image.env.computeFarmLogin=$HH_LOGIN_NODE,image.env.computeFarmUser=$CLUSTER_USERNAME,image.env.httpProxy=$HH_HTTP_PROXY,image.env.httpsProxy=$HH_HTTPS_PROXY,image.env.gwasEndpointID=$GWAS_ENDPOINT_ID,image.env.globusSecret=$GLOBUS_SECRET,image.env.transferClientID=$TRANSFER_CLIENT_ID,image.env.clientID=$CLIENT_ID,image.env.ftpServer=$FTP_SERVER,image.env.ftpUser=$FTP_USERNAME,image.env.ftpPassword=$FTP_PASSWORD,image.env.mongoURI=$PROD_MONGO_URI,image.env.mongoUser=$PROD_MONGO_USER,image.env.mongoPassword=$PROD_MONGO_PASSWORD,image.env.mongoDB=$PROD_MONGO_DB,image.env.remoteHttpProxy=$PG_HTTP_PROXY,image.env.remoteHttpsProxy=$PG_HTTPS_PROXY,image.env.stagingPath=$STAGING_PATH,image.env.validatedPath=$VALIDATED_PATH ./k8chart/ --wait
environment:
name: production
when: manual
Expand Down
1 change: 1 addition & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def _env_variable_else(env_var_name, default):
STORAGE_PATH = _env_variable_else('STORAGE_PATH', './data')
LOGGING_PATH = "./logs"
STAGING_PATH = _env_variable_else('STAGING_PATH', 'depo_ss_staging')
VALIDATED_PATH = _env_variable_else('VALIDATED_PATH', 'depo_ss_validated')


# --- Rabbit and Celery --- #
Expand Down
1 change: 1 addition & 0 deletions dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ENV QUEUE_HOST "rabbitmq.rabbitmq"
ENV QUEUE_PORT 5672
ENV STORAGE_PATH "./data"
ENV STAGING_PATH "./staging"
ENV VALIDATED_PATH "./validated"
ENV VALIDATE_WITH_SSH ""
ENV COMPUTE_FARM_LOGIN_NODE ""
ENV COMPUTE_FARM_USERNAME ""
Expand Down
2 changes: 2 additions & 0 deletions k8chart/templates/gwas-ss-worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
value: {{.Values.image.env.storagePath}}
- name: STAGING_PATH
value: {{.Values.image.env.stagingPath}}
- name: VALIDATED_PATH
value: {{.Values.image.env.validatedPath}}
- name: VALIDATE_WITH_SSH
value: "{{.Values.image.env.validateSSH}}"
- name: COMPUTE_FARM_LOGIN_NODE
Expand Down
1 change: 1 addition & 0 deletions k8chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ image:
httpProxy: ""
httpsProxy: ""
stagingPath: "./staging"
validatedPath: "./validated"


volume:
Expand Down
75 changes: 65 additions & 10 deletions sumstats_service/resources/file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import validate.validator as val
import pathlib
from sumstats_service.resources.error_classes import *
import sumstats_service.resources.globus as globus
import ftplib


Expand Down Expand Up @@ -101,6 +102,14 @@ def set_store_path(self):
if self.study_id:
self.store_path = os.path.join(self.parent_path, str(self.study_id))

def set_valid_parent_path(self):
    """Set ``valid_parent_path`` to this callback's folder under the validated root.

    The path is ``config.VALIDATED_PATH`` joined with ``callback_id``.
    Nothing is assigned when ``study_id`` is falsy.
    """
    if not self.study_id:
        return
    validated_root = config.VALIDATED_PATH
    self.valid_parent_path = os.path.join(validated_root, self.callback_id)

def set_valid_path(self):
    """Set ``valid_path`` to ``valid_parent_path`` joined with the study id.

    The study id is converted with ``str`` before joining. Nothing is
    assigned when ``study_id`` is falsy.
    """
    if not self.study_id:
        return
    study_name = str(self.study_id)
    self.valid_path = os.path.join(self.valid_parent_path, study_name)

def download_from_dropbox(self):
url = self.file_path
url_parse = parse_url(url)
Expand Down Expand Up @@ -198,30 +207,73 @@ def get_dialect(self, csv_file):
logger.error("Guessing extension, setting to .tsv")
return ".tsv"

def move_file_to_staging(self):
self.staging_dir_name = str(self.staging_dir_name.replace(' ', ''))
self.staging_file_name = str(self.staging_file_name.replace(' ', ''))

def tidy_files(self):
# copy files to validated path on ftp
# clean up any files on the nfs
self.set_parent_path()
self.set_store_path()
# We know the readme name exactly, but we don't know the extension of the sumstats file
source_readme = os.path.join(self.parent_path, str(self.study_id)) + ".README"
upload_to_ftp(server=config.FTP_SERVER, user=config.FTP_USERNAME, password=config.FTP_PASSWORD, source=source_readme, dest_dir=self.staging_dir_name, dest_file="README.txt")
upload_to_ftp(server=config.FTP_SERVER, user=config.FTP_USERNAME, password=config.FTP_PASSWORD, source=source_readme, parent_dir=config.VALIDATED_PATH, dest_dir=self.callback_id, dest_file="README.txt")
try:
self.store_path = glob(self.store_path + ".*[!log]")[0]
if self.store_path:
file_ext = self.get_ext()
dest_file = self.staging_file_name + file_ext
logger.info("syncing file: {} --> {}/{}".format(self.store_path, self.staging_dir_name, dest_file))
upload_to_ftp(server=config.FTP_SERVER, user=config.FTP_USERNAME, password=config.FTP_PASSWORD, source=self.store_path, dest_dir=self.staging_dir_name, dest_file=dest_file)
dest_file = self.study_id + file_ext
logger.info("syncing file: {} --> {}/{}".format(self.store_path, config.VALIDATED_PATH, os.path.join(self.callback_id, dest_file)))
upload_to_ftp(server=config.FTP_SERVER, user=config.FTP_USERNAME, password=config.FTP_PASSWORD, source=self.store_path, parent_dir=config.VALIDATED_PATH, dest_dir=self.callback_id, dest_file=dest_file)
else:
logger.error("Error: {}\nCould not locate file for {}".format(self.study_id))
return False
except (IndexError, FileNotFoundError, OSError) as e:
logger.error("Error: {}\nCould not move file {} to staging".format(e, self.store_path))
logger.error("Error: {}\nCould not move file {} to validated".format(e, self.store_path))
return False
return True
# TODO clear up the files on the store path
# close down globus endpoint


def move_files_to_staging(self):
    """Move the validated README and sumstats file into staging via Globus.

    Source paths come from ``valid_parent_path`` / ``valid_path``, which are
    (re)computed here. Destination paths are built from
    ``config.STAGING_PATH``, ``staging_dir_name`` and ``staging_file_name``;
    spaces are stripped from the latter two and both attributes are updated
    in place.

    Returns:
        bool: True when both moves succeeded. False when either move
        reported failure, or when an IndexError, FileNotFoundError or
        OSError was raised while preparing or performing the moves.
    """
    try:
        self.set_valid_parent_path()
        self.set_valid_path()
        source_file = self.valid_path
        # NOTE(review): valid_path is built from study_id alone, so the suffix
        # computed here is only non-empty if study_id itself carries one --
        # confirm how the validated file's extension should be resolved.
        source_file_ext = "".join(pathlib.Path(source_file).suffixes)
        source_readme = os.path.join(self.valid_parent_path, "README.txt")

        self.staging_dir_name = str(self.staging_dir_name.replace(' ', ''))
        self.staging_file_name = str(self.staging_file_name.replace(' ', '')) + source_file_ext

        dest_dir = os.path.join(config.STAGING_PATH, self.staging_dir_name)
        dest_file = os.path.join(dest_dir, self.staging_file_name)
        dest_readme = os.path.join(dest_dir, "README.txt")

        # move with globus
        # move readme
        readme_status = mv_file_with_globus(source=source_readme, dest_dir=dest_dir, dest=dest_readme)
        # move sumstats file -- the source must be the data file, not the
        # README, which has already been renamed by the call above
        file_status = mv_file_with_globus(source=source_file, dest_dir=dest_dir, dest=dest_file)
        if readme_status is False:
            logger.error("Error could not move {}".format(str(dest_readme)))
        if file_status is False:
            logger.error("Error could not move {}".format(dest_file))
        # A failed move must not be reported to the caller as success.
        if readme_status is False or file_status is False:
            return False
    except (IndexError, FileNotFoundError, OSError) as e:
        logger.error("Error: {}\nCould not move file {} to staging, callback ID: {}".format(e, self.staging_file_name, self.callback_id))
        return False
    return True


def mv_file_with_globus(dest_dir, source, dest):
    """Rename ``source`` to ``dest`` through Globus, creating ``dest_dir`` first.

    Directory creation is best-effort: a failure there is logged and the
    rename is still attempted.

    Args:
        dest_dir: directory passed to ``globus.mkdir`` and ``globus.rename_file``.
        source: path of the file to move.
        dest: path the file should be moved to.

    Returns:
        Whatever ``globus.rename_file`` returns for the rename.
    """
    # create the new dir
    try:
        globus.mkdir(unique_id=dest_dir)
    except Exception as e:
        # Deliberately non-fatal (presumably the directory often exists
        # already -- confirm), but no longer swallows SystemExit or
        # KeyboardInterrupt, and the reason is kept in the logs.
        logger.debug("globus mkdir for {} did not complete: {}".format(dest_dir, e))
    return globus.rename_file(dest_dir, source, dest)


def md5_check(file):
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
Expand Down Expand Up @@ -319,6 +371,9 @@ def save_response_content(response, destination):
save_response_content(response, destination)





def download_from_ftp(server, user, password, source, dest):
try:
ftp = ftplib.FTP(server)
Expand All @@ -337,11 +392,11 @@ def download_from_ftp(server, user, password, source, dest):
return False


def upload_to_ftp(server, user, password, source, dest_dir, dest_file):
def upload_to_ftp(server, user, password, source, parent_dir, dest_dir, dest_file):
try:
ftp = ftplib.FTP(server)
ftp.login(user, password)
ftp.cwd(config.STAGING_PATH)
ftp.cwd(parent_dir)
filelist = []
ftp.retrlines('LIST',filelist.append)
dir_exists = False
Expand Down
87 changes: 53 additions & 34 deletions sumstats_service/resources/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import globus_sdk
from globus_sdk import (NativeAppAuthClient, TransferClient,
RefreshTokenAuthorizer, ConfidentialAppAuthClient)
from globus_sdk.exc import GlobusAPIError
from globus_sdk.exc import GlobusAPIError, TransferAPIError
from sumstats_service.resources.globus_utils import is_remote_session, enable_requests_logging
from pymongo import MongoClient
from bson.objectid import ObjectId
Expand All @@ -20,7 +20,7 @@
enable_requests_logging()


def mkdir(unique_id, email_address):
def mkdir(unique_id, email_address=None):
    """Create a directory for ``unique_id`` using a freshly initialised transfer client.

    Delegates to ``create_dir`` and returns its result unchanged.
    """
    return create_dir(init(), unique_id, email_address)
Expand Down Expand Up @@ -125,39 +125,43 @@ def check_user(email):
return identity_id


def create_dir(transfer, uid, email):
identity_id = check_user(email)
if identity_id:
# create directory
transfer.operation_mkdir(config.GWAS_ENDPOINT_ID, uid)
# create shared endpoint
display_name = '-'.join([str(date.today()), uid[0:8]])
shared_ep_data = {
"DATA_TYPE": "shared_endpoint",
"host_endpoint": config.GWAS_ENDPOINT_ID,
"host_path": '/~/' + uid,
"display_name": 'ebi#gwas#' + display_name,
# optionally specify additional endpoint fields
"description": 'ebi#gwas#' + uid,
"owner_string": "GWAS Catalog",
"contact_email": "gwas-dev@ebi.ac.uk",
"organization": "EBI"
}
create_result = transfer.create_shared_endpoint(shared_ep_data)
endpoint_id = create_result.data['id']

# add user to endpoint
rule_data = {
"DATA_TYPE": "access",
"principal_type": "identity",
"principal": identity_id,
"path": '/',
"permissions": "rw"
}
transfer.add_endpoint_acl_rule(endpoint_id, rule_data)
return endpoint_id
def create_dir(transfer, uid, email=None):
if email:
identity_id = check_user(email)
if identity_id:
# create directory
transfer.operation_mkdir(config.GWAS_ENDPOINT_ID, uid)
# create shared endpoint
display_name = '-'.join([str(date.today()), uid[0:8]])
shared_ep_data = {
"DATA_TYPE": "shared_endpoint",
"host_endpoint": config.GWAS_ENDPOINT_ID,
"host_path": '/~/' + uid,
"display_name": 'ebi#gwas#' + display_name,
# optionally specify additional endpoint fields
"description": 'ebi#gwas#' + uid,
"owner_string": "GWAS Catalog",
"contact_email": "gwas-dev@ebi.ac.uk",
"organization": "EBI"
}
create_result = transfer.create_shared_endpoint(shared_ep_data)
endpoint_id = create_result.data['id']

# add user to endpoint
rule_data = {
"DATA_TYPE": "access",
"principal_type": "identity",
"principal": identity_id,
"path": '/',
"permissions": "rw"
}
transfer.add_endpoint_acl_rule(endpoint_id, rule_data)
return endpoint_id
else:
return None
else:
return None
transfer.operation_mkdir(config.GWAS_ENDPOINT_ID, uid)



def load_tokens_from_db():
Expand Down Expand Up @@ -225,6 +229,21 @@ def do_native_app_authentication(client_id, redirect_uri,
return token_response.by_resource_server


def rename_file(dest_dir, source, dest):
    """Rename ``source`` to ``dest`` on the GWAS Globus endpoint.

    ``dest_dir`` is listed first and the rename is skipped when ``dest`` is
    already among its entries.

    Args:
        dest_dir: directory to list; entry names are joined onto it before
            being compared with ``dest``.
        source: current path of the file on the endpoint.
        dest: path to rename the file to.

    Returns:
        bool: False when the Globus API raised ``TransferAPIError`` (the
        error is printed); True otherwise, including when the rename was
        skipped because ``dest`` already exists.
    """
    transfer = init()
    try:
        # The listing must go through the client bound above; the previous
        # reference to an undefined name ``tr`` raised NameError on every call.
        dir_ls = transfer.operation_ls(config.GWAS_ENDPOINT_ID, path=dest_dir)
        existing = {os.path.join(dest_dir, entry["name"]) for entry in dir_ls}
        if dest not in existing:
            transfer.operation_rename(config.GWAS_ENDPOINT_ID, source, dest)
    except TransferAPIError as e:
        print(e)
        return False
    return True




def main():
transfer = init()

Expand Down
1 change: 1 addition & 0 deletions sumstats_service/resources/study_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def validate_study(self):
if ssf.validate_file():
self.set_data_valid_status(1)
ssf.write_readme_file()
ssf.tidy_files() if config.VALIDATE_WITH_SSH else None
else:
self.set_data_valid_status(0)
self.set_error_code(3)
Expand Down

0 comments on commit e5cf352

Please sign in to comment.