diff --git a/hera_librarian/__init__.py b/hera_librarian/__init__.py
index 0f3418f..17a3c70 100644
--- a/hera_librarian/__init__.py
+++ b/hera_librarian/__init__.py
@@ -1,484 +1,5 @@
-# -*- mode: python; coding: utf-8 -*-
-# Copyright 2016 the HERA Team.
-# Licensed under the BSD License.
+"""
+The LibrarianClient and its methods are defined in hera_librarian/client.py.
+"""
-
-import json
-import os.path
-from pathlib import Path
-import urllib.request, urllib.parse, urllib.error
-from pkg_resources import get_distribution, DistributionNotFound
-import requests
-from typing import Optional
-from pydantic import BaseModel
-
-__all__ = str('''
-all_connections
-get_client_config
-NoSuchConnectionError
-RPCError
-LibrarianClient
-''').split()
-
-
-try:
-    __version__ = get_distribution(__name__).version
-except DistributionNotFound:
-    # package is not installed
-    pass
-
-
-class NoSuchConnectionError(Exception):
-    def __init__(self, conn_name):
-        super(NoSuchConnectionError, self).__init__("no such connection " + repr(conn_name))
-        self.conn_name = conn_name
-
-
-def get_client_config():
-    """Parse the client configuration file and return it as a dictionary."""
-    path = os.path.expanduser('~/.hl_client.cfg')
-    with open(path, 'r') as f:
-        s = f.read()
-    return json.loads(s)
-
-
-def all_connections():
-    """Generate a sequence of LibrarianClient objects for all connections in the
-    configuration file.
-
-    """
-    config = get_client_config()
-
-    for name, info in config.get('connections', {}).items():
-        yield LibrarianClient(name, info)
-
-
-class RPCError(Exception):
-    def __init__(self, req, message):
-        super(RPCError, self).__init__("RPC call %r failed: %s" % (req, message))
-        self.req = req
-        self.message = message
-
-
-def _normalize_deletion_policy(deletion_policy):
-    # Keep this test in sync with librarian_server/file.py:DeletionPolicy.parse_safe()
-    if deletion_policy not in ('allowed', 'disallowed'):
-        raise Exception('unrecognized deletion policy %r' % (deletion_policy,))
-    return deletion_policy
-
-class LibrarianHTTPError(Exception):
-    def __init__(self, url, status_code, reason, suggested_remedy):
-        super(LibrarianHTTPError, self).__init__(
-            f"HTTP request to {url} failed with status code {status_code} and reason {reason}."
-        )
-        self.url = url
-        self.status_code = status_code
-        self.reason = reason
-        self.suggested_remedy = suggested_remedy
-
-
-class LibrarianClient(object):
-    conn_name = None
-    "The name of the Librarian connection we target."
-
-    config = None
-    "The JSON config fragment corresponding to the desired connection."
-
-    def __init__(self, conn_name, conn_config=None):
-        """
-        A class for interacting with a Librarian server.
-
-        If `conn_config` is not None, it should be a dict containing at least
-        the entries "url", and then "authenticator" (if the server uses
-        authenticator-based authentication) XOR "github_username" and
-        "github_pat" (if the server uses GitHub-based authentication), which
-        define how to talk to the target Librarian. Otherwise, the file
-        `~/.hl_client.cfg` will be used to look up a dict containing the same
-        information.
-
-        A minimal `conn_config` dict should contain keys "url", and then either
-        "authenticator" OR "github_username" and "github_pat", which are used to
-        contact the Librarian's RPC API. Note that having both defined will
-        raise an error, as a server will only do authenticator- or GitHub-based
-        authentication. The user a priori should know which one the server they
-        are trying to contact uses.
-
-        Parameters
-        ----------
-        conn_name : str
-            A string defining the name of the connection to use.
-        conn_config : dict or None
-            A dictionary containing details for how to interact with the target
-            Librarian. If None, then we read from the user's ~/.hl_client.cfg
-            file.
-        """
-        self.conn_name = conn_name
-
-        if conn_config is not None:
-            self.config = conn_config
-
-            if "url" in self.config:
-                if not "/" == self.config["url"][-1]:
-                    self.config["url"] += "/"
-        else:
-            config = get_client_config()
-            self.config = config['connections'].get(conn_name)
-            if self.config is None:
-                raise NoSuchConnectionError(conn_name)
-
-    def _do_http_post(self, operation, **kwargs):
-        """do a POST operation, passing a JSON version of the request and expecting a
-        JSON reply; return the decoded version of the latter.
-
-        """
-        # TODO: This is an awful way to do this configuration...
-        # figure out if we're using authenticator- or GitHub-based authentication
-        if "authenticator" in self.config:
-            if "github_username" in self.config or "github_pat" in self.config:
-                raise ValueError(
-                    "both 'authenticator' and one or both of {'github_username', "
-                    "'github_pat'} were specified in the config file. This is "
-                    "not supported, please only use one or the other depending "
-                    "on which remote server you are attempting to access."
-                )
-            kwargs['authenticator'] = self.config['authenticator']
-        else:
-            kwargs["github_username"] = self.config["github_username"]
-            kwargs["github_pat"] = self.config["github_pat"]
-
-        for k in list(kwargs.keys()):
-            if kwargs[k] is None:
-                kwargs.pop(k)
-        req_json = json.dumps(kwargs)
-
-        params = urllib.parse.urlencode({'request': req_json}).encode("utf-8")
-        url = self.config['url'] + 'api/' + operation
-        try:
-            f = urllib.request.urlopen(url, params)
-            reply = f.read()
-        except urllib.error.HTTPError as err:
-            reply = err.read()
-        try:
-            reply_json = json.loads(reply)
-        except ValueError:
-            raise RPCError(kwargs, 'failed to parse reply as JSON: ' + repr(reply))
-
-        if not reply_json.get('success', False):
-            raise RPCError(kwargs,
-                           reply_json.get('message', ''))
-
-        return reply_json
-
-    def do_pydantic_http_post(self, endpoint: str, request_model: Optional[BaseModel] = None, response_model: Optional[BaseModel] = None):
-        """
-        Do a POST operation, passing a JSON version of the request and expecting a
-        JSON reply; return the decoded version of the latter.
-
-        Parameters
-        ----------
-        endpoint : str
-            The endpoint to post to.
-        request_model : pydantic.BaseModel, optional
-            The request model to send. If None, we don't ask for anything.
-        response_model : pydantic.BaseModel, optional
-            The response model to expect. If None, we don't return anything.
-
-        Returns
-        -------
-        response_model
-            The decoded response model.
-        """
-        full_endpoint = self.config["url"] + "api/v2/" + endpoint
-
-        # Do not do authentication yet.
-        data = None if request_model is None else request_model.model_dump_json()
-
-        r = requests.post(
-            full_endpoint, data=data, headers={"Content-Type": "application/json"}
-        )
-
-        # Decode the response.
-        if r.status_code not in [200, 201]:
-            try:
-                json = r.json()
-            except requests.exceptions.JSONDecodeError:
-                json = {}
-
-            raise LibrarianHTTPError(
-                full_endpoint,
-                r.status_code,
-                r.json().get("reason", ""),
-                r.json().get("suggested_remedy", ""),
-            )
-
-        if response_model is not None:
-            # Note that the pydantic model wants the full bytes content
-            # not the deserialized r.json()
-            return response_model.model_validate_json(r.content)
-        else:
-            return None
-
-    def ping(self):
-        """
-        Ping the remote librarian to see if it exists.
-
-        Returns
-        -------
-
-        PingResponse
-            The response from the remote librarian.
-
-        Raises
-        ------
-
-        LibrarianHTTPError
-            If the remote librarian is unreachable.
-
-        pydantic.ValidationError
-            If the remote librarian returns an invalid response.
-        """
-        from .models.ping import PingRequest, PingResponse
-
-        response: PingResponse = self.do_pydantic_http_post(
-            endpoint="ping",
-            request_model=PingRequest(),
-            response_model=PingResponse,
-        )
-
-        return response
-
-    def probe_stores(self, **kwargs):
-        return self._do_http_post('probe_stores', **kwargs)
-
-    def stores(self):
-        """Generate a sequence of Stores that are attached to the remote Librarian."""
-
-        from .base_store import BaseStore
-        info = self.probe_stores()
-
-        for item in info['stores']:
-            yield BaseStore(item['name'], item['path_prefix'], item['ssh_host'])
-
-
-    def create_file_event(self, file_name, type, **kwargs):
-        """Note that keyword arguments to this function will automagically be stuffed
-        inside the "payload" parameter.
-
-        """
-        return self._do_http_post('create_file_event',
-                                  file_name=file_name,
-                                  type=type,
-                                  payload=kwargs,
-                                  )
-
-    def assign_observing_sessions(self, minimum_start_jd=None, maximum_start_jd=None):
-        return self._do_http_post('assign_observing_sessions',
-                                  minimum_start_jd=minimum_start_jd,
-                                  maximum_start_jd=maximum_start_jd,
-                                  )
-
-    def upload_file(
-        self,
-        local_path: Path,
-        dest_path: Path,
-        deletion_policy: str='disallowed',
-        null_obsid: bool=False,
-    ) -> dict:
-
-        deletion_policy = _normalize_deletion_policy(deletion_policy)
-
-        if dest_path.is_absolute():
-            raise Exception(f"Destination path may not be absolute; got {dest_path}")
-
-        # Ask the librarian for a staging directory, and a list of transfer managers
-        # to try.
-
-        from .utils import get_size_from_path, get_md5_from_path
-        from .models.uploads import UploadInitiationRequest, UploadInitiationResponse, UploadCompletionRequest
-
-        response: UploadInitiationResponse = self.do_pydantic_http_post(
-            endpoint="upload/stage",
-            request_model=UploadInitiationRequest(
-                upload_size=get_size_from_path(local_path),
-                upload_checksum=get_md5_from_path(local_path),
-                upload_name=dest_path.name,
-                destination_location=dest_path,
-                # TODO: Figure out a programatic way of getting this.
-                uploader="TEST_USER",
-            ),
-            response_model=UploadInitiationResponse,
-        )
-
-        from .transfers import CoreTransferManager
-
-        transfer_managers = response.transfer_providers
-
-        # Now try all the transfer managers. If they're valid, we try to use them.
-        # If they fail, we should probably catch the exception.
-        # TODO: Catch the exception on failure.
-        used_transfer_manager: Optional[CoreTransferManager] = None
-        used_transfer_manager_name: Optional[str] = None
-
-        # TODO: Should probably have some manual ordering here.
-        for name, transfer_manager in transfer_managers.items():
-            if transfer_manager.valid:
-                transfer_manager.transfer(
-                    local_path=local_path, remote_path=response.staging_location
-                )
-
-                # We used this.
-                used_transfer_manager = transfer_manager
-                used_transfer_manager_name = name
-
-                break
-            else:
-                print(f"Warning: transfer manager {name} is not valid.")
-
-        if used_transfer_manager is None:
-            raise Exception("No valid transfer managers found.")
-
-        # If we made it here, the file is successfully on the store!
-
-        request = UploadCompletionRequest(
-            store_name=response.store_name,
-            staging_name=response.staging_name,
-            staging_location=response.staging_location,
-            upload_name=response.upload_name,
-            destination_location=dest_path,
-            transfer_provider_name=used_transfer_manager_name,
-            transfer_provider=used_transfer_manager,
-            meta_mode="infer",
-            deletion_policy=deletion_policy,
-            # TODO: Figure out what source name actually does.
-            # INFO: Source name is the person/librarian that uploaded this.
-            source_name="",
-            null_obsid=null_obsid,
-            # TODO: Figure out how to get this programattically.
-            uploader="TEST_USER",
-            transfer_id=response.transfer_id,
-        )
-
-        self.do_pydantic_http_post(
-            endpoint="upload/commit",
-            request_model=request,
-        )
-
-        return
-
-    def register_instances(self, store_name, file_info):
-        return self._do_http_post(
-            'register_instances',
-            store_name=store_name,
-            file_info=file_info,
-        )
-
-    def locate_file_instance(self, file_name):
-        return self._do_http_post(
-            'locate_file_instance',
-            file_name=file_name,
-        )
-
-    def set_one_file_deletion_policy(
-        self, file_name, deletion_policy, restrict_to_store=None
-    ):
-        deletion_policy = _normalize_deletion_policy(deletion_policy)
-
-        return self._do_http_post(
-            'set_one_file_deletion_policy',
-            file_name=file_name,
-            deletion_policy=deletion_policy,
-            restrict_to_store=restrict_to_store,
-        )
-
-    def delete_file_instances(self, file_name, mode='standard', restrict_to_store=None):
-        return self._do_http_post(
-            'delete_file_instances',
-            file_name=file_name,
-            mode=mode,
-            restrict_to_store=restrict_to_store,
-        )
-
-    def delete_file_instances_matching_query(
-        self, query, mode='standard', restrict_to_store=None
-    ):
-        return self._do_http_post(
-            'delete_file_instances_matching_query',
-            query=query,
-            mode=mode,
-            restrict_to_store=restrict_to_store,
-        )
-
-    def launch_file_copy(
-        self,
-        file_name,
-        connection_name,
-        remote_store_path=None,
-        known_staging_store=None,
-        known_staging_subdir=None,
-    ):
-        return self._do_http_post(
-            'launch_file_copy',
-            file_name=file_name,
-            connection_name=connection_name,
-            remote_store_path=remote_store_path,
-            known_staging_store=known_staging_store,
-            known_staging_subdir=known_staging_subdir,
-        )
-
-    def initiate_offload(self, source_store_name, dest_store_name):
-        return self._do_http_post(
-            'initiate_offload',
-            source_store_name=source_store_name,
-            dest_store_name=dest_store_name,
-        )
-
-    def describe_session_without_event(self, source, event_type):
-        return self._do_http_post(
-            'describe_session_without_event',
-            source=source,
-            event_type=event_type,
-        )
-
-    def launch_local_disk_stage_operation(self, user, search, dest_dir):
-        return self._do_http_post(
-            'search',
-            stage_user=user,
-            search=search,
-            stage_dest=dest_dir,
-            type='instances-stores',
-            output_format='stage-the-files-json',
-        )
-
-    def search_sessions(self, search):
-        return self._do_http_post(
-            'search',
-            search=search,
-            output_format='session-listing-json',
-        )
-
-    def search_files(self, search):
-        return self._do_http_post(
-            'search',
-            search=search,
-            output_format='file-listing-json',
-        )
-
-    def search_instances(self, search):
-        return self._do_http_post(
-            'search',
-            search=search,
-            output_format='instance-listing-json',
-        )
-
-    def search_observations(self, search):
-        return self._do_http_post(
-            'search',
-            search=search,
-            output_format='obs-listing-json',
-        )
-
-    def gather_file_record(self, file_name):
-        return self._do_http_post('gather_file_record', file_name=file_name)
-
-    def create_file_record(self, rec_info):
-        return self._do_http_post('create_file_record', **rec_info)#
+from .client import LibrarianClient
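
The package root now simply re-exports the client. A minimal usage sketch of the
new import path (host, port, and user are placeholder values, and a running
server is assumed):

    from hera_librarian import LibrarianClient

    client = LibrarianClient(host="http://localhost", port=8080, user="example-user")
    print(client.ping())  # raises LibrarianHTTPError if the server is unreachable
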
diff --git a/hera_librarian/base_store.py b/hera_librarian/base_store.py
deleted file mode 100644
index 1e9ec11..0000000
--- a/hera_librarian/base_store.py
+++ /dev/null
@@ -1,639 +0,0 @@
-# -*- mode: python; coding: utf-8 -*-
-# Copyright 2016 the HERA Collaboration
-# Licensed under the BSD License.
-
-"""A data storage host that a Librarian knows about.
-
-Librarian clients that want to upload data, etc., SSH into the stores
-directly. This class gathers the information and functions used for doing
-that.
-
-"""
-
-
-
-__all__ = str('''
-BaseStore
-''').split()
-
-import subprocess
-import os.path
-import time
-import warnings
-
-from . import RPCError
-
-NUM_RSYNC_TRIES = 6
-
-
-class BaseStore(object):
-    """Note that the Librarian server code subclasses this class, so do not change
-    its structure without making sure that you're not breaking it.
-
-    """
-    name = None
-    path_prefix = None
-    ssh_host = None
-
-    def __init__(self, name, path_prefix, ssh_host):
-        self.name = name
-        self.path_prefix = path_prefix
-        self.ssh_host = ssh_host
-
-    # Direct store access. All paths sent to SSH commands should be filtered
-    # through self._path() to prepend the path_prefix and make sure that we're
-    # not accidentally passing absolute paths around.
-
-    def _path(self, *pieces):
-        for p in pieces:
-            if os.path.isabs(p):
-                raise ValueError('store paths must not be absolute; got %r' % (pieces,))
-        return os.path.join(self.path_prefix, *pieces)
-
-    def _ssh_slurp(self, command, input=None):
-        """SSH to the store host, run a command, and return its standard output. Raise
-        an RPCError with standard error output if anything goes wrong.
-
-        You MUST be careful about quoting! `command` is passed as an argument
-        to 'bash -c', so it goes through one layer of parsing by the shell on
-        the remote host. For instance, filenames containing '>' or ';' or '('
-        or ' ' will cause problems unless you quote them appropriately. We do
-        *not* launch our SSH process through a shell, so only one layer of
-        shell quoting is required -- you'd need two if you were just typing
-        the command in a terminal manually. BUT THEN, you're probably writing
-        your command string as a Python string, so you probably need another
-        layer of Python string literal quoting on top of that!
-
-        """
-        argv = ['ssh', self.ssh_host, command]
-
-        if input is None:
-            import os
-            stdin = open(os.devnull, 'rb')
-        else:
-            stdin = subprocess.PIPE
-
-        proc = subprocess.Popen(argv, shell=False, stdin=stdin,
-                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        if input is None:
-            stdin.close()
-        stdout, stderr = proc.communicate(input=input)
-
-        if proc.returncode != 0:
-            raise RPCError(argv, 'exit code %d; stdout:\n\n%r\n\nstderr:\n\n%r'
-                           % (proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8")))
-
-        return stdout
-
-    def _stream_path(self, store_path):
-        """Return a subprocess.Popen instance that streams file contents on its
-        standard output. If the file is a flat file, this is well-defined; if
-        the file is a directory, the "contents" are its tar-ification, inside
-        one level of subdirectory named as the directory is. For instance, if
-        the target is a directory "/data/foo/bar", containing files "a" and
-        "b", the returned tar file will contain "bar/a" and "bar/b".
-
-        """
-        import os
-        argv = ['ssh', self.ssh_host, "librarian_stream_file_or_directory.sh '%s'" %
-                self._path(store_path)]
-        stdin = open(os.devnull, 'rb')
-        proc = subprocess.Popen(argv, shell=False, stdin=stdin,
-                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        stdin.close()
-        return proc
-
-    def _rsync_transfer(self, local_path, store_path):
-        """Copy a file to a particular path using rsync.
-
-        Parameters
-        ----------
-        local_path : str
-            Path to local file to be copied.
-        store_path : str
-            Path to store file at on destination host.
-
-        Returns
-        -------
-        None
-
-        Raises
-        ------
-        RPCError
-            Raised if rsync transfer does not complete successfully.
-        """
-        # Rsync will nest directories in a way that we don't want if we don't
-        # end their names with "/", but it will error if we end a file name
-        # with "/". So we have to check:
-
-        if os.path.isdir(local_path) and not local_path.endswith('/'):
-            local_suffix = '/'
-        else:
-            local_suffix = ''
-
-        # flags: archive mode; keep partial transfers. Have SSH work in batch
-        # mode and turn off known hosts and host key checking to Just Work
-        # without needing prompts. We used to have SSH use compression, but this
-        # put too high of a CPU load on the paper1 correlator machine. You could
-        # imagine making that an option if it helped with data transfer from
-        # Karoo to US.
-
-        argv = [
-            'rsync',
-            '-aP',
-            '-e',
-            'ssh -c aes256-gcm@openssh.com -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no',
-            local_path + local_suffix,
-            '%s:%s' % (self.ssh_host, self._path(store_path))
-        ]
-        success = False
-
-        for i in range(NUM_RSYNC_TRIES):
-            proc = subprocess.Popen(argv, shell=False, stdout=subprocess.PIPE,
-                                    stderr=subprocess.STDOUT)
-            output = proc.communicate()[0]
-
-            if proc.returncode == 0:
-                success = True
-                break
-
-        if not success:
-            raise RPCError(argv, 'exit code %d; output:\n\n%r' % (proc.returncode, output))
-
-    def _globus_transfer(
-        self,
-        local_path,
-        store_path,
-        client_id,
-        transfer_token,
-        source_endpoint_id,
-        destination_endpoint_id,
-        host_path,
-    ):
-        """Copy a file to a particular path using globus.
-
-        For further information on using the Python API for Globus, refer to the
-        Globus SDK docs: https://globus-sdk-python.readthedocs.io/en/stable/
-
-        Parameters
-        ----------
-        local_path : str
-            Path to local file to be copied.
-        store_path : str
-            Path to store file at on destination host. Note that if the target
-            endpoint is a "shared endpoint", this may be relative to the globus
-            root directory rather than the filesystem root.
-        client_id : str
-            The globus client ID to use for the transfer.
-        transfer_token : str
-            The globus transfer token to use for the transfer.
-        source_endpoint_id : str
-            The globus endpoint ID of the source store.
-        destination_endpoint_id : str
-            The globus endpoint ID of the destination store.
-        host_path : str, optional
-            The `host_path` of the globus store. When using shared endpoints,
-            this is the root directory presented to the client.
-
-        Returns
-        -------
-        None
-
-        Raises
-        ------
-        RPCError
-            Raised if submission of transfer to globus does not complete
-            successfully or if globus_sdk package is not installed.
- """ - try: - import globus_sdk - from globus_sdk.services.auth.errors import AuthAPIError - except ModuleNotFoundError: - raise RPCError( - "globus_sdk import", - "The globus_sdk package must be installed for globus " - "functionality. Please `pip install globus_sdk` and try again." - ) - - # check that both endpoint IDs have been specified - if source_endpoint_id is None or destination_endpoint_id is None: - raise RPCError( - "globus endpoint check", - "Both source_endpoint_id and destination_endpoint_id must be " - "specified to initiate globus transfer." - ) - - # make globus transfer client - client = globus_sdk.NativeAppAuthClient(client_id) - try: - authorizer = globus_sdk.RefreshTokenAuthorizer(transfer_token, client) - except AuthAPIError: - raise RPCError( - "globus authorization", - "globus authentication failed. Please check client_id and " - "authentication token and try again." - ) - tc = globus_sdk.TransferClient(authorizer=authorizer) - - # make a new data transfer object - basename = os.path.basename(os.path.normpath(local_path)) - tdata = globus_sdk.TransferData( - tc, - source_endpoint_id, - destination_endpoint_id, - sync_level="checksum", - label=basename, - notify_on_succeeded=False, - notify_on_failed=True, - notify_on_inactive=True, - ) - - # format the path correctly - store_path = self._path(store_path) - if host_path is not None: - if store_path.startswith(host_path): - store_path = store_path[len(host_path):] - # trim off leading "/" if present - if store_path.startswith("/"): - store_path = store_path.lstrip("/") - - # add data to be transferred - if os.path.isdir(local_path): - recursive = True - else: - recursive = False - tdata.add_item(local_path, store_path, recursive=recursive) - - # initiate transfer - transfer_result = tc.submit_transfer(tdata) - - # get task_id and query status until it finishes - task_id = transfer_result["task_id"] - while True: - task = tc.get_task(task_id) - if task["status"] == "SUCCEEDED": - return - elif task["status"] == "FAILED": - # get the events associated with this transfer to help with debugging - events = tc.task_event_list(task_id) - error_string = "" - for event in events: - error_string.append(event["time"] + ": " + event["description"] + "\n") - raise RPCError("globus transfer", 'events:\n\n%r' % (error_string)) - else: # task is "ACTIVE" - time.sleep(5) - - # Modifications of the store host. These should always be paired with - # appropriate modifications of the Librarian server database, either - # through an RPC call (if you're a client) or a direct change (if you're - # the server). - - def copy_to_store( - self, - local_path, - store_path, - try_globus=False, - client_id=None, - transfer_token=None, - source_endpoint_id=None, - destination_endpoint_id=None, - host_path=None, - ): - """Transfer a file to a particular path in the store. - - You should not copy files directly to their intended destinations. You - should use the Librarian `prepare_upload` RPC call to get a staging - directory and copy your files there; then use the `complete_upload` - RPC call to tell the Librarian that you're done. - - Parameters - ---------- - local_path : str - Path to the local file to upload. - store_path : str - Path to store file at on destination host. - try_globus : bool, optional - Whether to try to use globus to transfer. If False, or if globus - fails, automatically fall back on rsync. - client_id : str, optional - The globus client ID to use for the transfer. 
-        transfer_token : str, optional
-            The globus transfer token to use for the transfer.
-        source_endpoint_id : str, optional
-            The globus endpoint ID of the source store.
-        destination_endpoint_id : str, optional
-            The globus endpoint ID of the destination store.
-        host_path : str, optional
-            The `host_path` of the globus store. When using shared endpoints,
-            this is the root directory presented to the client. Note that this
-            may be different from the `path_prefix` for a given store.
-
-        Returns
-        -------
-        None
-        """
-        if try_globus:
-            try:
-                self._globus_transfer(
-                    local_path,
-                    store_path,
-                    client_id,
-                    transfer_token,
-                    source_endpoint_id,
-                    destination_endpoint_id,
-                    host_path,
-                )
-            except RPCError as e:
-                # something went wrong with globus--fall back on rsync
-                print("Globus transfer failed: {}\nFalling back on rsync...".format(e))
-                self._rsync_transfer(local_path, store_path)
-        else:
-            # use rsync from the get-go
-            self._rsync_transfer(local_path, store_path)
-
-    def _chmod(self, store_path, modespec):
-        """Change Unix permissions on a path in the store.
-
-        `modespec` is a textual specification that is passed to the `chmod`
-        command. This is useful since Librarian "files" can be either Unix
-        files or directories, so for many use cases we do not necessarily want
-        to be operating in terms of numerical mode specifications. For the
-        same reason, we always provide the `-R` option to `chmod`.
-
-        Returns the standard output of `chmod`, which should be empty on
-        success. RPCError will be raised if the invoked command exits with a
-        failure code.
-
-        """
-        return self._ssh_slurp("chmod -R '%s' '%s'" % (modespec, self._path(store_path)))
-
-    def _move(self, source_store_path, dest_store_path, chmod_spec=None):
-        """Move a path in the store.
-
-        We make sure that parent directories exist if needed. We refuse to
-        overwrite existing files. If `dest_store_path` is an existing
-        directory, we refuse to place the source file inside of it.
-
-        I can't actually find a way to get `mv` to indicate an error in the
-        case that it refuses to overwrite the destination, so we test that the
-        mv succeeded by seeing if the source file disappeared. This approach
-        has the important attribute of not being racy.
-
-        If the source file is already in the Librarian or is an upload from a
-        different Librarian, it may be read-only. You can't move a read-only
-        directory -- because its '..' entry needs to be altered, I think.
-        (This is not true for flat files -- the read/write permission that
-        matters is usually just that of the containing directory, not the item
-        itself.) So, we make the source item writable before moving in case it
-        it's a read-only directory.
-
-        If a file was uploaded to this Librarian in read-write mode but we
-        want our files to be read-only, the uploaded data need to be made
-        read-only -- but, as per the previous paragraph, this must happen
-        after moving the data to their final destination. The `chmod_spec`
-        argument enables a recursive post-mv `chmod` to support this
-        functionality; the `chmod` could be done through a separate call, but
-        this way it can all be taken care of in one SSH invocation with a
-        minimal window of writeability. In principle it would be nice to have
-        *zero* window of writeability but so long as we support "files" that
-        are really directories, I believe that is simply not possible.
- - """ - dest_parent = os.path.dirname(dest_store_path) - ssp = self._path(source_store_path) - dsp = self._path(dest_store_path) - - if chmod_spec is not None: - piece = " && chmod -R '%s' '%s'" % (chmod_spec, dsp) - else: - piece = '' - - return self._ssh_slurp( - "mkdir -p '%s' && chmod u+w '%s' && mv -nT '%s' '%s' && test ! -e '%s'%s" % - (self._path(dest_parent), ssp, ssp, dsp, ssp, piece) - ) - - def _delete(self, store_path, chmod_before=False): - """Delete a path from the store. - - We use the `-r` flag of `rm` to delete recursively, but not the `-f` - flag, so an error will be raised if the intended path does not exist. - Note that the standard input of `rm` will not be a terminal, so it - should never attempt to prompt if the file is read-only. - - The Librarian can be configured to make items read-only on ingest. If - that happens, in order to delete a directory we need to chmod it - first. Hence the `chmod_before` flag. - - """ - if chmod_before: - part1 = "chmod -R u+w '%s' && " % self._path(store_path) - else: - part1 = '' - return self._ssh_slurp(part1 + "rm -r '%s'" % self._path(store_path)) - - def _create_tempdir(self, key='libtmp'): - """Create a temporary directory in the store's root and return its "store - path". - - """ - # we need to convert the output of _ssh_slurp -- a path in bytes -- to a string - output = self._ssh_slurp('mktemp -d -p %s %s.XXXXXX' % (self.path_prefix, key)).decode("utf-8") - fullpath = output.splitlines()[-1].strip() - - if not fullpath.startswith(self.path_prefix): - raise RPCError('unexpected output from mktemp on %s: %s' - % (self.name, fullpath)) - - return fullpath[len(self.path_prefix) + 1:] - - # Interrogations of the store -- these don't change anything so they don't - # necessarily need to be paired with Librarian database modifications. - - def get_info_for_path(self, storepath): - """`storepath` is a path relative to our `path_prefix`. We assume that we are - not running on the store host, but can transparently SSH to it. - - """ - import json - text = self._ssh_slurp( - "python -c \'import hera_librarian.utils as u; u.print_info_for_path(\"%s\")\'" - % (self._path(storepath)) - ) - return json.loads(text) - - _cached_space_info = None - _space_info_timestamp = None - - def get_space_info(self): - """Get information about how much space is available in the store. We have a - simpleminded cache since it's nice just to be able to call the - function, but SSHing into the store every time is going to be a bit - silly. - - """ - import time - now = time.time() - - # 30 second lifetime: - if self._cached_space_info is not None and now - self._space_info_timestamp < 30: - return self._cached_space_info - - output = self._ssh_slurp('df -B1 %s' % self._path()) - bits = output.splitlines()[-1].split() - info = {} - info['used'] = int(bits[2]) # measured in bytes - info['available'] = int(bits[3]) # measured in bytes - info['total'] = info['used'] + info['available'] - - self._cached_space_info = info - self._space_info_timestamp = now - - return info - - @property - def capacity(self): - """Returns the total capacity of the store, in bytes. - - Accessing this property may trigger an SSH into the store host! - - """ - return self.get_space_info()['total'] - - @property - def space_left(self): - """Returns the amount of space left in the store, in bytes. - - Accessing this property may trigger an SSH into the store host! - - Note: we can't call this "available" since that conflicts with the - boolean availability flag in the server. 
- - """ - return self.get_space_info()['available'] - - @property - def usage_percentage(self): - """Returns the amount of the storage capacity that is currently used as a - percentage. - - Accessing this property may trigger an SSH into the store host! - - """ - info = self.get_space_info() - return 100. * info['used'] / (info['total']) - - def upload_file_to_other_librarian( - self, - conn_name, - rec_info, - local_store_path, - remote_store_path=None, - known_staging_store=None, - known_staging_subdir=None, - use_globus=False, - client_id=None, - transfer_token=None, - source_endpoint_id=None, - ): - """Upload a given file to a different Librarian. - - This function will SSH into the store host, from which it will launch an - rsync or globus transfer, and it will not return until everything is - done! This means that in the real world it may not return for hours, and - it will not infrequently raise an exception. - - TODO: there is no internal progress tracking; we just block and - eventually return some textual output from the rsync-within-SSH. This is - far from ideal. Globus offers some information about transfer progress, - but we do not (yet) have automated ways of retrieving it. - - Parameters - ---------- - conn_name : str - The name of the external librarian to upload a file to. - rec_info : dict - A dictionary containing information about the file records being - transferred. These are needed by the receiving librarian. - local_store_path : str - The full path to the file in the local store. - remote_store_path : str, optional - The full path to the file destination in the remote store. If not - specified, it will default to be the same as local_store_path. - known_staging_store : str, optional - The store corresponding to the already-uploaded file. Must be specified - if `known_staging_subdir` is specified. - known_staging_subdir : str, optional - The target directory corresponding to the already-uploaded file. Must by - specified if `known_staging_store` is specified. - use_globus : bool, optional - Whether to try to use globus to transfer. If False, or if globus - fails, automatically fall back on rsync. - client_id : str, optional - The globus client ID to use for the transfer. - transfer_token : str, optional - The globus transfer token to use for the transfer. - source_endpoint_id : str, optional - The globus endpoint ID of the source store. May be omitted, in which - case we assume it is a "personal" (as opposed to public) client. - - Returns - ------- - bytes - The output of the `_ssh_slurp` command to log into the store and - launch the librarian upload. 
- """ - if remote_store_path is None: - remote_store_path = local_store_path - - if (known_staging_store is None) ^ (known_staging_subdir is None): - raise ValueError('both known_staging_store and known_staging_subdir must be specified') - - if known_staging_store is None: - pre_staged_arg = '' - else: - pre_staged_arg = ' --pre-staged=%s:%s' % (known_staging_store, known_staging_subdir) - - import json - rec_text = json.dumps(rec_info) - - command = 'librarian upload --meta=json-stdin%s %s %s %s' % ( - pre_staged_arg, conn_name, self._path(local_store_path), remote_store_path) - # optional globus additions to the command - if use_globus: - command += f" --use_globus --client_id={client_id} --transfer_token={transfer_token}" - if source_endpoint_id is not None: - command += f" --source_endpoint_id={source_endpoint_id}" - - # actually run the command - return self._ssh_slurp(command, input=rec_text.encode("utf-8")) - - def upload_file_to_local_store(self, local_store_path, dest_store, dest_rel): - """Fire off an rsync process on the store that will upload a given file to - another store *on the same Librarian*. Like - `upload_file_to_other_librarian`, this function will SSH into the - store host launch an rsync, and not return until everything is done. - - `destrel` is the "store path" of where the file should be placed on - the destination Librarian. This should be a staging directory. - - This function is needed to implement the Librarian's "offload" - feature. - - """ - c = ("librarian offload-helper --name '%s' --pp '%s' --host '%s' " - "--destrel '%s' '%s'" % (dest_store.name, dest_store.path_prefix, - dest_store.ssh_host, dest_rel, self._path(local_store_path))) - return self._ssh_slurp(c) - - def check_stores_connections(self): - """Tell the store to check its ability to connect to other Librarians and - *their* stores. - - This just runs the command "librarian check-connections" and returns - its textual output. In principle this command could be given an option - to return its output in JSON and we could parse it, but for the - envisioned use cases text will be OK. - - """ - return self._ssh_slurp('librarian check-connections').decode('utf-8') diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index 3ed8d31..dd4e26e 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -1025,7 +1025,7 @@ def upload(args): try: from pathlib import Path - client.upload_file( + client.upload( local_path=Path(args.local_path), dest_path=Path(args.dest_store_path), deletion_policy=args.deletion, diff --git a/hera_librarian/client.py b/hera_librarian/client.py new file mode 100644 index 0000000..fab8eb0 --- /dev/null +++ b/hera_librarian/client.py @@ -0,0 +1,274 @@ +""" +The public-facing LibrarianClient object. +""" + +from pathlib import Path +from typing import TYPE_CHECKING, Optional + +import requests +from pydantic import BaseModel + +from .deletion import DeletionPolicy +from .exceptions import LibrarianError, LibrarianHTTPError +from .models.ping import PingRequest, PingResponse +from .models.uploads import (UploadCompletionRequest, UploadInitiationRequest, + UploadInitiationResponse) +from .utils import get_md5_from_path, get_size_from_path + +if TYPE_CHECKING: + from .transfers import CoreTransferManager + + +class LibrarianClient: + """ + A client for the Librarian API. + """ + + host: str + port: int + user: str + + def __init__(self, host: str, port: int, user: str): + """ + Create a new LibrarianClient. 
diff --git a/hera_librarian/client.py b/hera_librarian/client.py
new file mode 100644
index 0000000..fab8eb0
--- /dev/null
+++ b/hera_librarian/client.py
@@ -0,0 +1,274 @@
+"""
+The public-facing LibrarianClient object.
+"""
+
+from pathlib import Path
+from typing import TYPE_CHECKING, Optional
+
+import requests
+from pydantic import BaseModel
+
+from .deletion import DeletionPolicy
+from .exceptions import LibrarianError, LibrarianHTTPError
+from .models.ping import PingRequest, PingResponse
+from .models.uploads import (UploadCompletionRequest, UploadInitiationRequest,
+                             UploadInitiationResponse)
+from .utils import get_md5_from_path, get_size_from_path
+
+if TYPE_CHECKING:
+    from .transfers import CoreTransferManager
+
+
+class LibrarianClient:
+    """
+    A client for the Librarian API.
+    """
+
+    host: str
+    port: int
+    user: str
+
+    def __init__(self, host: str, port: int, user: str):
+        """
+        Create a new LibrarianClient.
+
+        Parameters
+        ----------
+        host : str
+            The hostname of the Librarian server.
+        port : int
+            The port of the Librarian server.
+        user : str
+            The name of the user.
+        """
+
+        if host[-1] == "/":
+            self.host = host[:-1]
+        else:
+            self.host = host
+
+        self.port = port
+        self.user = user
+
+    def __repr__(self):
+        return f"Librarian Client ({self.user}) for {self.host}:{self.port}"
+
+    @property
+    def hostname(self):
+        return f"{self.host}:{self.port}/api/v2"
+
+    def resolve(self, path: str):
+        """
+        Resolve a path to a URL.
+
+        Parameters
+        ----------
+        path : str
+            The path to resolve.
+
+        Returns
+        -------
+        str
+            The resolved URL.
+        """
+
+        if path[0] == "/":
+            return f"{self.hostname}{path}"
+        else:
+            return f"{self.hostname}/{path}"
+
+    def post(
+        self,
+        endpoint: str,
+        request: Optional[BaseModel] = None,
+        response: Optional[BaseModel] = None,
+    ) -> Optional[BaseModel]:
+        """
+        Do a POST operation, passing a JSON version of the request and expecting a
+        JSON reply; return the decoded version of the latter.
+
+        Parameters
+        ----------
+        endpoint : str
+            The endpoint to post to.
+        request : pydantic.BaseModel, optional
+            The request model to send. If None, we don't ask for anything.
+        response : pydantic.BaseModel, optional
+            The response model to expect. If None, we don't return anything.
+
+        Returns
+        -------
+        response, optional
+            The decoded response model, or None.
+
+        Raises
+        ------
+
+        LibrarianHTTPError
+            If the HTTP request fails.
+
+        pydantic.ValidationError
+            If the remote librarian returns an invalid response.
+        """
+
+        data = None if request is None else request.model_dump_json()
+
+        r = requests.post(
+            self.resolve(endpoint),
+            data=data,
+            headers={"Content-Type": "application/json"},
+        )
+
+        if str(r.status_code)[0] != "2":
+            try:
+                json = r.json()
+            except requests.exceptions.JSONDecodeError:
+                json = {}
+
+            raise LibrarianHTTPError(
+                url=endpoint,
+                status_code=r.status_code,
+                reason=json.get("reason", ""),
+                suggested_remedy=json.get(
+                    "suggested_remedy", ""
+                ),
+            )
+
+        if response is None:
+            return None
+        else:
+            # Note that the pydantic model wants the full bytes content
+            # not the deserialized r.json()
+            return response.model_validate_json(r.content)
+
+    def ping(self) -> PingResponse:
+        """
+        Ping the remote librarian to see if it exists.
+
+        Returns
+        -------
+
+        PingResponse
+            The response from the remote librarian.
+
+        Raises
+        ------
+
+        LibrarianHTTPError
+            If the remote librarian is unreachable.
+
+        pydantic.ValidationError
+            If the remote librarian returns an invalid response.
+        """
+
+        response: PingResponse = self.post(
+            endpoint="ping",
+            request=PingRequest(),
+            response=PingResponse,
+        )
+
+        return response
+
+    def upload(
+        self,
+        local_path: Path,
+        dest_path: Path,
+        deletion_policy: DeletionPolicy | str = DeletionPolicy.DISALLOWED,
+    ):
+        """
+        Upload a file or directory to the librarian.
+
+        Parameters
+        ----------
+        local_path : Path
+            Path of the file or directory to upload.
+        dest_path : Path
+            The destination 'path' on the librarian store (often the same as
+            your filename, but may be under some root directory).
+        deletion_policy : DeletionPolicy | str, optional
+            Whether or not this file may be deleted, by default
+            DeletionPolicy.DISALLOWED.
+
+        Returns
+        -------
+        None
+
+        Raises
+        ------
+        ValueError
+            If the provided path is incorrect.
+        LibrarianError
+            If the remote librarian cannot be transferred to.
+        """
+
+        if isinstance(deletion_policy, str):
+            deletion_policy = DeletionPolicy.from_str(deletion_policy)
+
+        if dest_path.is_absolute():
+            raise ValueError(f"Destination path may not be absolute; got {dest_path}")
+
+        # Ask the librarian for a staging directory, and a list of transfer managers
+        # to try.
+
+        response: UploadInitiationResponse = self.post(
+            endpoint="upload/stage",
+            request=UploadInitiationRequest(
+                upload_size=get_size_from_path(local_path),
+                upload_checksum=get_md5_from_path(local_path),
+                upload_name=dest_path.name,
+                destination_location=dest_path,
+                uploader=self.user,
+            ),
+            response=UploadInitiationResponse,
+        )
+
+        transfer_managers = response.transfer_providers
+
+        # Now try all the transfer managers. If they're valid, we try to use them.
+        # If they fail, we should probably catch the exception.
+        # TODO: Catch the exception on failure.
+        used_transfer_manager: Optional["CoreTransferManager"] = None
+        used_transfer_manager_name: Optional[str] = None
+
+        # TODO: Should probably have some manual ordering here.
+        for name, transfer_manager in transfer_managers.items():
+            if transfer_manager.valid:
+                transfer_manager.transfer(
+                    local_path=local_path, remote_path=response.staging_location
+                )
+
+                # We used this.
+                used_transfer_manager = transfer_manager
+                used_transfer_manager_name = name
+
+                break
+            else:
+                print(f"Warning: transfer manager {name} is not valid.")
+
+        if used_transfer_manager is None:
+            raise LibrarianError("No valid transfer managers found.")
+
+        # If we made it here, the file is successfully on the store!
+        request = UploadCompletionRequest(
+            store_name=response.store_name,
+            staging_name=response.staging_name,
+            staging_location=response.staging_location,
+            upload_name=response.upload_name,
+            destination_location=dest_path,
+            transfer_provider_name=used_transfer_manager_name,
+            transfer_provider=used_transfer_manager,
+            # Note: meta_mode is used in current status
+            meta_mode="infer",
+            deletion_policy=deletion_policy,
+            source_name=self.user,
+            # Note: we ALWAYS use null_obsid
+            null_obsid=True,
+            uploader=self.user,
+            transfer_id=response.transfer_id,
+        )
+
+        self.post(
+            endpoint="upload/commit",
+            request=request,
+        )
+
+        return
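
For reference, how the new client composes endpoint URLs (host and port are
placeholder values):

    client = LibrarianClient(host="http://localhost", port=8080, user="example-user")
    client.hostname          # -> "http://localhost:8080/api/v2"
    client.resolve("/ping")  # -> "http://localhost:8080/api/v2/ping"
    client.resolve("ping")   # -> same URL; a missing leading slash is handled
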
+ """ + + if isinstance(deletion_policy, str): + deletion_policy = DeletionPolicy.from_str(deletion_policy) + + if dest_path.is_absolute(): + raise ValueError(f"Destination path may not be absolute; got {dest_path}") + + # Ask the librarian for a staging directory, and a list of transfer managers + # to try. + + response: UploadInitiationResponse = self.post( + endpoint="upload/stage", + request=UploadInitiationRequest( + upload_size=get_size_from_path(local_path), + upload_checksum=get_md5_from_path(local_path), + upload_name=dest_path.name, + destination_location=dest_path, + uploader=self.user, + ), + response=UploadInitiationResponse, + ) + + transfer_managers = response.transfer_providers + + # Now try all the transfer managers. If they're valid, we try to use them. + # If they fail, we should probably catch the exception. + # TODO: Catch the exception on failure. + used_transfer_manager: Optional["CoreTransferManager"] = None + used_transfer_manager_name: Optional[str] = None + + # TODO: Should probably have some manual ordering here. + for name, transfer_manager in transfer_managers.items(): + if transfer_manager.valid: + transfer_manager.transfer( + local_path=local_path, remote_path=response.staging_location + ) + + # We used this. + used_transfer_manager = transfer_manager + used_transfer_manager_name = name + + break + else: + print(f"Warning: transfer manager {name} is not valid.") + + if used_transfer_manager is None: + raise LibrarianError("No valid transfer managers found.") + + # If we made it here, the file is successfully on the store! + request = UploadCompletionRequest( + store_name=response.store_name, + staging_name=response.staging_name, + staging_location=response.staging_location, + upload_name=response.upload_name, + destination_location=dest_path, + transfer_provider_name=used_transfer_manager_name, + transfer_provider=used_transfer_manager, + # Note: meta_mode is used in current status + meta_mode="infer", + deletion_policy=deletion_policy, + source_name=self.user, + # Note: we ALWAYS use null_obsid + null_obsid=True, + uploader=self.user, + transfer_id=response.transfer_id, + ) + + self.post( + endpoint="upload/commit", + request=request, + ) + + return diff --git a/librarian_server/deletion.py b/hera_librarian/deletion.py similarity index 75% rename from librarian_server/deletion.py rename to hera_librarian/deletion.py index efe2d89..b1be483 100644 --- a/librarian_server/deletion.py +++ b/hera_librarian/deletion.py @@ -1,13 +1,13 @@ -from .logger import log from enum import Enum + class DeletionPolicy(Enum): """ Enumeration for whether or not a file can be deleted from a store. Always defaults to 'DISALLOWED' when parsing. - """ - + """ + DISALLOWED = 0 ALLOWED = 1 @@ -19,5 +19,4 @@ def from_str(cls, text: str) -> "DeletionPolicy": elif text == "allowed": return cls.ALLOWED else: - log.warning(f"Unrecognized deletion policy {text}; using DISALLOWED") - return cls.DISALLOWED \ No newline at end of file + return cls.DISALLOWED diff --git a/hera_librarian/exceptions.py b/hera_librarian/exceptions.py new file mode 100644 index 0000000..c8b5094 --- /dev/null +++ b/hera_librarian/exceptions.py @@ -0,0 +1,19 @@ +""" +Exceptions for the hera_librarian client library. +""" + + +class LibrarianHTTPError(Exception): + def __init__(self, url, status_code, reason, suggested_remedy): + super(LibrarianHTTPError, self).__init__( + f"HTTP request to {url} failed with status code {status_code} and reason {reason}." 
diff --git a/hera_librarian/exceptions.py b/hera_librarian/exceptions.py
new file mode 100644
index 0000000..c8b5094
--- /dev/null
+++ b/hera_librarian/exceptions.py
@@ -0,0 +1,19 @@
+"""
+Exceptions for the hera_librarian client library.
+"""
+
+
+class LibrarianHTTPError(Exception):
+    def __init__(self, url, status_code, reason, suggested_remedy):
+        super(LibrarianHTTPError, self).__init__(
+            f"HTTP request to {url} failed with status code {status_code} and reason {reason}."
+        )
+        self.url = url
+        self.status_code = status_code
+        self.reason = reason
+        self.suggested_remedy = suggested_remedy
+
+
+class LibrarianError(Exception):
+    def __init__(self, message):
+        super(LibrarianError, self).__init__(message)
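
A sketch of client-side error handling with the relocated exceptions (the
client object is assumed from the earlier examples):

    from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError

    try:
        client.ping()
    except LibrarianHTTPError as e:
        # Raised by LibrarianClient.post() on any non-2xx response.
        print(e.url, e.status_code, e.reason, e.suggested_remedy)
    except LibrarianError as e:
        # Raised by e.g. upload() when no valid transfer manager is found.
        print(e)
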
""" - client = LibrarianClient( - conn_name="test", - conn_config={"url": f"http://localhost:{server.id}/", "authenticator": None}, - ) - - assert client.ping() + assert librarian_client.ping() diff --git a/tests/integration_test/test_upload_file.py b/tests/integration_test/test_upload_file.py index f3026f2..59f0423 100644 --- a/tests/integration_test/test_upload_file.py +++ b/tests/integration_test/test_upload_file.py @@ -8,7 +8,7 @@ def test_upload_simple(librarian_client, garbage_file, server): # Perform the upload - librarian_client.upload_file(garbage_file, Path("test_file"), null_obsid=True) + librarian_client.upload(garbage_file, Path("test_file")) # Check we got it! # TODO: Implement that check within the librarian client (i.e. the ability to search db) @@ -31,8 +31,8 @@ def test_upload_simple(librarian_client, garbage_file, server): def test_upload_file_to_unique_directory(librarian_client, garbage_file, server): - librarian_client.upload_file( - garbage_file, Path("test_directory/test_file"), null_obsid=True + librarian_client.upload( + garbage_file, Path("test_directory/test_file") ) # Check we got it (by manually verifying)