diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index 15d4b22..8ff048b 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -7,14 +7,15 @@ """ import argparse +import json import os import sys -import json import time +from pathlib import Path from . import LibrarianClient - -from .exceptions import LibrarianClientRemovedFunctionality +from .exceptions import LibrarianClientRemovedFunctionality, LibrarianError +from .settings import client_settings __version__ = "TEST" @@ -85,14 +86,16 @@ def print_table(dict_list, col_list=None, col_names=None): myList = [col_list] # 1st row = header else: if len(col_names) != len(col_list): - raise ValueError("Number of column headers specified must match number of columns") + raise ValueError( + "Number of column headers specified must match number of columns" + ) myList = [col_names] for item in dict_list: - myList.append([str(item[col] or '') for col in col_list]) + myList.append([str(item[col] or "") for col in col_list]) # figure out the maximum size for each column colSize = [max(list(map(len, col))) for col in zip(*myList)] - formatStr = ' | '.join(["{{:<{}}}".format(i) for i in colSize]) - myList.insert(1, ['-' * i for i in colSize]) # Seperating line + formatStr = " | ".join(["{{:<{}}}".format(i) for i in colSize]) + myList.insert(1, ["-" * i for i in colSize]) # Separating line for item in myList: print(formatStr.format(*item)) @@ -100,7 +103,7 @@ def print_table(dict_list, col_list=None, col_names=None): # from https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size -def sizeof_fmt(num, suffix='B'): +def sizeof_fmt(num, suffix="B"): """Format the size of a file in human-readable values. Parameters ---------- @@ -120,11 +123,11 @@ def sizeof_fmt(num, suffix='B'): Follows the Django convention of the web search where base-10 prefixes are used, but base-2 counting is done. 
""" - for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: + for unit in ["", "k", "M", "G", "T", "P", "E", "Z"]: if abs(num) < 1024.0: return "{0:3.1f} {1:}{2:}".format(num, unit, suffix) num /= 1024.0 - return "{0:.1f} {1:}{2:}".format(num, 'Y', suffix) + return "{0:.1f} {1:}{2:}".format(num, "Y", suffix) # make the base parser @@ -145,10 +148,11 @@ def generate_parser(): description="librarian is a command for interacting with the hera_librarian" ) ap.add_argument( - "-V", "--version", + "-V", + "--version", action="version", version="librarian {}".format(__version__), - help="Show the librarian version and exit." + help="Show the librarian version and exit.", ) # add subparsers @@ -186,14 +190,25 @@ def config_add_file_event_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("add-file-event", description=doc, help=hlp) - sp.add_argument("conn_name", metavar="CONNECTION-NAME", type=str, - help=_conn_name_help) - sp.add_argument("file_path", metavar="PATH/TO/FILE", type=str, - help="The path to file in librarian.") - sp.add_argument("event_type", metavar="EVENT-TYPE", type=str, - help="The type of event.") - sp.add_argument("key_vals", metavar="key1=val1...", type=str, nargs="+", - help="key-value pairs of events.") + sp.add_argument( + "conn_name", metavar="CONNECTION-NAME", type=str, help=_conn_name_help + ) + sp.add_argument( + "file_path", + metavar="PATH/TO/FILE", + type=str, + help="The path to file in librarian.", + ) + sp.add_argument( + "event_type", metavar="EVENT-TYPE", type=str, help="The type of event." 
+ ) + sp.add_argument( + "key_vals", + metavar="key1=val1...", + type=str, + nargs="+", + help="key-value pairs of events.", + ) sp.set_defaults(func=add_file_event) return @@ -208,12 +223,20 @@ def config_add_obs_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("add-obs", description=doc, help=hlp) - sp.add_argument("conn_name", metavar="CONNECTION-NAME", type=str, - help=_conn_name_help) - sp.add_argument("store_name", metavar="NAME", - help="The 'store' name under which the Librarian knows this computer.") - sp.add_argument("paths", metavar="PATHS", nargs="+", - help="The paths to the files on this computer.") + sp.add_argument( + "conn_name", metavar="CONNECTION-NAME", type=str, help=_conn_name_help + ) + sp.add_argument( + "store_name", + metavar="NAME", + help="The 'store' name under which the Librarian knows this computer.", + ) + sp.add_argument( + "paths", + metavar="PATHS", + nargs="+", + help="The paths to the files on this computer.", + ) sp.set_defaults(func=add_obs) return @@ -233,12 +256,21 @@ def config_assign_session_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("assign-sessions", description=doc, help=hlp) - sp.add_argument("--min-start-jd", dest="minimum_start_jd", metavar="JD", type=float, - help="Only consider observations starting after JD.") - sp.add_argument("--max-start-jd", dest="maximum_start_jd", metavar="JD", type=float, - help="Only consider observations starting before JD.") - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) + sp.add_argument( + "--min-start-jd", + dest="minimum_start_jd", + metavar="JD", + type=float, + help="Only consider observations starting after JD.", + ) + sp.add_argument( + "--max-start-jd", + dest="maximum_start_jd", + metavar="JD", + type=float, + help="Only consider observations starting before JD.", + ) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) sp.set_defaults(func=assign_sessions) return @@ 
-293,14 +325,24 @@ def config_delete_files_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("delete-files", description=doc, help=hlp) - sp.add_argument("-n", "--noop", dest="noop", action="store_true", - help="Enable no-op mode: nothing is actually deleted.") - sp.add_argument("--store", metavar="STORE-NAME", - help="Only delete instances found on the named store.") - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("query", metavar="QUERY", - help="The JSON-formatted search identifying files to delete.") + sp.add_argument( + "-n", + "--noop", + dest="noop", + action="store_true", + help="Enable no-op mode: nothing is actually deleted.", + ) + sp.add_argument( + "--store", + metavar="STORE-NAME", + help="Only delete instances found on the named store.", + ) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "query", + metavar="QUERY", + help="The JSON-formatted search identifying files to delete.", + ) sp.set_defaults(func=delete_files) return @@ -317,12 +359,13 @@ def config_initiate_offload_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("initiate-offload", description=doc, help=hlp) - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("source_name", metavar="SOURCE-NAME", - help="The name of the source store.") - sp.add_argument("dest_name", metavar="DEST-NAME", - help="The name of the destination store.") + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "source_name", metavar="SOURCE-NAME", help="The name of the source store." + ) + sp.add_argument( + "dest_name", metavar="DEST-NAME", help="The name of the destination store." 
+ ) sp.set_defaults(func=initiate_offload) return @@ -341,17 +384,33 @@ def config_launch_copy_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("launch-copy", description=doc, help=hlp) - sp.add_argument("--dest", type=str, - help="The path in which the file should be stored at the destination. " - "Default is the same as used locally.") - sp.add_argument("--pre-staged", dest="pre_staged", metavar="STORENAME:SUBDIR", - help="Specify that the data have already been staged at the destination.") - sp.add_argument("source_conn_name", metavar="SOURCE-CONNECTION-NAME", - help="Which Librarian originates the copy; as in ~/.hl_client.cfg.") - sp.add_argument("dest_conn_name", metavar="DEST-CONNECTION-NAME", - help="Which Librarian receives the copy; as in ~/.hl_client.cfg.") - sp.add_argument("file_name", metavar="FILE-NAME", - help="The name of the file to copy; need not be a local path.") + sp.add_argument( + "--dest", + type=str, + help="The path in which the file should be stored at the destination. 
" + "Default is the same as used locally.", + ) + sp.add_argument( + "--pre-staged", + dest="pre_staged", + metavar="STORENAME:SUBDIR", + help="Specify that the data have already been staged at the destination.", + ) + sp.add_argument( + "source_conn_name", + metavar="SOURCE-CONNECTION-NAME", + help="Which Librarian originates the copy; as in ~/.hl_client.cfg.", + ) + sp.add_argument( + "dest_conn_name", + metavar="DEST-CONNECTION-NAME", + help="Which Librarian receives the copy; as in ~/.hl_client.cfg.", + ) + sp.add_argument( + "file_name", + metavar="FILE-NAME", + help="The name of the file to copy; need not be a local path.", + ) sp.set_defaults(func=launch_copy) return @@ -367,10 +426,8 @@ def config_locate_file_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("locate-file", description=doc, help=hlp) - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("file_name", metavar="PATH", - help="The name of the file to locate.") + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument("file_name", metavar="PATH", help="The name of the file to locate.") sp.set_defaults(func=locate_file) return @@ -385,12 +442,25 @@ def config_offload_helper_subparser(sub_parsers): # add sub parser # purposely don't add help for this function, to prevent users from using it accidentally sp = sub_parsers.add_parser("offload-helper", description=doc) - sp.add_argument("--name", required=True, help="Displayed name of the destination store.") - sp.add_argument("--pp", required=True, help='"Path prefix" of the destination store.') - sp.add_argument("--host", required=True, help="Target SSH host of the destination store.") - sp.add_argument("--destrel", required=True, help="Destination path, relative to the path prefix.") - sp.add_argument("local_path", metavar="LOCAL-PATH", - help="The name of the file to upload on this machine.") + sp.add_argument( + "--name", required=True, 
help="Displayed name of the destination store." + ) + sp.add_argument( + "--pp", required=True, help='"Path prefix" of the destination store.' + ) + sp.add_argument( + "--host", required=True, help="Target SSH host of the destination store." + ) + sp.add_argument( + "--destrel", + required=True, + help="Destination path, relative to the path prefix.", + ) + sp.add_argument( + "local_path", + metavar="LOCAL-PATH", + help="The name of the file to upload on this machine.", + ) sp.set_defaults(func=offload_helper) return @@ -408,11 +478,15 @@ def config_search_files_subparser(sub_parsers): hlp = "Search for files matching a query" # add sub parser - sp = sub_parsers.add_parser("search-files", description=doc, epilog=example, help=hlp) - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("search", metavar="JSON-SEARCH", - help="A JSON search specification; files that match will be displayed.") + sp = sub_parsers.add_parser( + "search-files", description=doc, epilog=example, help=hlp + ) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "search", + metavar="JSON-SEARCH", + help="A JSON search specification; files that match will be displayed.", + ) sp.set_defaults(func=search_files) return @@ -427,14 +501,20 @@ def config_set_file_deletion_policy_subparser(sub_parsers): # add sub parser sp = sub_parsers.add_parser("set-file-deletion-policy", description=doc, help=hlp) - sp.add_argument("--store", metavar="STORE-NAME", - help="Only alter instances found on the named store.") - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("file_name", metavar="FILE-NAME", - help="The name of the file to modify.") - sp.add_argument("deletion", metavar="POLICY", - help='The new deletion policy: "allowed" or "disallowed"') + sp.add_argument( + "--store", + metavar="STORE-NAME", + help="Only alter instances found on the named store.", + ) + 
sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "file_name", metavar="FILE-NAME", help="The name of the file to modify." + ) + sp.add_argument( + "deletion", + metavar="POLICY", + help='The new deletion policy: "allowed" or "disallowed"', + ) sp.set_defaults(func=set_file_deletion_policy) return @@ -453,15 +533,27 @@ def config_stage_files_subparser(sub_parsers): hlp = "Stage the files matching a query" # add sub parser - sp = sub_parsers.add_parser("stage-files", description=doc, epilog=example, help=hlp) - sp.add_argument("-w", "--wait", dest="wait", action="store_true", - help="If specified, do not exit until the staging is done.") - sp.add_argument("conn_name", metavar="CONNECTION-NAME", - help=_conn_name_help) - sp.add_argument("dest_dir", metavar="DEST-PATH", - help="What directory to put the staged files in.") - sp.add_argument("search", metavar="JSON-SEARCH", - help="A JSON search specification; files that match will be staged.") + sp = sub_parsers.add_parser( + "stage-files", description=doc, epilog=example, help=hlp + ) + sp.add_argument( + "-w", + "--wait", + dest="wait", + action="store_true", + help="If specified, do not exit until the staging is done.", + ) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "dest_dir", + metavar="DEST-PATH", + help="What directory to put the staged files in.", + ) + sp.add_argument( + "search", + metavar="JSON-SEARCH", + help="A JSON search specification; files that match will be staged.", + ) sp.set_defaults(func=stage_files) return @@ -469,14 +561,14 @@ def config_stage_files_subparser(sub_parsers): def config_upload_subparser(sub_parsers): # function documentation - doc = """Upload a file to a Librarian. Do NOT use this script if the file that you + doc = """Upload a file to a Librarian. Do NOT use this script if the file that you wish to upload is already known to the local Librarian. 
In that case, use the "librarian launch-copy" script -- it will make sure to preserve the associated metadata correctly. """ - example = """The LOCAL-PATH specifies where to find the source data on this machine, and + example = """The LOCAL-PATH specifies where to find the source data on this machine, and can take any form. The DEST-PATH specifies where the data should be store in the Librarian and should look something like "2345678/data.txt". The 'basename' of DEST-PATH gives the unique filename under which the data @@ -488,74 +580,78 @@ def config_upload_subparser(sub_parsers): under the name "2345678" with an empty 'store path'. """ - hlp = "Upload files to the librarian" - - # add sub parser - sp = sub_parsers.add_parser("upload", description=doc, epilog=example, help=hlp) - sp.add_argument( - "--meta", - dest="meta", - default="infer", - help='How to gather metadata: "json-stdin" or "infer"', - ) - sp.add_argument( - "--null-obsid", - dest="null_obsid", - action="store_true", - help="Require the new file to have *no* obsid associate (for maintenance files)", - ) - sp.add_argument( - "--deletion", - dest="deletion", - default="disallowed", - help=( - 'Whether the created file instance will be deletable: "allowed" or "disallowed"' - ), - ) - sp.add_argument( - "--pre-staged", - dest="pre_staged", - metavar="STORENAME:SUBDIR", - help="Specify that the data have already been staged at the destination.", - ) - sp.add_argument( - "conn_name", metavar="CONNECTION-NAME", help=_conn_name_help, - ) - sp.add_argument( - "local_path", metavar="LOCAL-PATH", help="The path to the data on this machine.", - ) - sp.add_argument( - "dest_store_path", - metavar="DEST-PATH", - help='The destination location: combination of "store path" and file name.', - ) - sp.add_argument( - "--use_globus", - dest="use_globus", - action="store_true", - help="Specify that we should try to use globus to transfer data.", - ) - sp.add_argument( - "--client_id", - dest="client_id", - 
metavar="CLIENT-ID", - help="The globus client ID.", - ) - sp.add_argument( - "--transfer_token", - dest="transfer_token", - metavar="TRANSFER-TOKEN", - help="The globus transfer token.", - ) - sp.add_argument( - "--source_endpoint_id", - dest="source_endpoint_id", - metavar="SOURCE-ENDPOINT-ID", - help="The source endpoint ID for the globus transfer.", - ) - sp.set_defaults(func=upload) - - return + hlp = "Upload files to the librarian" + + # add sub parser + sp = sub_parsers.add_parser("upload", description=doc, epilog=example, help=hlp) + sp.add_argument( + "--meta", + dest="meta", + default="infer", + help='How to gather metadata: "json-stdin" or "infer"', + ) + sp.add_argument( + "--null-obsid", + dest="null_obsid", + action="store_true", + help="Require the new file to have *no* obsid associate (for maintenance files)", + ) + sp.add_argument( + "--deletion", + dest="deletion", + default="disallowed", + help=( + 'Whether the created file instance will be deletable: "allowed" or "disallowed"' + ), + ) + sp.add_argument( + "--pre-staged", + dest="pre_staged", + metavar="STORENAME:SUBDIR", + help="Specify that the data have already been staged at the destination.", + ) + sp.add_argument( + "conn_name", + metavar="CONNECTION-NAME", + help=_conn_name_help, + ) + sp.add_argument( + "local_path", + metavar="LOCAL-PATH", + help="The path to the data on this machine.", + ) + sp.add_argument( + "dest_store_path", + metavar="DEST-PATH", + help='The destination location: combination of "store path" and file name.', + ) + sp.add_argument( + "--use_globus", + dest="use_globus", + action="store_true", + help="Specify that we should try to use globus to transfer data.", + ) + sp.add_argument( + "--client_id", + dest="client_id", + metavar="CLIENT-ID", + help="The globus client ID.", + ) + sp.add_argument( + "--transfer_token", + dest="transfer_token", + metavar="TRANSFER-TOKEN", + help="The globus transfer token.", + ) + sp.add_argument( + "--source_endpoint_id", + 
dest="source_endpoint_id", + metavar="SOURCE-ENDPOINT-ID", + help="The source endpoint ID for the globus transfer.", + ) + sp.set_defaults(func=upload) + + return def add_file_event(args): @@ -564,12 +660,9 @@ def add_file_event(args): """ raise LibrarianClientRemovedFunctionality( - "add_file_event", - "File events are no longer part of the librarian." + "add_file_event", "File events are no longer part of the librarian." ) - return - def add_obs(args): """ @@ -577,12 +670,9 @@ def add_obs(args): """ raise LibrarianClientRemovedFunctionality( - "add_obs", - "Consider using the 'upload' command instead." + "add_obs", "Consider using the 'upload' command instead." ) - return - def launch_copy(args): """ @@ -591,11 +681,9 @@ def launch_copy(args): raise LibrarianClientRemovedFunctionality( "launch_copy", - "This is no longer required as it is handled by the background tasks." + "This is no longer required as it is handled by the background tasks.", ) - return - def assign_sessions(args): """ @@ -603,12 +691,9 @@ def assign_sessions(args): """ raise LibrarianClientRemovedFunctionality( - "assign_sessions", - "Observing sessions are no longer tracked." + "assign_sessions", "Observing sessions are no longer tracked." ) - return - def check_connections(args): """ @@ -616,316 +701,104 @@ def check_connections(args): as well as their stores. """ - from . import all_connections any_failed = False - for client in all_connections(): - print('Checking ability to establish HTTP connection to "%s" (%s) ...' % (client.conn_name, client.config['url'])) + for conn_name, conn_info in client_settings.connections.items(): + client = LibrarianClient.from_info(conn_info) try: - result = client.ping() - print(' ... OK') + client.ping() + print("Connection to {} ({}) succeeded.".format(conn_name, client.hostname)) except Exception as e: - print(' ... 
error: %s' % e) + print( + "Connection to {} ({}) failed: {}".format(conn_name, client.hostname, e) + ) any_failed = True - continue - - print(' Querying "%s" for its stores and how to connect to them ...' % client.conn_name) - - for store in client.stores(): - print(' Checking ability to establish SSH connection to remote store "%s" (%s:%s) ...' - % (store.name, store.ssh_host, store.path_prefix)) - - try: - result = store.get_space_info() - print(' ... OK') - except Exception as e: - print(' ... error: %s' % e) - any_failed = True if any_failed: sys.exit(1) - print() - print('Everything worked!') - def copy_metadata(args): """ Copy metadata for files from one librarian to another. """ - source_client = LibrarianClient(args.source_conn_name) - dest_client = LibrarianClient(args.dest_conn_name) - - # get metadata from source... - try: - rec_info = source_client.gather_file_record(args.file_name) - except RPCError as e: - die("fetching metadata failed: {}".format(e)) - - # ...and upload it to dest - try: - dest_client.create_file_record(rec_info) - except RPCError as e: - die("uploading metadata failed: {}".format(e)) - return + raise LibrarianClientRemovedFunctionality( + "copy_metadata", "Metadata copying is now handled using background tasks." + ) def delete_files(args): """ Request to delete instances of files matching a given query. """ - def str_or_huh(x): - if x is None: - return "???" 
- return str(x) - - # Let's do it - client = LibrarianClient(args.conn_name) - - if args.noop: - print("No-op mode enabled: files will not actually be deleted.") - print() - itemtext = "todelete" - summtext = "would have been deleted" - mode = "noop" - else: - itemtext = "deleted" - summtext = "were deleted" - mode = "standard" - try: - result = client.delete_file_instances_matching_query(args.query, - mode=mode, - restrict_to_store=args.store) - allstats = result["stats"] - except RPCError as e: - die("multi-delete failed: {}".format(e)) - - n_files = 0 - n_noinst = 0 - n_deleted = 0 - n_error = 0 - - for fname, stats in sorted(iter(allstats.items()), key=lambda t: t[0]): - nd = stats.get("n_deleted", 0) - nk = stats.get("n_kept", 0) - ne = stats.get("n_error", 0) - - if nd + nk + ne == 0: - # This file had no instances. Don't bother printing it. - n_noinst += 1 - continue - - n_files += 1 - n_deleted += nd - n_error += ne - deltext = str_or_huh(stats.get("n_deleted")) - kepttext = str_or_huh(stats.get("n_kept")) - errtext = str_or_huh(stats.get("n_error")) - - print("{0}: {1}={2} kept={3} error={4}".format(fname, itemtext, deltext, kepttext, errtext)) - - if n_files: - print("") - print("{:d} files were matched, {:d} had instances; {:d} instances {}".format( - n_files + n_noinst, n_files, n_deleted, summtext)) - - if n_error: - print("WARNING: {:d} error(s) occurred; see server logs for information".format(n_error)) - sys.exit(1) - - return + raise LibrarianClientRemovedFunctionality( + "delete_files", "Deletion is currently not available using the client." + ) def initiate_offload(args): """ Initiate an "offload": move a bunch of file instances from one store to another. 
""" - # Let's do it - client = LibrarianClient(args.conn_name) - - try: - result = client.initiate_offload(args.source_name, args.dest_name) - except RPCError as e: - die("offload failed to launch: {}".format(e)) - - if "outcome" not in result: - die('malformed server response (no "outcome" field): {}'.format(repr(result))) - - if result["outcome"] == "store-shut-down": - print("The store has no file instances needing offloading. It was placed offline and may now be closed out.") - elif result["outcome"] == "task-launched": - print("Task launched, intending to offload {} instances".format(str(result.get("instance-count", "???")))) - print() - print("A noop-ified command to delete offloaded instances from the store is:") - print(" librarian delete-files --noop --store '{}' '{}' '{\"at-least-instances\": 2}'".format( - args.source_name, args.conn_name)) - else: - die('malformed server response (unrecognized "outcome" field): {}'.format(repr(result))) - return + raise LibrarianClientRemovedFunctionality( + "initiate_offload", "Offloading is now handled using background tasks." + ) def locate_file(args): """ Ask the Librarian where to find a file. """ - # Let's do it - # In case the user has provided directory components: - file_name = os.path.basename(args.file_name) - client = LibrarianClient(args.conn_name) - - try: - result = client.locate_file_instance(file_name) - except RPCError as e: - die("couldn't locate file: {}".format(e)) - - print("{store_ssh_host}:{full_path_on_store}".format(**result)) - return + raise NotImplementedError( + "This needs to be implemented, but requires a change to the Librarian API." + ) def offload_helper(args): """ Launch this script to implement the "offload" functionality. """ - # Due to how the Librarian has to arrange things, it's possible that the - # instance that we want to copy was deleted before this script got run. If so, - # so be it -- don't signal an error. 
- if not os.path.exists(args.local_path): - print("source path {} does not exist -- doing nothing".format(args.local_path)) - sys.exit(0) - - # The rare librarian script that does not use the LibrarianClient class! - try: - dest = base_store.BaseStore(args.name, args.pp, args.host) - dest.copy_to_store(args.local_path, args.destrel) - except Exception as e: - die(e) - return + raise LibrarianClientRemovedFunctionality( + "offload_helper", "Offloading is now handled using background tasks." + ) def search_files(args): """ Search for files in the librarian. """ - # Let's do it - client = LibrarianClient(args.conn_name) - try: - result = client.search_files(args.search) - except RPCError as e: - die("search failed: {}".format(e)) - - nresults = len(result["results"]) - if nresults == 0: - # we didn't get anything - die("No files matched this search") - - print("Found {:d} matching files".format(nresults)) - # first go through entries to format file size and remove potential null obsids - for entry in result["results"]: - entry["size"] = sizeof_fmt(entry["size"]) - if entry["obsid"] is None: - entry["obsid"] = "None" - - # now print the results as a table - print_table(result["results"], ["name", "create_time", "obsid", "type", "size"], - ["Name", "Created", "Observation", "Type", "Size"]) - - return + raise NotImplementedError( + "This needs to be implemented, but requires a change to the Librarian API." + ) def set_file_deletion_policy(args): """ Set the "deletion policy" of one instance of this file. 
""" - # In case they gave a full path: - file_name = os.path.basename(args.file_name) - - # Let's do it - client = LibrarianClient(args.conn_name) - try: - result = client.set_one_file_deletion_policy(file_name, args.deletion_policy, - restrict_to_store=args.store) - except RPCError as e: - die("couldn't alter policy: {}".format(e)) - return + raise LibrarianClientRemovedFunctionality( + "set_file_deletion_policy", + "Deletion is currently not available using the client.", + ) def stage_files(args): """ Tell the Librarian to stage files onto the local scratch disk. """ - # Let's do it - client = LibrarianClient(args.conn_name) - - # Get the username. We could make this a command-line option but I think it's - # better to keep this a semi-secret. Note that the server does absolutely no - # verification of the values that are passed in. - - import getpass - user = getpass.getuser() - - # Resolve the destination in case the user provides, say, `.`, where the - # server is not going to know what that means. This will need elaboration if - # we add options for the server to come up with a destination automatically or - # other things like that. - our_dest = os.path.realpath(args.dest_dir) - try: - result = client.launch_local_disk_stage_operation(user, args.search, our_dest) - except RPCError as e: - die("couldn't start the stage operation: {}".format(e)) - - # This is a bit of future-proofing; we might teach the Librarian to choose a - # "reasonable" output directory on your behalf. - dest = result["destination"] - - print("Launched operation to stage {:d} instances ({:d} bytes) to {}".format( - result["n_instances"], result["n_bytes"], dest)) - - if not args.wait: - print("Operation is complete when {}/STAGING-IN-PROGRESS is removed.".format(dest)) - else: - # The API call should not return until the progress-marker file is - # created, so if we don't see that it exists, it should be the case that - # the staging started and finished before we could check. 
- if not os.path.isdir(dest): - die("cannot wait for staging to complete: destination directory {} not " - "visible on this machine. Missing network filesystem?".format(dest)) - - marker_path = os.path.join(dest, "STAGING-IN-PROGRESS") - t0 = time.time() - print("Started waiting for staging to finish at:", time.asctime(time.localtime(t0))) - - while os.path.exists(marker_path): - time.sleep(3) - - if os.path.exists(os.path.join(dest, "STAGING-SUCCEEDED")): - print("Staging completed successfully ({:0.1f}s elapsed).".format(time.time() - t0)) - sys.exit(0) - - try: - with open(os.path.join(dest, "STAGING-ERRORS"), "rt") as f: - print("Staging completed WITH ERRORS ({:0.1f}s elapsed).".format(time.time() - t0), - file=sys.stderr) - print("", file=sys.stderr) - for line in f: - print(line.rstrip(), file=sys.stderr) - sys.exit(1) - except IOError as e: - if e.errno == 2: - die("staging finished but neiher \"success\" nor \"error\" indicator was " - "created (no file {})".format(dest, "STAGING-ERRORS")) - raise - - return + raise LibrarianClientRemovedFunctionality( + "stage_files", "Staging is now handled automatically during upload." + ) def upload(args): @@ -934,43 +807,40 @@ def upload(args): """ # Argument validation is pretty simple if os.path.isabs(args.dest_store_path): - die("destination path must be relative to store top; got {}".format(args.dest_store_path)) + die( + "destination path must be relative to store top; got {}".format( + args.dest_store_path + ) + ) if args.null_obsid and args.meta != "infer": die('illegal to specify --null-obsid when --meta is not "infer"') if args.meta == "json-stdin": - try: - rec_info = json.load(sys.stdin) - except Exception as e: - die("cannot parse stdin as JSON data: {}".format(e)) - meta_mode = "direct" + raise LibrarianClientRemovedFunctionality( + "upload::json-stdin", "JSON metadata is no longer supported." 
+ ) elif args.meta == "infer": - rec_info = {} - meta_mode = "infer" + pass else: die("unexpected metadata-gathering method {}".format(args.meta)) - known_staging_store = None - known_staging_subdir = None - - if args.pre_staged is not None: - known_staging_store, known_staging_subdir = args.pre_staged.split(":", 1) - # Let's do it - client = LibrarianClient(args.conn_name) + client = LibrarianClient.from_info(client_settings.connections[args.conn_name]) try: - from pathlib import Path - client.upload( local_path=Path(args.local_path), dest_path=Path(args.dest_store_path), deletion_policy=args.deletion, null_obsid=args.null_obsid, ) - except RPCError as e: - die("upload failed: {}".format(e)) + except ValueError as e: + die("Upload failed, check paths: {}".format(e)) + except LibrarianError as e: + die("Upload failed, librarian not contactable: {}".format(e)) + except Exception as e: + die("Upload failed (unknown error): {}".format(e)) return diff --git a/hera_librarian/client.py b/hera_librarian/client.py index fab8eb0..c2211c9 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -13,6 +13,7 @@ from .models.ping import PingRequest, PingResponse from .models.uploads import (UploadCompletionRequest, UploadInitiationRequest, UploadInitiationResponse) +from .settings import ClientInfo from .utils import get_md5_from_path, get_size_from_path if TYPE_CHECKING: @@ -53,6 +54,28 @@ def __init__(self, host: str, port: int, user: str): def __repr__(self): return f"Librarian Client ({self.user}) for {self.host}:{self.port}" + @classmethod + def from_info(cls, client_info: ClientInfo): + """ + Create a LibrarianClient from a ClientInfo object. + + Parameters + ---------- + client_info : ClientInfo + The ClientInfo object. + + Returns + ------- + LibrarianClient + The LibrarianClient. + """ + + return cls( + host=client_info.host, + port=client_info.port, + user=client_info.user, + ) + @property def hostname(self): return f"{self.host}:{self.port}/api/v2"